Skip to content

Commit aae19d4

Browse files
committed
change serde failure handling to expect failure only in monitor object for Doc Level monitor fanout request
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent d87d83e commit aae19d4

File tree

2 files changed

+58
-207
lines changed

2 files changed

+58
-207
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

Lines changed: 57 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -40,138 +40,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
4040
val workflowRunContext: WorkflowRunContext?
4141
val hasSerializationFailed: Boolean
4242

43-
init {
44-
serializationFailedFlag = false
45-
}
4643
companion object {
47-
// flag flipped to true whenever a safeRead*() method fails to serialize a field correctly
48-
private var serializationFailedFlag: Boolean = false
4944
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
50-
private fun safeReadMonitor(sin: StreamInput): Monitor =
51-
try {
52-
Monitor.readFrom(sin)!!
53-
} catch (e: Exception) {
54-
serializationFailedFlag = true
55-
log.error("Error parsing monitor in Doc level monitor fanout request", e)
56-
Monitor(
57-
"failed_serde", NO_VERSION, "failed_serde", true,
58-
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
59-
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
60-
DataSources(), false, false, "failed"
61-
)
62-
}
63-
64-
private fun safeReadBoolean(sin: StreamInput): Boolean =
65-
try {
66-
sin.readBoolean()
67-
} catch (e: Exception) {
68-
serializationFailedFlag = true
69-
log.error("Error parsing boolean in Doc level monitor fanout request", e)
70-
false
71-
}
72-
73-
private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata =
74-
try {
75-
MonitorMetadata.readFrom(sin)
76-
} catch (e: Exception) {
77-
serializationFailedFlag = true
78-
log.error("Error parsing monitor in Doc level monitor fanout request", e)
79-
MonitorMetadata(
80-
"failed_serde",
81-
SequenceNumbers.UNASSIGNED_SEQ_NO,
82-
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
83-
"failed_serde",
84-
emptyList(),
85-
emptyMap(),
86-
mutableMapOf()
87-
)
88-
}
89-
90-
private fun safeReadString(sin: StreamInput): String =
91-
try {
92-
sin.readString()
93-
} catch (e: Exception) {
94-
serializationFailedFlag = true
95-
log.error("Error parsing string in Doc level monitor fanout request", e)
96-
""
97-
}
98-
99-
private fun safeReadShardIds(sin: StreamInput): List<ShardId> =
100-
try {
101-
sin.readList(::ShardId)
102-
} catch (e: Exception) {
103-
serializationFailedFlag = true
104-
log.error("Error parsing shardId list in Doc level monitor fanout request", e)
105-
emptyList()
106-
}
107-
108-
private fun safeReadStringList(sin: StreamInput): List<String> =
109-
try {
110-
sin.readStringList()
111-
} catch (e: Exception) {
112-
serializationFailedFlag = true
113-
log.error("Error parsing string list in Doc level monitor fanout request", e)
114-
emptyList()
115-
}
116-
117-
private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? =
118-
try {
119-
if (sin.readBoolean()) WorkflowRunContext(sin) else null
120-
} catch (e: Exception) {
121-
serializationFailedFlag = true
122-
log.error("Error parsing workflow context in Doc level monitor fanout request", e)
123-
null
124-
}
125-
126-
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? {
127-
var indexExecutionContext: IndexExecutionContext? = null
128-
return try {
129-
indexExecutionContext = IndexExecutionContext(sin)
130-
while (sin.read() != 0) {
131-
serializationFailedFlag = true
132-
// read and discard bytes until stream is entirely consumed
133-
try {
134-
sin.readByte()
135-
} catch (_: EOFException) {
136-
}
137-
}
138-
return indexExecutionContext
139-
} catch (e: EOFException) {
140-
indexExecutionContext
141-
} catch (e: Exception) {
142-
serializationFailedFlag = true
143-
log.error("Error parsing index execution context in Doc level monitor fanout request", e)
144-
while (sin.read() != 0) {
145-
try { // read and throw bytes until stream is entirely consumed
146-
sin.readByte()
147-
} catch (_: EOFException) {
148-
}
149-
}
150-
null
151-
}
152-
}
153-
}
154-
155-
private constructor(
156-
monitor: Monitor,
157-
dryRun: Boolean,
158-
monitorMetadata: MonitorMetadata,
159-
executionId: String,
160-
indexExecutionContext: IndexExecutionContext?,
161-
shardIds: List<ShardId>,
162-
concreteIndicesSeenSoFar: List<String>,
163-
workflowRunContext: WorkflowRunContext?,
164-
hasSerializationFailed: Boolean
165-
) : super() {
166-
this.monitor = monitor
167-
this.dryRun = dryRun
168-
this.monitorMetadata = monitorMetadata
169-
this.executionId = executionId
170-
this.indexExecutionContext = indexExecutionContext
171-
this.shardIds = shardIds
172-
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
173-
this.workflowRunContext = workflowRunContext
174-
this.hasSerializationFailed = hasSerializationFailed ?: false
17545
}
17646

17747
constructor(
@@ -196,16 +66,63 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
19666
}
19767

