Skip to content

Commit d30dac5

Browse files
committed
adds flag for capturing serialization failure in Doc level monitor fan out request
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 8fc053b commit d30dac5

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,17 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
3838
val shardIds: List<ShardId>
3939
val concreteIndicesSeenSoFar: List<String>
4040
val workflowRunContext: WorkflowRunContext?
41+
val hasSerializationFailed: Boolean
4142

4243
companion object {
44+
// flag flipped to true whenever a safeRead*() method fails to serialize a field correctly
45+
private var serializationFailedFlag: Boolean = false
4346
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
4447
private fun safeReadMonitor(sin: StreamInput): Monitor =
4548
try {
4649
Monitor.readFrom(sin)!!
4750
} catch (e: Exception) {
51+
serializationFailedFlag = true
4852
log.error("Error parsing monitor in Doc level monitor fanout request", e)
4953
Monitor(
5054
"failed_serde", NO_VERSION, "failed_serde", true,
@@ -58,6 +62,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
5862
try {
5963
sin.readBoolean()
6064
} catch (e: Exception) {
65+
serializationFailedFlag = true
6166
log.error("Error parsing boolean in Doc level monitor fanout request", e)
6267
false
6368
}
@@ -66,6 +71,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
6671
try {
6772
MonitorMetadata.readFrom(sin)
6873
} catch (e: Exception) {
74+
serializationFailedFlag = true
6975
log.error("Error parsing monitor in Doc level monitor fanout request", e)
7076
MonitorMetadata(
7177
"failed_serde",
@@ -82,6 +88,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
8288
try {
8389
sin.readString()
8490
} catch (e: Exception) {
91+
serializationFailedFlag = true
8592
log.error("Error parsing string in Doc level monitor fanout request", e)
8693
""
8794
}
@@ -90,6 +97,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
9097
try {
9198
sin.readList(::ShardId)
9299
} catch (e: Exception) {
100+
serializationFailedFlag = true
93101
log.error("Error parsing shardId list in Doc level monitor fanout request", e)
94102
listOf(ShardId("failed_serde", "failed_serde", 999999))
95103
}
@@ -98,6 +106,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
98106
try {
99107
sin.readStringList()
100108
} catch (e: Exception) {
109+
serializationFailedFlag = true
101110
log.error("Error parsing string list in Doc level monitor fanout request", e)
102111
emptyList()
103112
}
@@ -106,6 +115,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
106115
try {
107116
if (sin.readBoolean()) WorkflowRunContext(sin) else null
108117
} catch (e: Exception) {
118+
serializationFailedFlag = true
109119
log.error("Error parsing workflow context in Doc level monitor fanout request", e)
110120
null
111121
}
@@ -125,6 +135,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
125135
} catch (e: EOFException) {
126136
indexExecutionContext
127137
} catch (e: Exception) {
138+
serializationFailedFlag = true
128139
log.error("Error parsing index execution context in Doc level monitor fanout request", e)
129140
while (sin.read() != -1) {
130141
try { // read and throw bytes until stream is entirely consumed
@@ -145,7 +156,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
145156
indexExecutionContext: IndexExecutionContext?,
146157
shardIds: List<ShardId>,
147158
concreteIndicesSeenSoFar: List<String>,
148-
workflowRunContext: WorkflowRunContext?
159+
workflowRunContext: WorkflowRunContext?,
160+
hasSerializationFailed: Boolean? = null
149161
) : super() {
150162
this.monitor = monitor
151163
this.dryRun = dryRun
@@ -156,6 +168,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
156168
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
157169
this.workflowRunContext = workflowRunContext
158170
require(false == shardIds.isEmpty()) { }
171+
this.hasSerializationFailed = hasSerializationFailed ?: false
159172
}
160173

161174
@Throws(IOException::class)
@@ -167,7 +180,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
167180
shardIds = safeReadShardIds(sin),
168181
concreteIndicesSeenSoFar = safeReadStringList(sin),
169182
workflowRunContext = safeReadWorkflowRunContext(sin),
170-
indexExecutionContext = safeReadIndexExecutionContext(sin)
183+
indexExecutionContext = safeReadIndexExecutionContext(sin),
184+
hasSerializationFailed = serializationFailedFlag
171185
)
172186

173187
@Throws(IOException::class)

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
9+
import org.junit.Assert.assertFalse
910
import org.junit.Assert.assertNull
11+
import org.junit.Assert.assertTrue
1012
import org.junit.jupiter.api.Test
1113
import org.opensearch.common.io.stream.BytesStreamOutput
1214
import org.opensearch.commons.alerting.model.ActionExecutionTime
@@ -81,7 +83,6 @@ class DocLevelMonitorFanOutRequestTests {
8183
)
8284
val out = BytesStreamOutput()
8385
docLevelMonitorFanOutRequest.writeTo(out)
84-
monitor.writeTo(out)
8586
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
8687
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
8788
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
@@ -91,6 +92,7 @@ class DocLevelMonitorFanOutRequestTests {
9192
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
9293
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
9394
assertEquals(sin.read(), -1)
95+
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
9496
}
9597

9698
@Test
@@ -153,6 +155,8 @@ class DocLevelMonitorFanOutRequestTests {
153155
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
154156
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
155157
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
158+
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
159+
assertEquals(sin.read(), -1)
156160
}
157161

158162
@Test
@@ -210,5 +214,7 @@ class DocLevelMonitorFanOutRequestTests {
210214
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
211215
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
212216
assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext)
217+
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
218+
assertEquals(sin.read(), -1)
213219
}
214220
}

0 commit comments

Comments
 (0)