@@ -91,7 +91,7 @@ class DocLevelMonitorFanOutRequestTests {
9191 assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
9292 assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
9393 assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
94- assertEquals(sin.read(), - 1 )
94+ assertEquals(sin.read(), 0 )
9595 assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
9696 }
9797
@@ -156,7 +156,7 @@ class DocLevelMonitorFanOutRequestTests {
156156 assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
157157 assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
158158 assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
159- assertEquals(sin.read(), - 1 )
159+ assertEquals(sin.read().toString(), sin.read(), 0 )
160160 }
161161
162162 @Test
@@ -217,4 +217,68 @@ class DocLevelMonitorFanOutRequestTests {
217217 assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
218218 assertEquals(sin.read(), - 1 )
219219 }
220+
221+ @Test
222+ fun `test doc level monitor fan out request as stream when there are additional bytes left to handle` () {
223+ val docQuery = DocLevelQuery (query = " test_field:\" us-west-2\" " , fields = listOf (), name = " 3" )
224+ val docLevelInput = DocLevelMonitorInput (" description" , listOf (" test-index" ), listOf (docQuery))
225+
226+ val trigger = randomDocumentLevelTrigger(condition = Script (" return true" ))
227+ val monitor = randomDocumentLevelMonitor(
228+ inputs = listOf (docLevelInput),
229+ triggers = listOf (trigger),
230+ enabled = true ,
231+ schedule = IntervalSchedule (1 , ChronoUnit .MINUTES )
232+ )
233+ val monitorMetadata = MonitorMetadata (
234+ " test" ,
235+ SequenceNumbers .UNASSIGNED_SEQ_NO ,
236+ SequenceNumbers .UNASSIGNED_PRIMARY_TERM ,
237+ Monitor .NO_ID ,
238+ listOf (ActionExecutionTime (" " , Instant .now())),
239+ mutableMapOf (" index" to mutableMapOf (" 1" to " 1" )),
240+ mutableMapOf (" test-index" to " .opensearch-sap-test_windows-queries-000001" )
241+ )
242+ val indexExecutionContext = IndexExecutionContext (
243+ listOf (docQuery),
244+ mutableMapOf (" index" to mutableMapOf (" 1" to " 1" )),
245+ mutableMapOf (" index" to mutableMapOf (" 1" to " 1" )),
246+ " test-index" ,
247+ " test-index" ,
248+ listOf (" test-index" ),
249+ listOf (" test-index" ),
250+ listOf (" test-field" ),
251+ listOf (" 1" , " 2" )
252+ )
253+ val workflowRunContext = WorkflowRunContext (
254+ Workflow .NO_ID ,
255+ Workflow .NO_ID ,
256+ Monitor .NO_ID ,
257+ mutableMapOf (" index" to listOf (" 1" )),
258+ true
259+ )
260+ val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest (
261+ monitor,
262+ false ,
263+ monitorMetadata,
264+ UUID .randomUUID().toString(),
265+ indexExecutionContext,
266+ listOf (ShardId (" test-index" , UUID .randomUUID().toString(), 0 )),
267+ listOf (" test-index" ),
268+ workflowRunContext
269+ )
270+ val out = BytesStreamOutput ()
271+ docLevelMonitorFanOutRequest.writeTo(out )
272+ out .writeByte(Byte .MIN_VALUE )
273+ val sin = StreamInput .wrap(out .bytes().toBytesRef().bytes)
274+ val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest (sin)
275+ assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
276+ assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId)
277+ assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata)
278+ assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
279+ assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
280+ assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
281+ assertEquals(sin.read(), 0 )
282+ assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
283+ }
220284}
0 commit comments