Skip to content

Commit 2f9be0a

Browse files
committed
PPL Monitor and Trigger Models
Signed-off-by: Dennis Toepker <[email protected]>
1 parent 0c10473 commit 2f9be0a

File tree

5 files changed

+766
-0
lines changed

5 files changed

+766
-0
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.opensearch.commons.alerting.model
2+
3+
import org.opensearch.common.CheckedFunction
4+
import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE
5+
import org.opensearch.core.ParseField
6+
import org.opensearch.core.common.io.stream.StreamInput
7+
import org.opensearch.core.common.io.stream.StreamOutput
8+
import org.opensearch.core.xcontent.NamedXContentRegistry
9+
import org.opensearch.core.xcontent.XContentParser
10+
import org.opensearch.core.xcontent.XContentParserUtils
11+
import java.io.IOException
12+
import java.time.Instant
13+
14+
interface MonitorV2 : ScheduledJob {
15+
override val id: String
16+
override val version: Long
17+
override val name: String
18+
override val enabled: Boolean
19+
override val schedule: Schedule
20+
override val lastUpdateTime: Instant // required for scheduled job maintenance
21+
override val enabledTime: Instant? // required for scheduled job maintenance
22+
val triggers: List<TriggerV2>
23+
24+
fun asTemplateArg(): Map<String, Any?>
25+
26+
enum class MonitorV2Type(val value: String) {
27+
PPL_MONITOR(PPL_MONITOR_TYPE);
28+
29+
override fun toString(): String {
30+
return value
31+
}
32+
33+
companion object {
34+
fun enumFromString(value: String): MonitorV2Type? {
35+
return MonitorV2Type.entries.find { it.value == value }
36+
}
37+
}
38+
}
39+
40+
companion object {
41+
// scheduled job field names
42+
const val MONITOR_V2_TYPE = "monitor_v2" // scheduled job type is MonitorV2
43+
44+
// field names
45+
const val NAME_FIELD = "name"
46+
const val MONITOR_TYPE_FIELD = "monitor_type"
47+
const val ENABLED_FIELD = "enabled"
48+
const val SCHEDULE_FIELD = "schedule"
49+
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
50+
const val ENABLED_TIME_FIELD = "enabled_time"
51+
const val TRIGGERS_FIELD = "triggers"
52+
53+
// default values
54+
const val NO_ID = ""
55+
const val NO_VERSION = 1L
56+
57+
val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
58+
ScheduledJob::class.java,
59+
ParseField(MONITOR_V2_TYPE),
60+
CheckedFunction { parse(it) }
61+
)
62+
63+
@JvmStatic
64+
@Throws(IOException::class)
65+
fun parse(xcp: XContentParser): MonitorV2 {
66+
/* parse outer object for monitorV2 type, then delegate to correct monitorV2 parser */
67+
68+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) // outer monitor object start
69+
70+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) // monitor type field name
71+
val monitorTypeText = xcp.currentName()
72+
val monitorType = MonitorV2Type.enumFromString(monitorTypeText)
73+
?: throw IllegalStateException("when parsing MonitorV2, received invalid monitor type: $monitorTypeText")
74+
75+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) // inner monitor object start
76+
77+
return when (monitorType) {
78+
MonitorV2Type.PPL_MONITOR -> PPLMonitor.parse(xcp)
79+
}
80+
}
81+
82+
fun readFrom(sin: StreamInput): MonitorV2 {
83+
return when (val monitorType = sin.readEnum(MonitorV2Type::class.java)) {
84+
MonitorV2Type.PPL_MONITOR -> PPLMonitor(sin)
85+
else -> throw IllegalStateException("Unexpected input \"$monitorType\" when reading MonitorV2")
86+
}
87+
}
88+
89+
fun writeTo(out: StreamOutput, monitorV2: MonitorV2) {
90+
when (monitorV2) {
91+
is PPLMonitor -> {
92+
out.writeEnum(MonitorV2.MonitorV2Type.PPL_MONITOR)
93+
monitorV2.writeTo(out)
94+
}
95+
}
96+
}
97+
}
98+
}
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
package org.opensearch.commons.alerting.model
2+
3+
import org.apache.logging.log4j.LogManager
4+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.ENABLED_FIELD
5+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.ENABLED_TIME_FIELD
6+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.LAST_UPDATE_TIME_FIELD
7+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.MONITOR_TYPE_FIELD
8+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.MONITOR_V2_TYPE
9+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.NAME_FIELD
10+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.NO_ID
11+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.NO_VERSION
12+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.SCHEDULE_FIELD
13+
import org.opensearch.commons.alerting.model.MonitorV2.Companion.TRIGGERS_FIELD
14+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
15+
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
16+
import org.opensearch.commons.alerting.util.instant
17+
import org.opensearch.commons.alerting.util.nonOptionalTimeField
18+
import org.opensearch.commons.alerting.util.optionalTimeField
19+
import org.opensearch.core.common.io.stream.StreamInput
20+
import org.opensearch.core.common.io.stream.StreamOutput
21+
import org.opensearch.core.xcontent.ToXContent
22+
import org.opensearch.core.xcontent.XContentBuilder
23+
import org.opensearch.core.xcontent.XContentParser
24+
import org.opensearch.core.xcontent.XContentParserUtils
25+
import java.io.IOException
26+
import java.time.Instant
27+
28+
private val logger = LogManager.getLogger(PPLMonitor::class.java)
29+
30+
// TODO: probably change this to be called PPLSQLMonitor. A PPL Monitor and SQL Monitor
31+
// TODO: would have the exact same functionality, except the choice of language
32+
// TODO: when calling PPL/SQL plugin's execute API would be different.
33+
// TODO: we dont need 2 different monitor types for that, just a simple if check
34+
// TODO: for query language at monitor execution time
35+
data class PPLMonitor(
36+
override val id: String = NO_ID,
37+
override val version: Long = NO_VERSION,
38+
override val name: String,
39+
override val enabled: Boolean,
40+
override val schedule: Schedule,
41+
override val lastUpdateTime: Instant,
42+
override val enabledTime: Instant?,
43+
override val triggers: List<TriggerV2>,
44+
val queryLanguage: QueryLanguage = QueryLanguage.PPL, // default to PPL, SQL not currently supported
45+
val query: String
46+
) : MonitorV2 {
47+
48+
// specify scheduled job type
49+
override val type = MONITOR_V2_TYPE
50+
51+
override fun fromDocument(id: String, version: Long): PPLMonitor = copy(id = id, version = version)
52+
53+
init {
54+
// SQL monitors are not yet supported
55+
if (this.queryLanguage == QueryLanguage.SQL) {
56+
throw IllegalStateException("Monitors with SQL queries are not supported")
57+
}
58+
59+
// for checking trigger ID uniqueness
60+
val triggerIds = mutableSetOf<String>()
61+
triggers.forEach { trigger ->
62+
require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." }
63+
}
64+
65+
if (enabled) {
66+
requireNotNull(enabledTime)
67+
} else {
68+
require(enabledTime == null)
69+
}
70+
71+
triggers.forEach { trigger ->
72+
require(trigger is PPLTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$PPL_MONITOR_TYPE]" }
73+
}
74+
75+
// TODO: create setting for max triggers and check for max triggers here
76+
}
77+
78+
@Throws(IOException::class)
79+
constructor(sin: StreamInput) : this(
80+
id = sin.readString(),
81+
version = sin.readLong(),
82+
name = sin.readString(),
83+
enabled = sin.readBoolean(),
84+
schedule = Schedule.readFrom(sin),
85+
lastUpdateTime = sin.readInstant(),
86+
enabledTime = sin.readOptionalInstant(),
87+
triggers = sin.readList(TriggerV2::readFrom),
88+
queryLanguage = sin.readEnum(QueryLanguage::class.java),
89+
query = sin.readString()
90+
)
91+
92+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
93+
builder.startObject() // overall start object
94+
95+
// if this is being written as ScheduledJob, add extra object layer and add ScheduledJob
96+
// related metadata, default to false
97+
if (params.paramAsBoolean("with_type", false)) {
98+
builder.startObject(MONITOR_V2_TYPE)
99+
}
100+
101+
// wrap PPLMonitor in outer object named after its monitor type
102+
// required for MonitorV2 XContentParser to first encounter this,
103+
// read in monitor type, then delegate to correct parse() function
104+
builder.startObject(PPL_MONITOR_TYPE) // monitor type start object
105+
106+
builder.field(NAME_FIELD, name)
107+
builder.field(SCHEDULE_FIELD, schedule)
108+
builder.field(ENABLED_FIELD, enabled)
109+
builder.optionalTimeField(ENABLED_TIME_FIELD, enabledTime)
110+
builder.nonOptionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
111+
builder.field(TRIGGERS_FIELD, triggers.toTypedArray())
112+
builder.field(QUERY_LANGUAGE_FIELD, queryLanguage.value)
113+
builder.field(QUERY_FIELD, query)
114+
115+
builder.endObject() // monitor type end object
116+
117+
// if ScheduledJob metadata was added, end the extra object layer that was created
118+
if (params.paramAsBoolean("with_type", false)) {
119+
builder.endObject()
120+
}
121+
122+
builder.endObject() // overall end object
123+
124+
return builder
125+
}
126+
127+
@Throws(IOException::class)
128+
override fun writeTo(out: StreamOutput) {
129+
out.writeString(id)
130+
out.writeLong(version)
131+
out.writeString(name)
132+
out.writeBoolean(enabled)
133+
if (schedule is CronSchedule) {
134+
out.writeEnum(Schedule.TYPE.CRON)
135+
} else {
136+
out.writeEnum(Schedule.TYPE.INTERVAL)
137+
}
138+
out.writeInstant(lastUpdateTime)
139+
out.writeOptionalInstant(enabledTime)
140+
out.writeVInt(triggers.size)
141+
triggers.forEach {
142+
out.writeEnum(TriggerV2.TriggerV2Type.PPL_TRIGGER)
143+
it.writeTo(out)
144+
}
145+
out.writeEnum(queryLanguage)
146+
out.writeString(query)
147+
}
148+
149+
override fun asTemplateArg(): Map<String, Any?> {
150+
return mapOf(
151+
_ID to id,
152+
_VERSION to version,
153+
NAME_FIELD to name,
154+
ENABLED_FIELD to enabled,
155+
SCHEDULE_FIELD to schedule,
156+
LAST_UPDATE_TIME_FIELD to lastUpdateTime.toEpochMilli(),
157+
ENABLED_TIME_FIELD to enabledTime?.toEpochMilli(),
158+
TRIGGERS_FIELD to triggers,
159+
QUERY_LANGUAGE_FIELD to queryLanguage.value,
160+
QUERY_FIELD to query
161+
)
162+
}
163+
164+
enum class QueryLanguage(val value: String) {
165+
PPL(PPL_QUERY_LANGUAGE),
166+
SQL(SQL_QUERY_LANGUAGE);
167+
168+
companion object {
169+
fun enumFromString(value: String): QueryLanguage? = QueryLanguage.entries.firstOrNull { it.value == value }
170+
}
171+
}
172+
173+
companion object {
174+
// monitor type name
175+
const val PPL_MONITOR_TYPE = "ppl_monitor" // TODO: eventually change to SQL_PPL_MONITOR_TYPE
176+
177+
// query languages
178+
const val PPL_QUERY_LANGUAGE = "ppl"
179+
const val SQL_QUERY_LANGUAGE = "sql"
180+
181+
// field names
182+
const val QUERY_LANGUAGE_FIELD = "query_language"
183+
const val QUERY_FIELD = "query"
184+
185+
@JvmStatic
186+
@JvmOverloads
187+
@Throws(IOException::class)
188+
fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): PPLMonitor {
189+
var name: String? = null
190+
var monitorType: String = PPL_MONITOR_TYPE
191+
var enabled = true
192+
var schedule: Schedule? = null
193+
var lastUpdateTime: Instant? = null
194+
var enabledTime: Instant? = null
195+
val triggers: MutableList<TriggerV2> = mutableListOf()
196+
var queryLanguage: QueryLanguage = QueryLanguage.PPL // default to PPL
197+
var query: String? = null
198+
199+
/* parse */
200+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
201+
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
202+
val fieldName = xcp.currentName()
203+
xcp.nextToken()
204+
205+
when (fieldName) {
206+
NAME_FIELD -> name = xcp.text()
207+
MONITOR_TYPE_FIELD -> monitorType = xcp.text()
208+
ENABLED_FIELD -> enabled = xcp.booleanValue()
209+
SCHEDULE_FIELD -> schedule = Schedule.parse(xcp)
210+
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
211+
LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
212+
TRIGGERS_FIELD -> {
213+
XContentParserUtils.ensureExpectedToken(
214+
XContentParser.Token.START_ARRAY,
215+
xcp.currentToken(),
216+
xcp
217+
)
218+
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
219+
triggers.add(PPLTrigger.parseInner(xcp))
220+
}
221+
}
222+
QUERY_LANGUAGE_FIELD -> {
223+
val input = xcp.text()
224+
val enumMatchResult = QueryLanguage.enumFromString(input)
225+
?: throw IllegalArgumentException("Invalid value for $QUERY_LANGUAGE_FIELD: $input. Supported values are ${QueryLanguage.entries.map { it.value }}")
226+
queryLanguage = enumMatchResult
227+
}
228+
QUERY_FIELD -> query = xcp.text()
229+
else -> throw IllegalArgumentException("Unexpected field \"$fieldName\" when parsing PPL Monitor")
230+
}
231+
}
232+
233+
/* validations */
234+
235+
// TODO: add validations for throttle actions time range
236+
// (see alerting's TransportIndexMonitorAction.validateActionThrottle)
237+
238+
// ensure MonitorV2 XContent being parsed by PPLMonitor class is PPL Monitor type
239+
if (monitorType != PPL_MONITOR_TYPE) {
240+
throw IllegalArgumentException("Invalid monitor type: $monitorType")
241+
}
242+
243+
// ensure there's at least 1 trigger
244+
if (triggers.isEmpty()) {
245+
throw IllegalArgumentException("Monitor must include at least 1 trigger")
246+
}
247+
248+
// if enabled, set time of MonitorV2 creation/update is set as enable time
249+
if (enabled && enabledTime == null) {
250+
enabledTime = Instant.now()
251+
} else if (!enabled) {
252+
enabledTime = null
253+
}
254+
255+
lastUpdateTime = lastUpdateTime ?: Instant.now()
256+
257+
// check for required fields
258+
requireNotNull(name) { "Monitor name is null" }
259+
requireNotNull(schedule) { "Schedule is null" }
260+
requireNotNull(queryLanguage) { "Query language is null" }
261+
requireNotNull(query) { "Query is null" }
262+
requireNotNull(lastUpdateTime) { "Last update time is null" }
263+
264+
if (queryLanguage == QueryLanguage.SQL) {
265+
throw IllegalArgumentException("SQL queries are not supported. Please use a PPL query.")
266+
}
267+
268+
/* return PPLMonitor */
269+
return PPLMonitor(
270+
id,
271+
version,
272+
name,
273+
enabled,
274+
schedule,
275+
lastUpdateTime,
276+
enabledTime,
277+
triggers,
278+
queryLanguage,
279+
query
280+
)
281+
}
282+
}
283+
}

0 commit comments

Comments
 (0)