Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.logging.log4j.Logger;
import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient;
import org.hiero.block.api.PublishStreamRequest;
import org.hiero.block.api.PublishStreamRequest.EndStream;
import org.hiero.block.api.PublishStreamResponse;
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
Expand Down Expand Up @@ -56,7 +57,7 @@
/**
* A longer retry delay for when the connection encounters an error.
*/
public static final Duration LONGER_RETRY_DELAY = Duration.ofSeconds(30);
public static final Duration THIRTY_SECONDS = Duration.ofSeconds(30);
/**
* The configuration specific to the block node this connection is for.
*/
Expand Down Expand Up @@ -278,53 +279,50 @@
*
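* @param delay the delay before attempting to reconnect
* @param callOnComplete whether to call onComplete on the request pipeline when closing the connection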
*/
private void closeAndReschedule(@NonNull final Duration delay, final boolean callOnComplete) {
requireNonNull(delay, "delay must not be null");
private void closeAndReschedule(@Nullable final Duration delay, final boolean callOnComplete) {
close(callOnComplete);
blockNodeConnectionManager.rescheduleConnection(this, delay);
blockNodeConnectionManager.rescheduleConnection(this, delay, null);
}

/**
* Ends the stream with the specified code and reschedules with the specified delay.
* This method sends an end stream message before cleanup and retry logic.
* Ends the stream with the specified code and reschedules with a longer retry delay. This method sends an end stream
* message before cleanup and retry logic.
*
* @param code the code indicating why the stream was ended
* @param delay the delay before attempting to reconnect
*/
private void endStreamAndReschedule(
@NonNull final PublishStreamRequest.EndStream.Code code, @NonNull final Duration delay) {
private void endStreamAndReschedule(@NonNull final EndStream.Code code) {
requireNonNull(code, "code must not be null");
requireNonNull(delay, "delay must not be null");

endTheStreamWith(code);
blockNodeConnectionManager.rescheduleConnection(this, delay);
blockNodeConnectionManager.rescheduleConnection(this, BlockNodeConnection.THIRTY_SECONDS, null);
}

/**
* Closes the connection and restarts the stream at the specified block number.
* This method ensures proper cleanup and restart logic for immediate retries.
* Closes the connection and restarts the stream at the specified block number. This method ensures proper cleanup
* and restart logic for immediate retries.
*
* @param blockNumber the block number to restart at
*/
private void closeAndRestart(final long blockNumber, final boolean callOnComplete) {
close(callOnComplete);
blockNodeConnectionManager.restartConnection(this, blockNumber);
private void closeAndRestart(final long blockNumber) {
close(true);
blockNodeConnectionManager.rescheduleConnection(this, null, blockNumber);
}

/**
* Handles the failure of the stream by closing the connection,
* notifying the connection manager and calling onComplete on the request pipeline.
*/
public void handleStreamFailure() {
closeAndReschedule(LONGER_RETRY_DELAY, true);
logger.debug("[{}] handleStreamFailure", this);
closeAndReschedule(THIRTY_SECONDS, true);
}

/**
* Handles the failure of the stream by closing the connection,
* notifying the connection manager without calling onComplete on the request pipeline.
*/
public void handleStreamFailureWithoutOnComplete() {
closeAndReschedule(LONGER_RETRY_DELAY, false);
logger.debug("[{}] handleStreamFailureWithoutOnComplete", this);
closeAndReschedule(THIRTY_SECONDS, false);

Check warning on line 325 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java#L324-L325

Added lines #L324 - L325 were not covered by tests
}

/**
Expand Down Expand Up @@ -416,7 +414,7 @@
this,
blockNumber);

closeAndReschedule(LONGER_RETRY_DELAY, true);
closeAndReschedule(THIRTY_SECONDS, true);
}
case Code.TIMEOUT, Code.DUPLICATE_BLOCK, Code.BAD_BLOCK_PROOF, Code.INVALID_REQUEST -> {
// We should restart the stream at the block immediately
Expand All @@ -428,13 +426,13 @@
this,
restartBlockNumber);

closeAndRestart(restartBlockNumber, true);
closeAndRestart(restartBlockNumber);
}
case Code.SUCCESS -> {
// The block node orderly ended the stream. In this case, no errors occurred.
// We should wait for a longer period before attempting to retry.
logger.debug("[{}] Block node orderly ended the stream at block {}", this, blockNumber);
closeAndReschedule(LONGER_RETRY_DELAY, true);
closeAndReschedule(THIRTY_SECONDS, true);
}
case Code.BEHIND -> {
// The block node is behind us, check if we have the last verified block still available in order to
Expand All @@ -446,21 +444,21 @@
this,
restartBlockNumber);

closeAndRestart(restartBlockNumber, true);
closeAndRestart(restartBlockNumber);
} else {
// If we don't have the block state, we schedule a retry for this connection and establish a new one
// with a different block node
logger.debug("[{}] Block node is behind and block state is not available.", this);

// Indicate that the block node should recover and catch up from another trustworthy block node
endStreamAndReschedule(TOO_FAR_BEHIND, LONGER_RETRY_DELAY);
endStreamAndReschedule(TOO_FAR_BEHIND);
}
}
case Code.UNKNOWN -> {
// This should never happen, but if it does, schedule this connection for a retry attempt
// and in the meantime select a new node to stream to
logger.error("[{}] Block node reported an unknown error at block {}.", this, blockNumber);
closeAndReschedule(LONGER_RETRY_DELAY, true);
closeAndReschedule(THIRTY_SECONDS, true);
}
}
}
Expand Down Expand Up @@ -510,7 +508,7 @@
+ "consensus node. Closing connection and will retry later",
this,
resendBlockNumber);
closeAndReschedule(LONGER_RETRY_DELAY, true);
closeAndReschedule(THIRTY_SECONDS, true);
}
}

Expand All @@ -531,6 +529,12 @@
.latestBlockNumber(highestAckedBlockNumber))
.build();

logger.debug(
"[{}] Sending EndStream request with code {} (earliestBlockNumber={}, highestAckedBlockNumber={})",
this,
code,
earliestBlockNumber,
highestAckedBlockNumber);
sendRequest(endStream);
close(true);
}
Expand Down
Loading
Loading