diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index fe5cfe29..96c54058 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -5,21 +5,31 @@ package org.opensearch.commons.alerting.action +import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.index.shard.ShardId import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.seqno.SequenceNumbers +import java.io.EOFException import java.io.IOException +import java.time.Instant +import java.time.temporal.ChronoUnit class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + val monitor: Monitor val dryRun: Boolean val monitorMetadata: MonitorMetadata @@ -28,6 +38,11 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val shardIds: List val concreteIndicesSeenSoFar: List val workflowRunContext: WorkflowRunContext? + val hasSerializationFailed: Boolean + + companion object { + val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java) + } constructor( monitor: Monitor, @@ -47,21 +62,70 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { this.shardIds = shardIds this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar this.workflowRunContext = workflowRunContext - require(false == shardIds.isEmpty()) { } + this.hasSerializationFailed = false } @Throws(IOException::class) - constructor(sin: StreamInput) : this( - monitor = Monitor.readFrom(sin)!!, - dryRun = sin.readBoolean(), - monitorMetadata = MonitorMetadata.readFrom(sin), - executionId = sin.readString(), - shardIds = sin.readList(::ShardId), - concreteIndicesSeenSoFar = sin.readStringList(), - workflowRunContext = if (sin.readBoolean()) { - WorkflowRunContext(sin) - } else { null }, - indexExecutionContext = IndexExecutionContext(sin) + constructor(sin: StreamInput) : super() { + var monitorSerializationSucceeded = true + var parsedMonitor = getDummyMonitor() + var parsedDryRun = false + var parsedMonitorMetadata: MonitorMetadata = MonitorMetadata( + "failed_serde", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + "failed_serde", + emptyList(), + emptyMap(), + mutableMapOf() + ) + var parsedShardIds: List = emptyList() + var parsedConcreteIndicesSeenSoFar = mutableListOf() + var parsedExecutionId: String = "" + var parsedWorkflowContext: WorkflowRunContext? = null + var parsedIndexExecutionContext: IndexExecutionContext? = null + try { + parsedMonitor = Monitor(sin) + parsedDryRun = sin.readBoolean() + parsedMonitorMetadata = MonitorMetadata.readFrom(sin) + parsedExecutionId = sin.readString() + parsedShardIds = sin.readList(::ShardId) + parsedConcreteIndicesSeenSoFar = sin.readStringList() + parsedWorkflowContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { + null + } + parsedIndexExecutionContext = IndexExecutionContext(sin) + } catch (e: Exception) { + log.error("Error parsing monitor in Doc level monitor fanout request", e) + monitorSerializationSucceeded = false + log.info("Force consuming stream in Doc level monitor fanout request") + while (sin.read() != 0) { + // read and discard bytes until stream is entirely consumed + try { + sin.readByte() + } catch (_: EOFException) { + } + } + } + + this.monitor = parsedMonitor + this.dryRun = parsedDryRun + this.monitorMetadata = parsedMonitorMetadata + this.executionId = parsedExecutionId + this.shardIds = parsedShardIds + this.concreteIndicesSeenSoFar = parsedConcreteIndicesSeenSoFar + this.workflowRunContext = parsedWorkflowContext + this.indexExecutionContext = parsedIndexExecutionContext + this.hasSerializationFailed = false == monitorSerializationSucceeded + } + + private fun getDummyMonitor() = Monitor( + "failed_serde", NO_VERSION, "failed_serde", true, + IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "", + null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(), + DataSources(), false, false, "failed" ) @Throws(IOException::class) @@ -88,7 +152,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.startObject() + return builder.startObject() .field("monitor", monitor) .field("dry_run", dryRun) .field("execution_id", executionId) @@ -96,6 +160,6 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { .field("shard_ids", shardIds) .field("concrete_indices", concreteIndicesSeenSoFar) .field("workflow_run_context", workflowRunContext) - return builder.endObject() + .endObject() } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt index fd67007a..88483f27 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocLevelMonitorInput.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.common.CheckedFunction import org.opensearch.core.ParseField import org.opensearch.core.common.io.stream.StreamInput @@ -23,7 +24,7 @@ data class DocLevelMonitorInput( sin.readString(), // description sin.readStringList(), // indices sin.readList(::DocLevelQuery), // docLevelQueries - sin.readOptionalBoolean() // fanoutEnabled + if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalBoolean() else true // fanoutEnabled ) override fun asTemplateArg(): Map { @@ -43,7 +44,9 @@ data class DocLevelMonitorInput( out.writeString(description) out.writeStringCollection(indices) out.writeCollection(queries) - out.writeOptionalBoolean(fanoutEnabled) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(fanoutEnabled) + } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt index 4ecf1e67..6b104d13 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -36,7 +37,7 @@ data class IndexExecutionContext( concreteIndexNames = sin.readStringList(), conflictingFields = sin.readStringList(), docIds = sin.readOptionalStringList(), - findingIds = sin.readOptionalStringList() + findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList() ) override fun writeTo(out: StreamOutput?) { @@ -49,7 +50,9 @@ data class IndexExecutionContext( out.writeStringCollection(concreteIndexNames) out.writeStringCollection(conflictingFields) out.writeOptionalStringCollection(docIds) - out.writeOptionalStringCollection(findingIds) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalStringCollection(findingIds) + } } override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index a0a5ed5b..18fdde5c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.common.CheckedFunction import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_INPUTS @@ -112,8 +113,16 @@ data class Monitor( } else { DataSources() }, - deleteQueryIndexInEveryRun = sin.readOptionalBoolean(), - shouldCreateSingleAlertForFindings = sin.readOptionalBoolean(), + deleteQueryIndexInEveryRun = if (sin.version.onOrAfter(Version.V_2_15_0)) { + sin.readOptionalBoolean() + } else { + false + }, + shouldCreateSingleAlertForFindings = if (sin.version.onOrAfter(Version.V_2_15_0)) { + sin.readOptionalBoolean() + } else { + false + }, owner = sin.readOptionalString() ) @@ -226,8 +235,12 @@ data class Monitor( out.writeMap(uiMetadata) out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field dataSources.writeTo(out) - out.writeOptionalBoolean(deleteQueryIndexInEveryRun) - out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(deleteQueryIndexInEveryRun) + } + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalBoolean(shouldCreateSingleAlertForFindings) + } out.writeOptionalString(owner) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt index 5d3cd7c1..a83b8815 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.model +import org.opensearch.Version import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -28,12 +29,12 @@ data class WorkflowRunContext( } constructor(sin: StreamInput) : this( - sin.readString(), - sin.readString(), - sin.readOptionalString(), - sin.readMap() as Map>, - sin.readBoolean(), - sin.readOptionalStringList() + workflowId = sin.readString(), + workflowMetadataId = sin.readString(), + chainedMonitorId = sin.readOptionalString(), + matchingDocIdsPerIndex = sin.readMap() as Map>, + auditDelegateMonitorAlerts = sin.readBoolean(), + findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList() ) override fun writeTo(out: StreamOutput) { @@ -42,7 +43,9 @@ data class WorkflowRunContext( out.writeOptionalString(chainedMonitorId) out.writeMap(matchingDocIdsPerIndex) out.writeBoolean(auditDelegateMonitorAlerts) - out.writeOptionalStringCollection(findingIds) + if (out.version.onOrAfter(Version.V_2_15_0)) { + out.writeOptionalStringCollection(findingIds) + } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index 1ef82f18..75b9b364 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -6,6 +6,8 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.ActionExecutionTime @@ -88,6 +90,8 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + assertEquals(sin.read(), 0) + assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) } @Test @@ -150,5 +154,65 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed) + assertEquals(sin.read().toString(), sin.read(), 0) + } + + @Test + fun `test serde failure returning dummy object instead of exception`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + out.writeString(UUID.randomUUID().toString()) + docLevelMonitorFanOutRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertEquals(sin.read(), 0) + assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed) } }