Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

package org.opensearch.commons.alerting.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.seqno.SequenceNumbers
import java.io.EOFException
import java.io.IOException
import java.time.Instant
import java.time.temporal.ChronoUnit

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

val monitor: Monitor
val dryRun: Boolean
val monitorMetadata: MonitorMetadata
Expand All @@ -28,6 +38,11 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val shardIds: List<ShardId>
val concreteIndicesSeenSoFar: List<String>
val workflowRunContext: WorkflowRunContext?
val hasSerializationFailed: Boolean

companion object {
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
}

constructor(
monitor: Monitor,
Expand All @@ -47,21 +62,70 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
require(false == shardIds.isEmpty()) { }
this.hasSerializationFailed = false
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
constructor(sin: StreamInput) : super() {
var monitorSerializationSucceeded = true
var parsedMonitor = getDummyMonitor()
var parsedDryRun = false
var parsedMonitorMetadata: MonitorMetadata = MonitorMetadata(
"failed_serde",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
"failed_serde",
emptyList(),
emptyMap(),
mutableMapOf()
)
var parsedShardIds: List<ShardId> = emptyList()
var parsedConcreteIndicesSeenSoFar = mutableListOf<String>()
var parsedExecutionId: String = ""
var parsedWorkflowContext: WorkflowRunContext? = null
var parsedIndexExecutionContext: IndexExecutionContext? = null
try {
parsedMonitor = Monitor(sin)
parsedDryRun = sin.readBoolean()
parsedMonitorMetadata = MonitorMetadata.readFrom(sin)
parsedExecutionId = sin.readString()
parsedShardIds = sin.readList(::ShardId)
parsedConcreteIndicesSeenSoFar = sin.readStringList()
parsedWorkflowContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else {
null
}
parsedIndexExecutionContext = IndexExecutionContext(sin)
} catch (e: Exception) {
log.error("Error parsing monitor in Doc level monitor fanout request", e)
monitorSerializationSucceeded = false
log.info("Force consuming stream in Doc level monitor fanout request")
while (sin.read() != 0) {
// read and discard bytes until stream is entirely consumed
try {
sin.readByte()
} catch (_: EOFException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log here as well?

}
}
}

this.monitor = parsedMonitor
this.dryRun = parsedDryRun
this.monitorMetadata = parsedMonitorMetadata
this.executionId = parsedExecutionId
this.shardIds = parsedShardIds
this.concreteIndicesSeenSoFar = parsedConcreteIndicesSeenSoFar
this.workflowRunContext = parsedWorkflowContext
this.indexExecutionContext = parsedIndexExecutionContext
this.hasSerializationFailed = false == monitorSerializationSucceeded
}

private fun getDummyMonitor() = Monitor(
"failed_serde", NO_VERSION, "failed_serde", true,
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
DataSources(), false, false, "failed"
)

@Throws(IOException::class)
Expand All @@ -88,14 +152,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
return builder.startObject()
.field("monitor", monitor)
.field("dry_run", dryRun)
.field("execution_id", executionId)
.field("index_execution_context", indexExecutionContext)
.field("shard_ids", shardIds)
.field("concrete_indices", concreteIndicesSeenSoFar)
.field("workflow_run_context", workflowRunContext)
return builder.endObject()
.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.commons.alerting.model

import org.opensearch.Version
import org.opensearch.common.CheckedFunction
import org.opensearch.core.ParseField
import org.opensearch.core.common.io.stream.StreamInput
Expand All @@ -23,7 +24,7 @@ data class DocLevelMonitorInput(
sin.readString(), // description
sin.readStringList(), // indices
sin.readList(::DocLevelQuery), // docLevelQueries
sin.readOptionalBoolean() // fanoutEnabled
if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalBoolean() else true // fanoutEnabled
)

override fun asTemplateArg(): Map<String, Any> {
Expand All @@ -43,7 +44,9 @@ data class DocLevelMonitorInput(
out.writeString(description)
out.writeStringCollection(indices)
out.writeCollection(queries)
out.writeOptionalBoolean(fanoutEnabled)
if (out.version.onOrAfter(Version.V_2_15_0)) {
out.writeOptionalBoolean(fanoutEnabled)
}
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.commons.alerting.model

import org.opensearch.Version
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
Expand Down Expand Up @@ -36,7 +37,7 @@ data class IndexExecutionContext(
concreteIndexNames = sin.readStringList(),
conflictingFields = sin.readStringList(),
docIds = sin.readOptionalStringList(),
findingIds = sin.readOptionalStringList()
findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList()
)

override fun writeTo(out: StreamOutput?) {
Expand All @@ -49,7 +50,9 @@ data class IndexExecutionContext(
out.writeStringCollection(concreteIndexNames)
out.writeStringCollection(conflictingFields)
out.writeOptionalStringCollection(docIds)
out.writeOptionalStringCollection(findingIds)
if (out.version.onOrAfter(Version.V_2_15_0)) {
out.writeOptionalStringCollection(findingIds)
}
}

override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.commons.alerting.model

import org.opensearch.Version
import org.opensearch.common.CheckedFunction
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_INPUTS
Expand Down Expand Up @@ -112,8 +113,16 @@ data class Monitor(
} else {
DataSources()
},
deleteQueryIndexInEveryRun = sin.readOptionalBoolean(),
shouldCreateSingleAlertForFindings = sin.readOptionalBoolean(),
deleteQueryIndexInEveryRun = if (sin.version.onOrAfter(Version.V_2_15_0)) {
sin.readOptionalBoolean()
} else {
false
},
shouldCreateSingleAlertForFindings = if (sin.version.onOrAfter(Version.V_2_15_0)) {
sin.readOptionalBoolean()
} else {
false
},
owner = sin.readOptionalString()
)

Expand Down Expand Up @@ -226,8 +235,12 @@ data class Monitor(
out.writeMap(uiMetadata)
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
out.writeOptionalBoolean(deleteQueryIndexInEveryRun)
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
if (out.version.onOrAfter(Version.V_2_15_0)) {
out.writeOptionalBoolean(deleteQueryIndexInEveryRun)
}
if (out.version.onOrAfter(Version.V_2_15_0)) {
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
}
out.writeOptionalString(owner)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.commons.alerting.model

import org.opensearch.Version
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
Expand All @@ -28,12 +29,12 @@ data class WorkflowRunContext(
}

constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString(),
sin.readOptionalString(),
sin.readMap() as Map<String, List<String>>,
sin.readBoolean(),
sin.readOptionalStringList()
workflowId = sin.readString(),
workflowMetadataId = sin.readString(),
chainedMonitorId = sin.readOptionalString(),
matchingDocIdsPerIndex = sin.readMap() as Map<String, List<String>>,
auditDelegateMonitorAlerts = sin.readBoolean(),
findingIds = if (sin.version.onOrAfter(Version.V_2_15_0)) sin.readOptionalStringList() else emptyList()
)

override fun writeTo(out: StreamOutput) {
Expand All @@ -42,7 +43,9 @@ data class WorkflowRunContext(
out.writeOptionalString(chainedMonitorId)
out.writeMap(matchingDocIdsPerIndex)
out.writeBoolean(auditDelegateMonitorAlerts)
out.writeOptionalStringCollection(findingIds)
if (out.version.onOrAfter(Version.V_2_15_0)) {
out.writeOptionalStringCollection(findingIds)
}
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
Expand Down Expand Up @@ -88,6 +90,8 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertEquals(sin.read(), 0)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}

@Test
Expand Down Expand Up @@ -150,5 +154,65 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
assertEquals(sin.read().toString(), sin.read(), 0)
}

@Test
fun `test serde failure returning dummy object instead of exception`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
out.writeString(UUID.randomUUID().toString())
docLevelMonitorFanOutRequest.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertEquals(sin.read(), 0)
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}
}