Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/AlertV2.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.lucene.uid.Versions
import org.opensearch.commons.alerting.model.Alert.Companion.ALERT_ID_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.ALERT_VERSION_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.ERROR_MESSAGE_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.EXECUTION_ID_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_ID_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_NAME_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_VERSION_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.NO_ID
import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION
import org.opensearch.commons.alerting.model.Alert.Companion.SCHEMA_VERSION_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.SEVERITY_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.TRIGGER_ID_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.TRIGGER_NAME_FIELD
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.alerting.util.instant
import org.opensearch.commons.alerting.util.nonOptionalTimeField
import org.opensearch.commons.alerting.util.optionalTimeField
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.time.Instant

data class AlertV2(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does Alert not have a state?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For tests: tests will be added post internal demo and customer playground, as the goal currently is to see if RBAC and other AuthZ can be implemented before internal and customer demo.

For Alert States: no, Alerts in AlertingV2 will not have states. The general flow of alerts is that a PPLMonitor will fire and forget them. Alerts will then get deleted by a separate Alert cleanup mechanism based on the configured Alert expiration time (AlertExpirer.kt) is the upcoming class that does this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI we should discuss: opensearch-project/security#4500.

We should move away from storing user copies...esp for new use cases

Copy link
Member

@cwperks cwperks Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Roles Injection..used for Alerting monitor runtime..that would still require storing a user copy, however, for visibility and sharing purposes security will be pushing for the new model.

Roles Injection will also need replaced eventually. That is used in cases where the creator of an alerting monitor has DLS restrictions.

That being said, there is a known issue that a contributor has graciously been helping us with but its stuck in review in the alerting repo: opensearch-project/alerting#1917

val id: String = NO_ID,
val version: Long = NO_VERSION,
val schemaVersion: Int = NO_SCHEMA_VERSION,
val monitorId: String,
val monitorName: String,
val monitorVersion: Long,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is monitorVersion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would refer to the version of the monitor this alert was generated from since sometimes the monitor is updated, so the current query might not be applicable anymore to the alert.

// val monitorUser: User?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: cleanup commented code unless intentional. This applies to everywhere else as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I noticed the older alert contained this. Why do we want to remove it now? I believe this helped with rbac support.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments are intentional. In all coming PRs, RBAC related code is commented out as markers for what RBAC data needs to be stored, and where RBAC checks need to be made. There are no plans to remove monitorUser from Alerts

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI we should discuss: opensearch-project/security#4500.

We should move away from storing user copies...esp for new use cases

val triggerId: String,
val triggerName: String,
val queryResults: Map<String, Any>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be the same structure for PPL, SQL, DSL?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For PPL and SQL yes. DSL responses can technically be stored as a Map<String, Any>, but the PPL Alerting logic expects this to follow the format of SQL/PPL Plugin Execute API Response format: https://docs.opensearch.org/latest/search-plugins/sql/sql-ppl-api/#example-request

val triggeredTime: Instant,
val expirationTime: Instant?,
val errorMessage: String? = null,
val severity: String,
val executionId: String? = null
) : Writeable, ToXContent {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
version = sin.readLong(),
schemaVersion = sin.readInt(),
monitorId = sin.readString(),
monitorName = sin.readString(),
monitorVersion = sin.readLong(),
// monitorUser = if (sin.readBoolean()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this commented out?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See please: #873 (comment)

// User(sin)
// } else {
// null
// },
triggerId = sin.readString(),
triggerName = sin.readString(),
queryResults = sin.readMap()!!.toMap(),
triggeredTime = sin.readInstant(),
expirationTime = sin.readOptionalInstant(),
errorMessage = sin.readOptionalString(),
severity = sin.readString(),
executionId = sin.readOptionalString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeInt(schemaVersion)
out.writeString(monitorId)
out.writeString(monitorName)
out.writeLong(monitorVersion)
// out.writeBoolean(monitorUser != null)
// monitorUser?.writeTo(out)
out.writeString(triggerId)
out.writeString(triggerName)
out.writeMap(queryResults)
out.writeInstant(triggeredTime)
out.writeOptionalInstant(expirationTime)
out.writeOptionalString(errorMessage)
out.writeString(severity)
out.writeOptionalString(executionId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(ALERT_ID_FIELD, id)
.field(ALERT_VERSION_FIELD, version)
.field(MONITOR_ID_FIELD, monitorId)
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(MONITOR_VERSION_FIELD, monitorVersion)
.field(MONITOR_NAME_FIELD, monitorName)
.field(EXECUTION_ID_FIELD, executionId)
.field(TRIGGER_ID_FIELD, triggerId)
.field(TRIGGER_NAME_FIELD, triggerName)
.field(QUERY_RESULTS_FIELD, queryResults)
.field(ERROR_MESSAGE_FIELD, errorMessage)
.field(SEVERITY_FIELD, severity)
.nonOptionalTimeField(TRIGGERED_TIME_FIELD, triggeredTime)
.optionalTimeField(EXPIRATION_TIME_FIELD, expirationTime)
.endObject()

// if (!secure) {
// builder.optionalUserField(MONITOR_USER_FIELD, monitorUser)
// }

return builder
}

fun asTemplateArg(): Map<String, Any?> {
return mapOf(
ALERT_ID_FIELD to id,
ALERT_VERSION_FIELD to version,
ERROR_MESSAGE_FIELD to errorMessage,
EXECUTION_ID_FIELD to executionId,
EXPIRATION_TIME_FIELD to expirationTime?.toEpochMilli(),
SEVERITY_FIELD to severity
)
}

companion object {
const val TRIGGERED_TIME_FIELD = "triggered_time"
const val EXPIRATION_TIME_FIELD = "expiration_time"
const val QUERY_RESULTS_FIELD = "query_results"

@JvmStatic
@JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): AlertV2 {
var schemaVersion = NO_SCHEMA_VERSION
lateinit var monitorId: String
lateinit var monitorName: String
var monitorVersion: Long = Versions.NOT_FOUND
// var monitorUser: User? = null
lateinit var triggerId: String
lateinit var triggerName: String
var queryResults: Map<String, Any> = mapOf()
lateinit var severity: String
var triggeredTime: Instant? = null
var expirationTime: Instant? = null
var errorMessage: String? = null
var executionId: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
MONITOR_ID_FIELD -> monitorId = xcp.text()
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue()
// MONITOR_USER_FIELD ->
// monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
// null
// } else {
// User.parse(xcp)
// }
TRIGGER_ID_FIELD -> triggerId = xcp.text()
TRIGGER_NAME_FIELD -> triggerName = xcp.text()
QUERY_RESULTS_FIELD -> queryResults = xcp.map()
TRIGGERED_TIME_FIELD -> triggeredTime = xcp.instant()
EXPIRATION_TIME_FIELD -> expirationTime = xcp.instant()
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
SEVERITY_FIELD -> severity = xcp.text()
}
}

