@@ -51,8 +51,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
5151 private currentClient : WebSocketEx | undefined ;
5252 private subscriptionNames = new Map < string , string > ( ) ;
5353 private queue = Promise . resolve ( ) ;
54- private mostRecentBatchTimestamp = new Date ( ) ;
55- private mostRecentACKTimestamp = new Date ( ) ;
54+ private mostRecentCompletedBatchTimestamp = new Date ( ) ;
55+ private mostRecentDispatchedBatchTimestamp = new Date ( ) ;
5656
5757 constructor (
5858 protected readonly logger : Logger ,
@@ -134,16 +134,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
134134
135135 // Record metrics
136136 this . metrics . setEventBatchSize ( batch . events . length ) ;
137- let timestamp = new Date ( ) ;
138- this . logger . log (
139- 'Recording batch interval of ' +
140- ( timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ) +
141- ' milliseconds' ,
142- ) ;
143- this . metrics . observeBatchInterval (
144- timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ,
145- ) ;
146- this . mostRecentBatchTimestamp = timestamp ;
137+ let batchIntervalMs = new Date ( ) . getTime ( ) - this . mostRecentCompletedBatchTimestamp . getTime ( ) ;
138+ this . logger . log ( `Recording batch interval of ${ batchIntervalMs } milliseconds` ) ;
139+ this . metrics . observeBatchInterval ( batchIntervalMs ) ;
147140
148141 const messages : WebSocketMessage [ ] = [ ] ;
149142 for ( const event of batch . events ) {
@@ -176,6 +169,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
176169 } ;
177170 this . awaitingAck . push ( message ) ;
178171 this . currentClient ?. send ( JSON . stringify ( message ) ) ;
172+
173+ // Set the most-recent batch dispatch time to now so when the next ACK comes back from FF
174+ // we can set metrics accordingly
175+ this . mostRecentDispatchedBatchTimestamp = new Date ( ) ;
179176 }
180177
181178 private async getSubscriptionName ( ctx : Context , subId : string ) {
@@ -210,16 +207,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
210207 return ;
211208 }
212209
213- let timestamp = new Date ( ) ;
214- this . logger . log (
215- 'Recording batch ACK interval of ' +
216- ( timestamp . getTime ( ) - this . mostRecentACKTimestamp . getTime ( ) ) +
217- ' milliseconds' ,
218- ) ;
219- this . metrics . observeBatchAckInterval (
220- timestamp . getTime ( ) - this . mostRecentACKTimestamp . getTime ( ) ,
221- ) ;
222- this . mostRecentACKTimestamp = timestamp ;
210+ let timeWaitingForACKms =
211+ new Date ( ) . getTime ( ) - this . mostRecentDispatchedBatchTimestamp . getTime ( ) ;
212+ this . logger . log ( `Recording batch ACK interval of ${ timeWaitingForACKms } milliseconds` ) ;
213+ this . metrics . observeBatchAckInterval ( timeWaitingForACKms ) ;
223214
224215 const inflight = this . awaitingAck . find ( msg => msg . id === data . id ) ;
225216 this . logger . log ( `Received ack ${ data . id } inflight=${ ! ! inflight } ` ) ;
@@ -237,5 +228,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
237228 this . socket . ack ( inflight . batchNumber ) ;
238229 }
239230 }
231+
232+ // Set the most-recent batch time to now - so when the next batch comes we can calculate
233+ // time between sending our ACK to the current batch and receiving the new one
234+ this . mostRecentCompletedBatchTimestamp = new Date ( ) ;
240235 }
241236}
0 commit comments