Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class Finding(
* Keeps the track of the workflow-monitor exact execution.
* Used for filtering the data when chaining monitors in a workflow.
*/
val executionId: String? = null
val executionId: String? = null,
val additionalFields: Map<String, Any>? = mapOf()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs plz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add version awareness to avoid serde issues

) : Writeable, ToXContent {

constructor(
Expand All @@ -46,7 +47,8 @@ class Finding(
index = index,
docLevelQueries = docLevelQueries,
timestamp = timestamp,
executionId = null
executionId = null,
additionalFields = null
)

@Throws(IOException::class)
Expand All @@ -59,7 +61,8 @@ class Finding(
index = sin.readString(),
docLevelQueries = sin.readList((DocLevelQuery)::readFrom),
timestamp = sin.readInstant(),
executionId = sin.readOptionalString()
executionId = sin.readOptionalString(),
additionalFields = sin.readMap()
)

fun asTemplateArg(): Map<String, Any?> {
Expand Down Expand Up @@ -87,6 +90,7 @@ class Finding(
.field(QUERIES_FIELD, docLevelQueries.toTypedArray())
.field(TIMESTAMP_FIELD, timestamp.toEpochMilli())
.field(EXECUTION_ID_FIELD, executionId)
.field(ADDITIONAL_FIELDS_FIELD, additionalFields)
builder.endObject()
return builder
}
Expand All @@ -102,6 +106,7 @@ class Finding(
out.writeCollection(docLevelQueries)
out.writeInstant(timestamp)
out.writeOptionalString(executionId)
out.writeMap(additionalFields)
}

companion object {
Expand All @@ -115,6 +120,7 @@ class Finding(
const val TIMESTAMP_FIELD = "timestamp"
const val EXECUTION_ID_FIELD = "execution_id"
const val NO_ID = ""
const val ADDITIONAL_FIELDS_FIELD = "additional_fields"

@JvmStatic
@Throws(IOException::class)
Expand All @@ -128,6 +134,7 @@ class Finding(
val queries: MutableList<DocLevelQuery> = mutableListOf()
lateinit var timestamp: Instant
var executionId: String? = null
var additionalFields: Map<String, Any>? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -161,6 +168,7 @@ class Finding(
timestamp = requireNotNull(xcp.instant())
}
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
ADDITIONAL_FIELDS_FIELD -> additionalFields = xcp.map()
}
}

Expand All @@ -173,7 +181,8 @@ class Finding(
index = index,
docLevelQueries = queries,
timestamp = timestamp,
executionId = executionId
executionId = executionId,
additionalFields = additionalFields
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ data class Monitor(
val dataSources: DataSources = DataSources(),
val deleteQueryIndexInEveryRun: Boolean? = false,
val shouldCreateSingleAlertForFindings: Boolean? = false,
val owner: String? = "alerting"
val owner: String? = "alerting",
val metadataForFindings: List<String>? = listOf()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metadataForFindings -> contextFields? findingFields?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java doc

) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -123,7 +124,8 @@ data class Monitor(
} else {
false
},
owner = sin.readOptionalString()
owner = sin.readOptionalString(),
metadataForFindings = sin.readOptionalStringList()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add version awareness to avoid serde issues

)

// This enum classifies different Monitors
Expand Down Expand Up @@ -185,6 +187,7 @@ data class Monitor(
builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun)
builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings)
builder.field(OWNER_FIELD, owner)
builder.field(METADATA_FOR_FINDINGS_FIELD, metadataForFindings)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -242,6 +245,7 @@ data class Monitor(
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
}
out.writeOptionalString(owner)
out.writeOptionalStringCollection(metadataForFindings)
}

companion object {
Expand All @@ -264,6 +268,7 @@ data class Monitor(
const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run"
const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings"
const val OWNER_FIELD = "owner"
const val METADATA_FOR_FINDINGS_FIELD = "metadata_for_findings"
val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}")

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand Down Expand Up @@ -294,6 +299,7 @@ data class Monitor(
var deleteQueryIndexInEveryRun = false
var delegateMonitor = false
var owner = "alerting"
var metadataForFindings: MutableList<String> = mutableListOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -357,6 +363,17 @@ data class Monitor(
xcp.booleanValue()
}
OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
METADATA_FOR_FINDINGS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)

while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
metadataForFindings.add(xcp.text())
}
}
else -> {
xcp.skipChildren()
}
Expand Down Expand Up @@ -385,7 +402,8 @@ data class Monitor(
dataSources,
deleteQueryIndexInEveryRun,
delegateMonitor,
owner
owner,
metadataForFindings
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ internal class GetFindingsResponseTests {
"monitor_name1",
"test_index1",
listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())),
Instant.now()
Instant.now(),
additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value"))
)
val findingDocument1 = FindingDocument("test_index1", "doc1", true, "document 1 payload")
val findingDocument2 = FindingDocument("test_index1", "doc2", true, "document 2 payload")
val findingDocument3 = FindingDocument("test_index1", "doc3", true, "document 3 payload")
val findingDocument4 = FindingDocument(
"test_index1",
"doc4",
true,
"document 4 payload"
)