return AlertV2(
id = id,
version = version,
schemaVersion = schemaVersion,
monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName),
monitorVersion = monitorVersion,
// monitorUser = monitorUser,
triggerId = requireNotNull(triggerId),
triggerName = requireNotNull(triggerName),
queryResults = requireNotNull(queryResults),
triggeredTime = requireNotNull(triggeredTime),
expirationTime = expirationTime,
errorMessage = errorMessage,
severity = severity,
executionId = executionId
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): AlertV2 {
return AlertV2(sin)
}
}
}
102 changes: 102 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/MonitorV2.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.CheckedFunction
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE
import org.opensearch.core.ParseField
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.time.Instant

interface MonitorV2 : ScheduledJob {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RFC's model of the v2 monitor is different from this. Can we get them in sync?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MonitorV2 is meant to serve as a more upstream interface. Please see PPLMonitor.kt for a model that matches the RFC.

override val id: String
override val version: Long
override val name: String
override val enabled: Boolean
override val schedule: Schedule
override val lastUpdateTime: Instant // required for scheduled job maintenance
override val enabledTime: Instant? // required for scheduled job maintenance
val triggers: List<TriggerV2>
val schemaVersion: Int // for updating monitors
val lookBackWindow: TimeValue? // how far back to look when querying data during monitor execution

fun asTemplateArg(): Map<String, Any?>

enum class MonitorV2Type(val value: String) {
PPL_MONITOR(PPL_MONITOR_TYPE);

override fun toString(): String {
return value
}

companion object {
fun enumFromString(value: String): MonitorV2Type? {
return MonitorV2Type.entries.find { it.value == value }
}
}
}

companion object {
// scheduled job field names
const val MONITOR_V2_TYPE = "monitor_v2" // scheduled job type is MonitorV2

// field names
const val NAME_FIELD = "name"
const val MONITOR_TYPE_FIELD = "monitor_type"
const val ENABLED_FIELD = "enabled"
const val SCHEDULE_FIELD = "schedule"
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val ENABLED_TIME_FIELD = "enabled_time"
const val TRIGGERS_FIELD = "triggers"
const val LOOK_BACK_WINDOW_FIELD = "look_back_window"

// default values
const val NO_ID = ""
const val NO_VERSION = 1L

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
ScheduledJob::class.java,
ParseField(MONITOR_V2_TYPE),
CheckedFunction { parse(it) }
)

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): MonitorV2 {
/* parse outer object for monitorV2 type, then delegate to correct monitorV2 parser */

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) // outer monitor object start

XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) // monitor type field name
val monitorTypeText = xcp.currentName()
val monitorType = MonitorV2Type.enumFromString(monitorTypeText)
?: throw IllegalStateException("when parsing MonitorV2, received invalid monitor type: $monitorTypeText")

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) // inner monitor object start

return when (monitorType) {
MonitorV2Type.PPL_MONITOR -> PPLMonitor.parse(xcp)
}
}

fun readFrom(sin: StreamInput): MonitorV2 {
return when (val monitorType = sin.readEnum(MonitorV2Type::class.java)) {
MonitorV2Type.PPL_MONITOR -> PPLMonitor(sin)
else -> throw IllegalStateException("Unexpected input \"$monitorType\" when reading MonitorV2")
}
}

fun writeTo(out: StreamOutput, monitorV2: MonitorV2) {
when (monitorV2) {
is PPLMonitor -> {
out.writeEnum(MonitorV2.MonitorV2Type.PPL_MONITOR)
monitorV2.writeTo(out)
}
}
}
}
}
Loading
Loading