@@ -7,17 +7,24 @@ package org.opensearch.commons.alerting.action
77
88import org.opensearch.action.ActionRequest
99import org.opensearch.action.ActionRequestValidationException
10+ import org.opensearch.commons.alerting.model.DataSources
1011import org.opensearch.commons.alerting.model.IndexExecutionContext
12+ import org.opensearch.commons.alerting.model.IntervalSchedule
1113import org.opensearch.commons.alerting.model.Monitor
14+ import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION
1215import org.opensearch.commons.alerting.model.MonitorMetadata
1316import org.opensearch.commons.alerting.model.WorkflowRunContext
17+ import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
1418import org.opensearch.core.common.io.stream.StreamInput
1519import org.opensearch.core.common.io.stream.StreamOutput
1620import org.opensearch.core.index.shard.ShardId
1721import org.opensearch.core.xcontent.ToXContent
1822import org.opensearch.core.xcontent.ToXContentObject
1923import org.opensearch.core.xcontent.XContentBuilder
24+ import org.opensearch.index.seqno.SequenceNumbers
2025import java.io.IOException
26+ import java.time.Instant
27+ import java.time.temporal.ChronoUnit
2128
2229class DocLevelMonitorFanOutRequest : ActionRequest , ToXContentObject {
2330 val monitor: Monitor
@@ -29,6 +36,80 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
2936 val concreteIndicesSeenSoFar: List <String >
3037 val workflowRunContext: WorkflowRunContext ?
3138
39+ companion object {
40+ private fun safeReadMonitor (sin : StreamInput ): Monitor =
41+ try {
42+ Monitor .readFrom(sin)!!
43+ } catch (e: Exception ) {
44+ Monitor (
45+ " failed_serde" , NO_VERSION , " failed_serde" , true ,
46+ IntervalSchedule (1 , ChronoUnit .MINUTES ), Instant .now(), Instant .now(), " " ,
47+ null , NO_SCHEMA_VERSION , emptyList(), emptyList(), emptyMap(),
48+ DataSources (), false , false , " failed"
49+ )
50+ }
51+
52+ private fun safeReadBoolean (sin : StreamInput ): Boolean =
53+ try {
54+ sin.readBoolean()
55+ } catch (e: Exception ) {
56+ false
57+ }
58+
59+ private fun safeReadMonitorMetadata (sin : StreamInput ): MonitorMetadata =
60+ try {
61+ MonitorMetadata .readFrom(sin)
62+ } catch (e: Exception ) {
63+ MonitorMetadata (
64+ " failed_serde" ,
65+ SequenceNumbers .UNASSIGNED_SEQ_NO ,
66+ SequenceNumbers .UNASSIGNED_PRIMARY_TERM ,
67+ " failed_serde" ,
68+ emptyList(),
69+ emptyMap(),
70+ mutableMapOf ()
71+ )
72+ }
73+
74+ private fun safeReadString (sin : StreamInput ): String =
75+ try {
76+ sin.readString()
77+ } catch (e: Exception ) {
78+ " "
79+ }
80+
81+ private fun safeReadShardIds (sin : StreamInput ): List <ShardId > =
82+ try {
83+ sin.readList(::ShardId )
84+ } catch (e: Exception ) {
85+ listOf (ShardId (" failed_serde" , " failed_serde" , 999999 ))
86+ }
87+
88+ private fun safeReadStringList (sin : StreamInput ): List <String > =
89+ try {
90+ sin.readStringList()
91+ } catch (e: Exception ) {
92+ emptyList()
93+ }
94+
95+ private fun safeReadWorkflowRunContext (sin : StreamInput ): WorkflowRunContext ? =
96+ try {
97+ if (sin.readBoolean()) WorkflowRunContext (sin) else null
98+ } catch (e: Exception ) {
99+ null
100+ }
101+
102+ private fun safeReadIndexExecutionContext (sin : StreamInput ): IndexExecutionContext =
103+ try {
104+ IndexExecutionContext (sin)
105+ } catch (e: Exception ) {
106+ IndexExecutionContext (
107+ emptyList(), mutableMapOf (), mutableMapOf (), " failed_serde" , " failed_serde" ,
108+ emptyList(), emptyList(), emptyList(), emptyList(), emptyList()
109+ )
110+ }
111+ }
112+
32113 constructor (
33114 monitor: Monitor ,
34115 dryRun: Boolean ,
@@ -47,21 +128,19 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
47128 this .shardIds = shardIds
48129 this .concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
49130 this .workflowRunContext = workflowRunContext
50- require(false == shardIds.isEmpty ()) { }
131+ require(shardIds.isNotEmpty ()) { " shardIds must not be empty " }
51132 }
52133
53134 @Throws(IOException ::class )
54135 constructor (sin: StreamInput ) : this (
55- monitor = Monitor .readFrom(sin)!! ,
56- dryRun = sin.readBoolean(),
57- monitorMetadata = MonitorMetadata .readFrom(sin),
58- executionId = sin.readString(),
59- shardIds = sin.readList(::ShardId ),
60- concreteIndicesSeenSoFar = sin.readStringList(),
61- workflowRunContext = if (sin.readBoolean()) {
62- WorkflowRunContext (sin)
63- } else { null },
64- indexExecutionContext = IndexExecutionContext (sin)
136+ safeReadMonitor(sin),
137+ safeReadBoolean(sin),
138+ safeReadMonitorMetadata(sin),
139+ safeReadString(sin),
140+ safeReadIndexExecutionContext(sin),
141+ safeReadShardIds(sin),
142+ safeReadStringList(sin),
143+ safeReadWorkflowRunContext(sin)
65144 )
66145
67146 @Throws(IOException ::class )
@@ -78,24 +157,25 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
78157 }
79158
80159 override fun validate (): ActionRequestValidationException ? {
81- var actionValidationException: ActionRequestValidationException ? = null
82- if (shardIds.isEmpty()) {
83- actionValidationException = ActionRequestValidationException ()
84- actionValidationException.addValidationError(" shard_ids is null or empty" )
160+ return if (shardIds.isEmpty()) {
161+ ActionRequestValidationException ().apply {
162+ addValidationError(" shard_ids is null or empty" )
163+ }
164+ } else {
165+ null
85166 }
86- return actionValidationException
87167 }
88168
89169 @Throws(IOException ::class )
90170 override fun toXContent (builder : XContentBuilder , params : ToXContent .Params ): XContentBuilder {
91- builder.startObject()
171+ return builder.startObject()
92172 .field(" monitor" , monitor)
93173 .field(" dry_run" , dryRun)
94174 .field(" execution_id" , executionId)
95175 .field(" index_execution_context" , indexExecutionContext)
96176 .field(" shard_ids" , shardIds)
97177 .field(" concrete_indices" , concreteIndicesSeenSoFar)
98178 .field(" workflow_run_context" , workflowRunContext)
99- return builder .endObject()
179+ .endObject()
100180 }
101181}
0 commit comments