val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3))
val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3, findingDocument4))

// Alerting GetFindingsResponse mock #2

Expand All @@ -43,12 +50,19 @@ internal class GetFindingsResponseTests {
"monitor_name2",
"test_index2",
listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())),
Instant.now()
Instant.now(),
additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value"))
)
val findingDocument21 = FindingDocument("test_index2", "doc21", true, "document 21 payload")
val findingDocument22 = FindingDocument("test_index2", "doc22", true, "document 22 payload")
val findingDocument24 = FindingDocument(
"test_index2",
"doc22",
true,
"document 22 payload"
)

val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22))
val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22, findingDocument24))

val req = GetFindingsResponse(RestStatus.OK, 2, listOf(findingWithDocs1, findingWithDocs2))
Assertions.assertNotNull(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.commons.alerting.randomBucketLevelTrigger
import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult
import org.opensearch.commons.alerting.randomChainedAlertTrigger
import org.opensearch.commons.alerting.randomDocLevelQuery
import org.opensearch.commons.alerting.randomDocumentLevelMonitor
import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult
import org.opensearch.commons.alerting.randomDocumentLevelTrigger
import org.opensearch.commons.alerting.randomInputRunResults
Expand Down Expand Up @@ -113,6 +114,20 @@ class WriteableTests {
Assertions.assertEquals(newWorkflow, workflow, "Round tripping Workflow failed")
}

@Test
fun `test query-level monitor with metadata for findings as stream`() {
val monitor = randomDocumentLevelMonitor().copy(
inputs = listOf(DocLevelMonitorInput(indices = listOf("<test-{now/d}>"), queries = emptyList())),
triggers = emptyList(),
metadataForFindings = listOf("field1", "field2")
)
val out = BytesStreamOutput()
monitor.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newMonitor = Monitor(sin)
Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work")
}

@Test
fun `test query-level trigger as stream`() {
val trigger = randomQueryLevelTrigger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.randomAlert
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomBucketLevelTrigger
import org.opensearch.commons.alerting.randomDocLevelQuery
import org.opensearch.commons.alerting.randomDocumentLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser
import org.opensearch.commons.alerting.randomQueryLevelTrigger
Expand Down Expand Up @@ -127,6 +128,7 @@ class XContentTests {
}
}

@Test
fun `test query-level monitor parsing`() {
val monitor = randomQueryLevelMonitor()

Expand All @@ -135,6 +137,19 @@ class XContentTests {
assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor)
}

@Test
fun `test doc-level monitor parsing`() {
val monitor = randomDocumentLevelMonitor().copy(
inputs = listOf(DocLevelMonitorInput(indices = listOf("<test-{now/d}>"), queries = emptyList())),
triggers = emptyList(),
metadataForFindings = listOf("field1", "field2")
)

val monitorString = monitor.toJsonStringWithUser()
val parsedMonitor = Monitor.parse(parser(monitorString))
assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor)
}

@Test
fun `test monitor parsing with no name`() {
val monitorStringWithoutName = """
Expand Down