19868
@Throws(IOException::class)
199-
constructor(sin: StreamInput) : this(
200-
monitor = safeReadMonitor(sin),
201-
dryRun = safeReadBoolean(sin),
202-
monitorMetadata = safeReadMonitorMetadata(sin),
203-
executionId = safeReadString(sin),
204-
shardIds = safeReadShardIds(sin),
205-
concreteIndicesSeenSoFar = safeReadStringList(sin),
206-
workflowRunContext = safeReadWorkflowRunContext(sin),
207-
indexExecutionContext = safeReadIndexExecutionContext(sin),
208-
hasSerializationFailed = serializationFailedFlag
69+
constructor(sin: StreamInput) : super() {
70+
var monitorSerializationSucceeded = true
71+
var parsedMonitor = getDummyMonitor()
72+
try {
73+
parsedMonitor = Monitor(sin)
74+
} catch (e: Exception) {
75+
log.error("Error parsing monitor in Doc level monitor fanout request", e)
76+
monitorSerializationSucceeded = false
77+
log.info("Force consuming stream in Doc level monitor fanout request")
78+
while (sin.read() != 0) {
79+
// read and discard bytes until stream is entirely consumed
80+
try {
81+
sin.readByte()
82+
} catch (_: EOFException) {
83+
}
84+
}
85+
}
86+
if (monitorSerializationSucceeded) {
87+
this.monitor = parsedMonitor
88+
this.dryRun = sin.readBoolean()
89+
this.monitorMetadata = MonitorMetadata.readFrom(sin)
90+
this.executionId = sin.readString()
91+
this.shardIds = sin.readList(::ShardId)
92+
this.concreteIndicesSeenSoFar = sin.readStringList()
93+
this.workflowRunContext = if (sin.readBoolean()) {
94+
WorkflowRunContext(sin)
95+
} else {
96+
null
97+
}
98+
indexExecutionContext = IndexExecutionContext(sin)
99+
this.hasSerializationFailed = false == monitorSerializationSucceeded
100+
} else {
101+
this.monitor = parsedMonitor
102+
this.dryRun = false
103+
this.monitorMetadata = MonitorMetadata(
104+
"failed_serde",
105+
SequenceNumbers.UNASSIGNED_SEQ_NO,
106+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
107+
"failed_serde",
108+
emptyList(),
109+
emptyMap(),
110+
mutableMapOf()
111+
)
112+
this.executionId = ""
113+
this.shardIds = emptyList()
114+
this.concreteIndicesSeenSoFar = emptyList()
115+
this.workflowRunContext = null
116+
this.indexExecutionContext = null
117+
this.hasSerializationFailed = false == monitorSerializationSucceeded
118+
}
119+
}
120+
121+
private fun getDummyMonitor() = Monitor(
122+
"failed_serde", NO_VERSION, "failed_serde", true,
123+
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
124+
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
125+
DataSources(), false, false, "failed"
209126
)
210127

211128
@Throws(IOException::class)

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
99
import org.junit.Assert.assertFalse
10-
import org.junit.Assert.assertNull
1110
import org.junit.Assert.assertTrue
1211
import org.junit.jupiter.api.Test
1312
import org.opensearch.common.io.stream.BytesStreamOutput
@@ -164,65 +163,6 @@ class DocLevelMonitorFanOutRequestTests {
164163
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
165164
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
166165

167-
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
168-
val monitor = randomDocumentLevelMonitor(
169-
inputs = listOf(docLevelInput),
170-
triggers = listOf(trigger),
171-
enabled = true,
172-
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
173-
)
174-
val monitorMetadata = MonitorMetadata(
175-
"test",
176-
SequenceNumbers.UNASSIGNED_SEQ_NO,
177-
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
178-
Monitor.NO_ID,
179-
listOf(ActionExecutionTime("", Instant.now())),
180-
mutableMapOf("index" to mutableMapOf("1" to "1")),
181-
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
182-
)
183-
val indexExecutionContext = IndexExecutionContext(
184-
listOf(docQuery),
185-
mutableMapOf("index" to mutableMapOf("1" to "1")),
186-
mutableMapOf("index" to mutableMapOf("1" to "1")),
187-
"test-index",
188-
"test-index",
189-
listOf("test-index"),
190-
listOf("test-index"),
191-
listOf("test-field"),
192-
listOf("1", "2")
193-
)
194-
val workflowRunContext = WorkflowRunContext(
195-
Workflow.NO_ID,
196-
Workflow.NO_ID,
197-
Monitor.NO_ID,
198-
mutableMapOf("index" to listOf("1")),
199-
true,
200-
listOf("finding1")
201-
)
202-
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
203-
monitor,
204-
false,
205-
monitorMetadata,
206-
UUID.randomUUID().toString(),
207-
indexExecutionContext,
208-
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
209-
listOf("test-index"),
210-
workflowRunContext
211-
)
212-
val out = BytesStreamOutput()
213-
monitor.writeTo(out)
214-
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
215-
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
216-
assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext)
217-
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
218-
assertEquals(sin.read(), -1)
219-
}
220-
221-
@Test
222-
fun `test doc level monitor fan out request as stream when there are additional bytes left to handle`() {
223-
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
224-
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
225-
226166
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
227167
val monitor = randomDocumentLevelMonitor(
228168
inputs = listOf(docLevelInput),
@@ -268,16 +208,10 @@ class DocLevelMonitorFanOutRequestTests {
268208
workflowRunContext
269209
)
270210
val out = BytesStreamOutput()
211+
out.writeString(UUID.randomUUID().toString())
271212
docLevelMonitorFanOutRequest.writeTo(out)
272-
out.writeByte(Byte.MIN_VALUE)
273213
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
274214
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
275-
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
276-
assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId)
277-
assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata)
278-
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
279-
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
280-
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
281215
assertEquals(sin.read(), 0)
282216
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
283217
}

0 commit comments

Comments
 (0)