2626import org .apache .logging .log4j .Logger ;
2727import org .hiero .block .api .BlockStreamPublishServiceInterface .BlockStreamPublishServiceClient ;
2828import org .hiero .block .api .PublishStreamRequest ;
29+ import org .hiero .block .api .PublishStreamRequest .EndStream ;
2930import org .hiero .block .api .PublishStreamResponse ;
3031import org .hiero .block .api .PublishStreamResponse .BlockAcknowledgement ;
3132import org .hiero .block .api .PublishStreamResponse .EndOfStream ;
@@ -56,7 +57,7 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
5657 /**
5758 * A longer retry delay for when the connection encounters an error.
5859 */
59- public static final Duration LONGER_RETRY_DELAY = Duration .ofSeconds (30 );
60+ public static final Duration THIRTY_SECONDS = Duration .ofSeconds (30 );
6061 /**
6162 * The configuration specific to the block node this connection is for.
6263 */
@@ -278,53 +279,50 @@ private void cancelStreamReset() {
278279 *
279280 * @param delay the delay before attempting to reconnect
280281 */
281- private void closeAndReschedule (@ NonNull final Duration delay , final boolean callOnComplete ) {
282- requireNonNull (delay , "delay must not be null" );
282+ private void closeAndReschedule (@ Nullable final Duration delay , final boolean callOnComplete ) {
283283 close (callOnComplete );
284- blockNodeConnectionManager .rescheduleConnection (this , delay );
284+ blockNodeConnectionManager .rescheduleConnection (this , delay , null );
285285 }
286286
287287 /**
288- * Ends the stream with the specified code and reschedules with the specified delay.
289- * This method sends an end stream message before cleanup and retry logic.
288+ * Ends the stream with the specified code and reschedules with a longer retry delay. This method sends an end stream
289+ * message before cleanup and retry logic.
290290 *
291291 * @param code the code indicating why the stream was ended
292- * @param delay the delay before attempting to reconnect
293292 */
294- private void endStreamAndReschedule (
295- @ NonNull final PublishStreamRequest .EndStream .Code code , @ NonNull final Duration delay ) {
293+ private void endStreamAndReschedule (@ NonNull final EndStream .Code code ) {
296294 requireNonNull (code , "code must not be null" );
297- requireNonNull (delay , "delay must not be null" );
298-
299295 endTheStreamWith (code );
300- blockNodeConnectionManager .rescheduleConnection (this , delay );
296+ blockNodeConnectionManager .rescheduleConnection (this , BlockNodeConnection . THIRTY_SECONDS , null );
301297 }
302298
303299 /**
304- * Closes the connection and restarts the stream at the specified block number.
305- * This method ensures proper cleanup and restart logic for immediate retries.
300+ * Closes the connection and restarts the stream at the specified block number. This method ensures proper cleanup
301+ * and restart logic for immediate retries.
306302 *
307303 * @param blockNumber the block number to restart at
308304 */
309- private void closeAndRestart (final long blockNumber , final boolean callOnComplete ) {
310- close (callOnComplete );
311- blockNodeConnectionManager .restartConnection (this , blockNumber );
305+ private void closeAndRestart (final long blockNumber ) {
306+ close (true );
307+ blockNodeConnectionManager .rescheduleConnection (this , null , blockNumber );
312308 }
313309
314310 /**
315311 * Handles the failure of the stream by closing the connection,
316312 * notifying the connection manager and calling onComplete on the request pipeline.
317313 */
318314 public void handleStreamFailure () {
319- closeAndReschedule (LONGER_RETRY_DELAY , true );
315+ logger .debug ("[{}] handleStreamFailure" , this );
316+ closeAndReschedule (THIRTY_SECONDS , true );
320317 }
321318
322319 /**
323320 * Handles the failure of the stream by closing the connection,
324321 * notifying the connection manager without calling onComplete on the request pipeline.
325322 */
326323 public void handleStreamFailureWithoutOnComplete () {
327- closeAndReschedule (LONGER_RETRY_DELAY , false );
324+ logger .debug ("[{}] handleStreamFailureWithoutOnComplete" , this );
325+ closeAndReschedule (THIRTY_SECONDS , false );
328326 }
329327
330328 /**
@@ -416,7 +414,7 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) {
416414 this ,
417415 blockNumber );
418416
419- closeAndReschedule (LONGER_RETRY_DELAY , true );
417+ closeAndReschedule (THIRTY_SECONDS , true );
420418 }
421419 case Code .TIMEOUT , Code .DUPLICATE_BLOCK , Code .BAD_BLOCK_PROOF , Code .INVALID_REQUEST -> {
422420 // We should restart the stream at the block immediately
@@ -428,13 +426,13 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) {
428426 this ,
429427 restartBlockNumber );
430428
431- closeAndRestart (restartBlockNumber , true );
429+ closeAndRestart (restartBlockNumber );
432430 }
433431 case Code .SUCCESS -> {
434432 // The block node orderly ended the stream. In this case, no errors occurred.
435433 // We should wait for a longer period before attempting to retry.
436434 logger .debug ("[{}] Block node orderly ended the stream at block {}" , this , blockNumber );
437- closeAndReschedule (LONGER_RETRY_DELAY , true );
435+ closeAndReschedule (THIRTY_SECONDS , true );
438436 }
439437 case Code .BEHIND -> {
440438 // The block node is behind us, check if we have the last verified block still available in order to
@@ -446,21 +444,21 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) {
446444 this ,
447445 restartBlockNumber );
448446
449- closeAndRestart (restartBlockNumber , true );
447+ closeAndRestart (restartBlockNumber );
450448 } else {
451449 // If we don't have the block state, we schedule retry for this connection and establish new one
452450 // with different block node
453451 logger .debug ("[{}] Block node is behind and block state is not available." , this );
454452
455453 // Indicate that the block node should recover and catch up from another trustworthy block node
456- endStreamAndReschedule (TOO_FAR_BEHIND , LONGER_RETRY_DELAY );
454+ endStreamAndReschedule (TOO_FAR_BEHIND );
457455 }
458456 }
459457 case Code .UNKNOWN -> {
460458 // This should never happen, but if it does, schedule this connection for a retry attempt
461459 // and in the meantime select a new node to stream to
462460 logger .error ("[{}] Block node reported an unknown error at block {}." , this , blockNumber );
463- closeAndReschedule (LONGER_RETRY_DELAY , true );
461+ closeAndReschedule (THIRTY_SECONDS , true );
464462 }
465463 }
466464 }
@@ -510,7 +508,7 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
510508 + "consensus node. Closing connection and will retry later" ,
511509 this ,
512510 resendBlockNumber );
513- closeAndReschedule (LONGER_RETRY_DELAY , true );
511+ closeAndReschedule (THIRTY_SECONDS , true );
514512 }
515513 }
516514
@@ -531,6 +529,12 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
531529 .latestBlockNumber (highestAckedBlockNumber ))
532530 .build ();
533531
532+ logger .debug (
533+ "[{}] Sending EndStream request with code {} (earliestBlockNumber={}, highestAckedBlockNumber={})" ,
534+ this ,
535+ code ,
536+ earliestBlockNumber ,
537+ highestAckedBlockNumber );
534538 sendRequest (endStream );
535539 close (true );
536540 }
0 commit comments