Skip to content

Commit 19a6af8

Browse files
committed
force consume stream in fan out request
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 4332937 commit 19a6af8

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.commons.alerting.action
77

8+
import org.apache.logging.log4j.LogManager
89
import org.opensearch.action.ActionRequest
910
import org.opensearch.action.ActionRequestValidationException
1011
import org.opensearch.commons.alerting.model.DataSources
@@ -22,11 +23,13 @@ import org.opensearch.core.xcontent.ToXContent
2223
import org.opensearch.core.xcontent.ToXContentObject
2324
import org.opensearch.core.xcontent.XContentBuilder
2425
import org.opensearch.index.seqno.SequenceNumbers
26+
import java.io.EOFException
2527
import java.io.IOException
2628
import java.time.Instant
2729
import java.time.temporal.ChronoUnit
2830

2931
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
32+
private val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
3033
val monitor: Monitor
3134
val dryRun: Boolean
3235
val monitorMetadata: MonitorMetadata
@@ -99,12 +102,29 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
99102
null
100103
}
101104

102-
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? =
103-
try {
104-
IndexExecutionContext(sin)
105+
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? {
106+
var indexExecutionContext: IndexExecutionContext? = null
107+
return try {
108+
indexExecutionContext = IndexExecutionContext(sin)
109+
while (sin.read() != -1) {
110+
try { // read and throw bytes until stream is entirely consumed
111+
sin.readByte()
112+
} catch (_: EOFException) {
113+
}
114+
}
115+
return indexExecutionContext
116+
} catch (e: EOFException) {
117+
indexExecutionContext
105118
} catch (e: Exception) {
119+
while (sin.read() != -1) {
120+
try { // read and throw bytes until stream is entirely consumed
121+
sin.readByte()
122+
} catch (_: EOFException) {
123+
}
124+
}
106125
null
107126
}
127+
}
108128
}
109129

110130
constructor(
@@ -151,6 +171,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
151171
out.writeBoolean(workflowRunContext != null)
152172
workflowRunContext?.writeTo(out)
153173
indexExecutionContext?.writeTo(out)
174+
indexExecutionContext?.writeTo(out)
154175
}
155176

156177
override fun validate(): ActionRequestValidationException? {

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
99
import org.junit.Assert.assertNull
10-
import org.junit.Assert.assertTrue
1110
import org.junit.jupiter.api.Test
1211
import org.opensearch.common.io.stream.BytesStreamOutput
1312
import org.opensearch.commons.alerting.model.ActionExecutionTime
@@ -82,6 +81,7 @@ class DocLevelMonitorFanOutRequestTests {
8281
)
8382
val out = BytesStreamOutput()
8483
docLevelMonitorFanOutRequest.writeTo(out)
84+
monitor.writeTo(out)
8585
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
8686
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
8787
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)

0 commit comments

Comments
 (0)