From d585ca4d1bfa5c0ebfdb0bb732b7ee530ba6fda4 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 25 Sep 2025 11:58:12 -0500 Subject: [PATCH 1/6] chore: improve handling of concurrent BN connection close operations Signed-off-by: Tim Farber-Newman --- .gitignore | 3 + .../impl/streaming/BlockNodeConnection.java | 104 ++++++++++++------ .../streaming/BlockNodeConnectionManager.java | 42 ++++--- .../streaming/BlockNodeConnectionTest.java | 69 ++++++++++++ .../simulator/SimulatedBlockNodeServer.java | 2 + 5 files changed, 166 insertions(+), 54 deletions(-) diff --git a/.gitignore b/.gitignore index 4a4ae44edfab..2799257c4a62 100644 --- a/.gitignore +++ b/.gitignore @@ -1086,3 +1086,6 @@ hedera-state-validator/validator.log snyk-code.html snyk-code.json snyk/ + +### Profiler config +.profileconfig.json \ No newline at end of file diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index d2f41c7109c2..5c41e630fe79 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -9,6 +9,7 @@ import com.hedera.node.config.ConfigProvider; import com.hedera.node.config.data.BlockNodeConnectionConfig; import com.hedera.node.internal.network.BlockNodeConfig; +import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.Pipeline; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Duration; @@ -18,6 +19,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,6 +48,10 @@ public class BlockNodeConnection implements Pipeline { private static final Logger logger = LogManager.getLogger(BlockNodeConnection.class); + /** + * Counter used to get unique identities for connection instances. + */ + private static final AtomicLong connectionIdCounter = new AtomicLong(0); /** * A longer retry delay for when the connection encounters an error. */ @@ -80,7 +86,7 @@ public class BlockNodeConnection implements Pipeline { */ private final BlockStreamPublishServiceClient blockStreamPublishServiceClient; - private Pipeline requestPipeline; + private final AtomicReference> requestPipelineRef = new AtomicReference<>(); /** * Reference to the current state of this connection. */ @@ -95,6 +101,10 @@ public class BlockNodeConnection implements Pipeline { * When the connection is closed or reset, this task is cancelled. */ private ScheduledFuture streamResetTask; + /** + * The unique ID of this connection instance. + */ + private final String connectionId; /** * Represents the possible states of a Block Node connection. @@ -112,6 +122,7 @@ public enum ConnectionState { * Connection is active. Block Stream Worker Thread is sending PublishStreamRequest's to the block node through async bidi stream. */ ACTIVE, + CLOSING, /** * Connection has been closed and pipeline terminated. This is a terminal state. * No more requests can be sent and no more responses will be received. @@ -150,14 +161,17 @@ public BlockNodeConnection( final var blockNodeConnectionConfig = configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class); this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod(); + connectionId = String.format("%04d", connectionIdCounter.incrementAndGet()); } /** * Creates a new bidi request pipeline for this block node connection. */ - public void createRequestPipeline() { - if (requestPipeline == null) { - requestPipeline = blockStreamPublishServiceClient.publishBlockStream(this); + public synchronized void createRequestPipeline() { + if (requestPipelineRef.get() == null) { + final Pipeline pipeline = + blockStreamPublishServiceClient.publishBlockStream(this); + requestPipelineRef.set(pipeline); updateConnectionState(ConnectionState.PENDING); } } @@ -255,7 +269,6 @@ private void closeAndRestart(final long blockNumber, final boolean callOnComplet * notifying the connection manager and calling onComplete on the request pipeline. */ public void handleStreamFailure() { - logger.debug("[{}] handleStreamFailure", this); closeAndReschedule(LONGER_RETRY_DELAY, true); } @@ -264,7 +277,6 @@ public void handleStreamFailure() { * notifying the connection manager without calling onComplete on the request pipeline. */ public void handleStreamFailureWithoutOnComplete() { - logger.debug("[{}] handleStreamFailureWithoutOnComplete", this); closeAndReschedule(LONGER_RETRY_DELAY, false); } @@ -285,7 +297,7 @@ private void handleAcknowledgement(@NonNull final BlockAcknowledgement acknowled * Acknowledges the blocks up to the specified block number. * @param acknowledgedBlockNumber the block number that has been known to be persisted and verified by the block node */ - private void acknowledgeBlocks(long acknowledgedBlockNumber, boolean maybeJumpToBlock) { + private void acknowledgeBlocks(final long acknowledgedBlockNumber, final boolean maybeJumpToBlock) { final long currentBlockStreaming = blockNodeConnectionManager.currentStreamingBlockNumber(); final long currentBlockProducing = blockBufferService.getLastBlockNumberProduced(); @@ -459,7 +471,7 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) { * * @param code the code on why stream was ended */ - public void endTheStreamWith(PublishStreamRequest.EndStream.Code code) { + public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) { final var earliestBlockNumber = blockBufferService.getEarliestAvailableBlockNumber(); final var highestAckedBlockNumber = blockBufferService.getHighestAckedBlockNumber(); @@ -483,8 +495,22 @@ public void endTheStreamWith(PublishStreamRequest.EndStream.Code code) { public void sendRequest(@NonNull final PublishStreamRequest request) { requireNonNull(request, "request must not be null"); - if (getConnectionState() == ConnectionState.ACTIVE && requestPipeline != null) { - requestPipeline.onNext(request); + final Pipeline pipeline = requestPipelineRef.get(); + if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) { + try { + pipeline.onNext(request); + } catch (final RuntimeException e) { + /* + There is a possible, and somewhat expected, race condition when one thread is attempting to close this + connection while a request is being sent on another thread. Because of this, an exception may get thrown + but depending on the state of the connection it may be expected. Thus, if we do get an exception we only + want to propagate it if the connection is still in an ACTIVE state. If we receive an error while the + connection is in another state (e.g. CLOSING) then we want to ignore the error. + */ + if (getConnectionState() == ConnectionState.ACTIVE) { + throw e; + } + } } } @@ -494,8 +520,14 @@ public void sendRequest(@NonNull final PublishStreamRequest request) { * @param callOnComplete whether to call onComplete on the request pipeline */ public void close(final boolean callOnComplete) { - if (getConnectionState() == ConnectionState.CLOSED) { - logger.debug("[{}] Connection already closed.", this); + final ConnectionState connState = connectionState.get(); + if (connState == ConnectionState.CLOSING || connState == ConnectionState.CLOSED) { + logger.debug("[{}] Connection already in terminal state ({})", this, connState); + return; + } + + if (!connectionState.compareAndSet(connState, ConnectionState.CLOSING)) { + logger.debug("[{}] State changed while trying to close connection; aborting close attempt", this); return; } @@ -503,33 +535,40 @@ public void close(final boolean callOnComplete) { logger.debug("[{}] Closing connection...", this); closePipeline(callOnComplete); - updateConnectionState(ConnectionState.CLOSED); jumpToBlock(-1L); logger.debug("[{}] Connection successfully closed", this); } catch (final RuntimeException e) { logger.warn("[{}] Error occurred while attempting to close connection", this); + } finally { + // regardless of outcome, mark the connection as closed + updateConnectionState(ConnectionState.CLOSED); } } private void closePipeline(final boolean callOnComplete) { - if (requestPipeline != null) { + final Pipeline pipeline = requestPipelineRef.get(); + + if (pipeline != null) { logger.debug("[{}] Closing request pipeline for block node", this); streamShutdownInProgress.set(true); try { - if (getConnectionState() == ConnectionState.ACTIVE && callOnComplete) { - requestPipeline.onComplete(); + final ConnectionState state = connectionState.get(); + if (state == ConnectionState.CLOSING && callOnComplete) { + pipeline.onComplete(); logger.debug("[{}] Request pipeline successfully closed", this); } else { - logger.debug("[{}] Request pipeline closed without onComplete - connection not active", this); + logger.debug( + "[{}] Attempted to close pipeline, but connection state is not CLOSING (actual={})", + this, + state); } } catch (final Exception e) { logger.warn("[{}] Error while completing request pipeline", this, e); } // Clear the pipeline reference to prevent further use - // pipelineLock ensures no thread is mid-call to requestPipeline.onNext() - requestPipeline = null; + requestPipelineRef.compareAndSet(pipeline, null); } } @@ -542,18 +581,6 @@ public BlockNodeConfig getNodeConfig() { return blockNodeConfig; } - /** - * Restarts a new stream at a specified block number. - * This method will establish a new stream and start processing from the specified block number. - * - * @param blockNumber the block number to restart at - */ - private void restartStreamAtBlock(final long blockNumber) { - logger.debug("[{}] Scheduling stream restart at block {}", this, blockNumber); - blockNodeConnectionManager.scheduleConnectionAttempt( - this, BlockNodeConnectionManager.INITIAL_RETRY_DELAY, blockNumber); - } - /** * Restarts the worker thread at a specific block number without ending the stream. * This method will interrupt the current worker thread if it exists, @@ -569,7 +596,7 @@ private void jumpToBlock(final long blockNumber) { } @Override - public void onSubscribe(Flow.Subscription subscription) { + public void onSubscribe(final Flow.Subscription subscription) { logger.debug("[{}] onSubscribe", this); subscription.request(Long.MAX_VALUE); } @@ -622,7 +649,11 @@ public void onNext(final @NonNull PublishStreamResponse response) { */ @Override public void onError(final Throwable error) { - logger.debug("[{}] onError invoked {}", this, error.getMessage()); + if (error instanceof final GrpcException grpcException) { + logger.debug("[{}] Error received (grpcStatus={})", this, grpcException.status(), grpcException); + } else { + logger.debug("[{}] Error received", this, error); + } // Check if already in terminal state if (getConnectionState() == ConnectionState.CLOSED) { @@ -665,7 +696,8 @@ public ConnectionState getConnectionState() { @Override public String toString() { - return blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/" + getConnectionState(); + return connectionId + "/" + blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/" + + getConnectionState(); } @Override @@ -674,11 +706,11 @@ public boolean equals(final Object o) { return false; } final BlockNodeConnection that = (BlockNodeConnection) o; - return Objects.equals(blockNodeConfig, that.blockNodeConfig); + return connectionId == that.connectionId && Objects.equals(blockNodeConfig, that.blockNodeConfig); } @Override public int hashCode() { - return Objects.hash(blockNodeConfig); + return Objects.hash(blockNodeConfig, connectionId); } } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 37e414e17d5f..df7981f6ae9b 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -390,7 +390,7 @@ public void connectionResetsTheStream(@NonNull final BlockNodeConnection connect */ private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) { requireNonNull(connection); - connections.remove(connection.getNodeConfig()); + connections.remove(connection.getNodeConfig(), connection); activeConnectionRef.compareAndSet(connection, null); } @@ -672,33 +672,40 @@ public void updateLastVerifiedBlock(@NonNull final BlockNodeConfig blockNodeConf blockBufferService.setLatestAcknowledgedBlock(blockNumber); } + private void sleep(final Duration duration) { + try { + Thread.sleep(duration); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + private void blockStreamWorkerLoop() { while (isConnectionManagerActive.get()) { + // use the same connection for all operations per iteration + final BlockNodeConnection connection = activeConnectionRef.get(); + + if (connection == null) { + sleep(workerLoopSleepDuration()); + continue; + } + try { // If signaled to jump to a specific block, do so jumpToBlockIfNeeded(); - final boolean shouldSleep = processStreamingToBlockNode(); + final boolean shouldSleep = processStreamingToBlockNode(connection); // Sleep for a short duration to avoid busy waiting if (shouldSleep) { - Thread.sleep(workerLoopSleepDuration()); + sleep(workerLoopSleepDuration()); } - } catch (final InterruptedException e) { - logger.error("Block stream worker interrupted", e); - Thread.currentThread().interrupt(); } catch (final UncheckedIOException e) { - logger.debug("UncheckedIOException caught in block stream worker loop {}", e.getMessage()); - final BlockNodeConnection activeConnection = activeConnectionRef.get(); - if (activeConnection != null) { - activeConnection.handleStreamFailureWithoutOnComplete(); - } + logger.debug("UncheckedIOException caught in block stream worker loop", e); + connection.handleStreamFailureWithoutOnComplete(); } catch (final Exception e) { - logger.debug("Exception caught in block stream worker loop {}", e.getMessage()); - final BlockNodeConnection activeConnection = activeConnectionRef.get(); - if (activeConnection != null) { - activeConnection.handleStreamFailure(); - } + logger.debug("Exception caught in block stream worker loop", e); + connection.handleStreamFailure(); } } } @@ -709,8 +716,7 @@ private void blockStreamWorkerLoop() { * @return true if the worker thread should sleep because of a lack of work to do, else false (the worker thread * should NOT sleep) */ - private boolean processStreamingToBlockNode() { - final BlockNodeConnection connection = activeConnectionRef.get(); + private boolean processStreamingToBlockNode(final BlockNodeConnection connection) { if (connection == null) { return true; } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 98a333d2fdc5..5aee57c1a569 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -2,11 +2,15 @@ package com.hedera.node.app.blocks.impl.streaming; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchRuntimeException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -26,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient; import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamRequest.EndStream; @@ -44,12 +49,15 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private static final long ONCE_PER_DAY_MILLIS = Duration.ofHours(24).toMillis(); private static final VarHandle isStreamingEnabledHandle; + private static final VarHandle connectionStateHandle; static { try { final Lookup lookup = MethodHandles.lookup(); isStreamingEnabledHandle = MethodHandles.privateLookupIn(BlockNodeConnectionManager.class, lookup) .findVarHandle(BlockNodeConnectionManager.class, "isStreamingEnabled", AtomicBoolean.class); + connectionStateHandle = MethodHandles.privateLookupIn(BlockNodeConnection.class, lookup) + .findVarHandle(BlockNodeConnection.class, "connectionState", AtomicReference.class); } catch (final Exception e) { throw new RuntimeException(e); } @@ -493,6 +501,34 @@ void testSendRequest_observerNull() { verifyNoInteractions(bufferService); } + @Test + void testSendRequest_errorWhileActive() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + doThrow(new RuntimeException("kaboom!")).when(requestPipeline).onNext(any()); + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + final RuntimeException e = catchRuntimeException(() -> connection.sendRequest(request)); + assertThat(e).isInstanceOf(RuntimeException.class).hasMessage("kaboom!"); + } + + @Test + void testSendRequest_errorWhileNotActive() { + openConnectionAndResetMocks(); + doThrow(new RuntimeException("kaboom!")).when(requestPipeline).onNext(any()); + + final BlockNodeConnection spiedConnection = spy(connection); + doReturn(ConnectionState.ACTIVE, ConnectionState.CLOSING) + .when(spiedConnection) + .getConnectionState(); + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + spiedConnection.sendRequest(request); + + verify(requestPipeline).onNext(any()); + verify(spiedConnection, times(2)).getConnectionState(); + } + @Test void testClose() { openConnectionAndResetMocks(); @@ -543,6 +579,32 @@ void testClose_failure() { verifyNoInteractions(bufferService); } + @Test + void testClose_alreadyClosed() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.CLOSED); + + connection.close(true); + + verifyNoInteractions(connectionManager); + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(bufferService); + } + + @Test + void testClose_alreadyClosing() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.CLOSING); + + connection.close(true); + + verifyNoInteractions(connectionManager); + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(bufferService); + } + @Test void testOnError() { openConnectionAndResetMocks(); @@ -609,4 +671,11 @@ private void resetMocks() { private AtomicBoolean isStreamingEnabled() { return (AtomicBoolean) isStreamingEnabledHandle.get(connectionManager); } + + @SuppressWarnings("unchecked") + private AtomicReference connectionState() { + return (AtomicReference) connectionStateHandle.get(connection); + } + + private static class TracedAtomicBoolean extends AtomicBoolean {} } diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java index 8992aca345e1..c1aaf335ca95 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java @@ -355,6 +355,8 @@ public void onNext(final PublishStreamRequest request) { final var header = item.blockHeader(); final long blockNumber = header.number(); + handleBehindResponse(replies, blockNumber, Long.MAX_VALUE - 1); + // We might want to catch up using a supplier from // another BN simulator if (externalLastVerifiedBlockNumberSupplier != null From 51188b36fbc8bda1983efea36eebd1e046fb9a29 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 25 Sep 2025 12:12:16 -0500 Subject: [PATCH 2/6] fix tests and merge issues Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 2 +- .../streaming/BlockNodeConnectionManager.java | 1 + .../BlockNodeConnectionManagerTest.java | 20 +++++++++---------- .../streaming/BlockNodeConnectionTest.java | 5 +++++ .../simulator/SimulatedBlockNodeServer.java | 2 -- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index af8d090a9f89..7fff92d2b472 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -520,8 +520,8 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. */ if (getConnectionState() == ConnectionState.ACTIVE) { blockStreamMetrics.recordRequestSendFailure(); - } throw e; + } } } } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 0d92207fd113..8d3ff60bbe58 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -717,6 +717,7 @@ private void blockStreamWorkerLoop() { /** * Send at most one request to the active block node - if there is one. * + * @param connection the connection to use for streaming block data * @return true if the worker thread should sleep because of a lack of work to do, else false (the worker thread * should NOT sleep) */ diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index f8787aa9a669..dcdfb0899108 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -104,8 +104,8 @@ class BlockNodeConnectionManagerTest extends BlockNodeCommunicationTestBase { jumpToBlockIfNeeded.setAccessible(true); jumpToBlockIfNeededHandle = lookup.unreflect(jumpToBlockIfNeeded); - final Method processStreamingToBlockNode = - BlockNodeConnectionManager.class.getDeclaredMethod("processStreamingToBlockNode"); + final Method processStreamingToBlockNode = BlockNodeConnectionManager.class + .getDeclaredMethod("processStreamingToBlockNode", BlockNodeConnection.class); processStreamingToBlockNode.setAccessible(true); processStreamingToBlockNodeHandle = lookup.unreflect(processStreamingToBlockNode); @@ -1022,7 +1022,7 @@ void testProcessStreamingToBlockNode_missingBlock_latestBlockAfterCurrentStreami doReturn(null).when(bufferService).getBlockState(10L); doReturn(11L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep).isTrue(); @@ -1050,7 +1050,7 @@ void testProcessStreamingToBlockNode_missingBlock() { doReturn(null).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep).isTrue(); @@ -1073,7 +1073,7 @@ void testProcessStreamingToBlockNode_zeroRequests() { doReturn(blockState).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep).isTrue(); @@ -1099,7 +1099,7 @@ void testProcessStreamingToBlockNode_requestsReady() { doReturn(blockState).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep).isTrue(); // there is nothing in the queue left to process, so we should sleep verify(bufferService).getBlockState(10L); @@ -1127,7 +1127,7 @@ void testProcessStreamingToBlockNode_blockEnd_moveToNextBlock() { doReturn(blockState).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep) .isFalse(); // since we are moving blocks, we should not sleep and instead immediately re-check assertThat(currentStreamingBlock).hasValue(11L); // this should get incremented as we move to next @@ -1157,7 +1157,7 @@ void testProcessStreamingToBlockNode_moreRequestsAvailable() { doReturn(blockState).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); - final boolean shouldSleep = invoke_processStreamingToBlockNode(); + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); assertThat(shouldSleep).isFalse(); // there is nothing in the queue left to process, so we should sleep verify(bufferService).getBlockState(10L); @@ -1480,9 +1480,9 @@ private void invoke_jumpToBlockIfNeeded() { } } - private boolean invoke_processStreamingToBlockNode() { + private boolean invoke_processStreamingToBlockNode(final BlockNodeConnection connection) { try { - return (Boolean) processStreamingToBlockNodeHandle.invoke(connectionManager); + return (Boolean) processStreamingToBlockNodeHandle.invoke(connectionManager, connection); } catch (final Throwable t) { throw new RuntimeException(t); } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 9cd096298041..dc46023c35cf 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -521,6 +521,9 @@ void testSendRequest_errorWhileActive() { final RuntimeException e = catchRuntimeException(() -> connection.sendRequest(request)); assertThat(e).isInstanceOf(RuntimeException.class).hasMessage("kaboom!"); + + verify(metrics).recordRequestSendFailure(); + verifyNoMoreInteractions(metrics); } @Test @@ -538,6 +541,8 @@ void testSendRequest_errorWhileNotActive() { verify(requestPipeline).onNext(any()); verify(spiedConnection, times(2)).getConnectionState(); + + verifyNoInteractions(metrics); } @Test diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java index c1aaf335ca95..8992aca345e1 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java @@ -355,8 +355,6 @@ public void onNext(final PublishStreamRequest request) { final var header = item.blockHeader(); final long blockNumber = header.number(); - handleBehindResponse(replies, blockNumber, Long.MAX_VALUE - 1); - // We might want to catch up using a supplier from // another BN simulator if (externalLastVerifiedBlockNumberSupplier != null From 3c8b4bab831f0c2e0b534b3906baa04476825014 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 25 Sep 2025 14:23:07 -0500 Subject: [PATCH 3/6] fix HAPI tests Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 48 ++++++++++++++----- .../BlockNodeConnectionManagerTest.java | 4 +- .../bdd/suites/blocknode/BlockNodeSuite.java | 43 +++++++++-------- 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 7fff92d2b472..d3614b599887 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -12,6 +12,7 @@ import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.Pipeline; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Flow; @@ -180,17 +181,46 @@ public synchronized void createRequestPipeline() { /** * Updates the connection's state. * @param newState the new state to transition to + * @return true if the state was successfully updated to the new state, else false */ - public void updateConnectionState(@NonNull final ConnectionState newState) { + public boolean updateConnectionState(@NonNull final ConnectionState newState) { + return updateConnectionState(null, newState); + } + + /** + * Updates this connection's state if the current state matches the expected states (if specified). + * + * @param expectedCurrentState the expected current connection state (optional) + * @param newState the new state to transition to + * @return true if the state was successfully updated to the new state, else false + */ + private boolean updateConnectionState( + @Nullable final ConnectionState expectedCurrentState, @NonNull final ConnectionState newState) { requireNonNull(newState, "newState must not be null"); - final ConnectionState oldState = connectionState.getAndSet(newState); - logger.debug("[{}] Connection state transitioned from {} to {}", this, oldState, newState); + + if (expectedCurrentState != null) { + if (connectionState.compareAndSet(expectedCurrentState, newState)) { + logger.debug("[{}] Connection state transitioned from {} to {}", this, expectedCurrentState, newState); + } else { + logger.warn( + "[{}] Failed to transition state from {} to {} because current state does not match expected state", + this, + expectedCurrentState, + newState); + return false; + } + } else { + final ConnectionState oldState = connectionState.getAndSet(newState); + logger.debug("[{}] Connection state transitioned from {} to {}", this, oldState, newState); + } if (newState == ConnectionState.ACTIVE) { scheduleStreamReset(); } else { cancelStreamReset(); } + + return true; } /** @@ -532,13 +562,13 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. * @param callOnComplete whether to call onComplete on the request pipeline */ public void close(final boolean callOnComplete) { - final ConnectionState connState = connectionState.get(); + final ConnectionState connState = getConnectionState(); if (connState == ConnectionState.CLOSING || connState == ConnectionState.CLOSED) { logger.debug("[{}] Connection already in terminal state ({})", this, connState); return; } - if (!connectionState.compareAndSet(connState, ConnectionState.CLOSING)) { + if (!updateConnectionState(connState, ConnectionState.CLOSING)) { logger.debug("[{}] State changed while trying to close connection; aborting close attempt", this); return; } @@ -566,20 +596,16 @@ private void closePipeline(final boolean callOnComplete) { streamShutdownInProgress.set(true); try { - final ConnectionState state = connectionState.get(); + final ConnectionState state = getConnectionState(); if (state == ConnectionState.CLOSING && callOnComplete) { pipeline.onComplete(); logger.debug("[{}] Request pipeline successfully closed", this); - } else { - logger.debug( - "[{}] Attempted to close pipeline, but connection state is not CLOSING (actual={})", - this, - state); } } catch (final Exception e) { logger.warn("[{}] Error while completing request pipeline", this, e); } // Clear the pipeline reference to prevent further use + logger.debug("[{}] Request pipeline removed", this); requestPipelineRef.compareAndSet(pipeline, null); } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index dcdfb0899108..023b4e433181 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -104,8 +104,8 @@ class BlockNodeConnectionManagerTest extends BlockNodeCommunicationTestBase { jumpToBlockIfNeeded.setAccessible(true); jumpToBlockIfNeededHandle = lookup.unreflect(jumpToBlockIfNeeded); - final Method processStreamingToBlockNode = BlockNodeConnectionManager.class - .getDeclaredMethod("processStreamingToBlockNode", BlockNodeConnection.class); + final Method processStreamingToBlockNode = BlockNodeConnectionManager.class.getDeclaredMethod( + "processStreamingToBlockNode", BlockNodeConnection.class); processStreamingToBlockNode.setAccessible(true); processStreamingToBlockNodeHandle = lookup.unreflect(processStreamingToBlockNode); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 5f6d0a04adb3..beaa1eede870 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -142,10 +142,10 @@ final Stream node0StreamingBlockNodeConnectionDropsCanStreamGenesis Duration.of(30, SECONDS), Duration.of(45, SECONDS), String.format( - "[localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.", + "/localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.", portNumbers.getFirst()), String.format( - "[localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND)", + "/localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND)", portNumbers.getFirst()))), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(10).toNanos()))); @@ -192,10 +192,10 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { "onError invoked", String.format("Selected block node localhost:%s for connection attempt", portNumbers.get(1)), String.format( - "[localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", portNumbers.get(1)), String.format( - "[localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", portNumbers.get(1)))), waitUntilNextBlocks(10).withBackgroundTraffic(true), doingContextual(spec -> connectionDropTime.set(Instant.now())), @@ -206,10 +206,10 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { Duration.ofMinutes(1), Duration.of(45, SECONDS), String.format( - "[localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", portNumbers.get(2)), String.format( - "[localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", portNumbers.get(2)))), waitUntilNextBlocks(10).withBackgroundTraffic(true), doingContextual(spec -> connectionDropTime.set(Instant.now())), @@ -220,10 +220,10 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { Duration.ofMinutes(1), Duration.of(45, SECONDS), String.format( - "[localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", portNumbers.get(3)), String.format( - "[localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", portNumbers.get(3)))), waitUntilNextBlocks(10).withBackgroundTraffic(true), doingContextual(spec -> connectionDropTime.set(Instant.now())), @@ -234,14 +234,17 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { Duration.ofMinutes(1), Duration.of(45, SECONDS), String.format( - "[localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", portNumbers.get(1)), String.format( - "[localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", portNumbers.get(1)), - String.format("[localhost:%s/ACTIVE] Closing connection...", portNumbers.get(3)), + String.format("/localhost:%s/CLOSING] Closing connection...", portNumbers.get(3)), String.format( - "[localhost:%s/CLOSED] Connection state transitioned from ACTIVE to CLOSED", + "/localhost:%s/CLOSING] Connection state transitioned from ACTIVE to CLOSING", + portNumbers.get(3)), + String.format( + "/localhost:%s/CLOSED] Connection state transitioned from CLOSING to CLOSED", portNumbers.get(3)))), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(20).toNanos()))); @@ -402,7 +405,7 @@ final Stream activeConnectionPeriodicallyRestarts() { Duration.of(30, SECONDS), Duration.of(15, SECONDS), String.format( - "[localhost:%s/ACTIVE] Scheduled periodic stream reset every PT10S", + "/localhost:%s/ACTIVE] Scheduled periodic stream reset every PT10S", portNumbers.getFirst()))), waitUntilNextBlocks(6).withBackgroundTraffic(true), sourcingContextual(spec -> assertHgcaaLogContainsTimeframe( @@ -412,12 +415,12 @@ final Stream activeConnectionPeriodicallyRestarts() { Duration.of(15, SECONDS), // Verify that the periodic reset is performed after the period and the connection is closed String.format( - "[localhost:%s/ACTIVE] Performing scheduled stream reset", portNumbers.getFirst()), - String.format("[localhost:%s/ACTIVE] Closing connection...", portNumbers.getFirst()), + "/localhost:%s/ACTIVE] Performing scheduled stream reset", portNumbers.getFirst()), + String.format("/localhost:%s/CLOSING] Closing connection...", portNumbers.getFirst()), String.format( - "[localhost:%s/CLOSED] Connection state transitioned from ACTIVE to CLOSED", + "/localhost:%s/CLOSING] Connection state transitioned from ACTIVE to CLOSING", portNumbers.getFirst()), - String.format("[localhost:%s/CLOSED] Connection successfully closed", portNumbers.getFirst()), + String.format("/localhost:%s/CLOSING] Connection successfully closed", portNumbers.getFirst()), // Select the next block node to connect to based on priorities "Selected block node", "Running connection task...", @@ -544,14 +547,14 @@ final Stream node0StreamingMultipleEndOfStreamsReceived() { Duration.ofMinutes(1), Duration.of(45, SECONDS), String.format( - "[localhost:%s/ACTIVE] Block node has exceeded the allowed number of EndOfStream responses", + "/localhost:%s/ACTIVE] Block node has exceeded the allowed number of EndOfStream responses", portNumbers.getFirst()), String.format("Selected block node localhost:%s for connection attempt", portNumbers.getLast()), String.format( - "[localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", portNumbers.getLast()), String.format( - "[localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE", portNumbers.getLast()))), waitUntilNextBlocks(5).withBackgroundTraffic(true)); } From 1303a54f77cc694c9d9a11bb69d76daa87ac493e Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 25 Sep 2025 14:29:58 -0500 Subject: [PATCH 4/6] revert gitignore addition Signed-off-by: Tim Farber-Newman --- .gitignore | 3 --- 1 file changed, 3 deletions(-) diff --git a/.gitignore b/.gitignore index 2799257c4a62..4a4ae44edfab 100644 --- a/.gitignore +++ b/.gitignore @@ -1086,6 +1086,3 @@ hedera-state-validator/validator.log snyk-code.html snyk-code.json snyk/ - -### Profiler config -.profileconfig.json \ No newline at end of file From d0cabf08cc0eb635fe01854c5f8b66fbe56c5693 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Fri, 26 Sep 2025 11:37:25 -0500 Subject: [PATCH 5/6] pr feedback Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 55 +++++++++++++------ .../streaming/BlockNodeConnectionManager.java | 2 +- .../BlockNodeConnectionManagerTest.java | 34 ++++++++++++ .../streaming/BlockNodeConnectionTest.java | 15 ++++- .../bdd/suites/blocknode/BlockNodeSuite.java | 1 - 5 files changed, 87 insertions(+), 20 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index d3614b599887..01771a838691 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -114,21 +114,37 @@ public enum ConnectionState { /** * bidi RequestObserver needs to be created. */ - UNINITIALIZED, + UNINITIALIZED(false), /** * bidi RequestObserver is established but this connection has not been chosen as the active one (priority based). */ - PENDING, + PENDING(false), /** * Connection is active. Block Stream Worker Thread is sending PublishStreamRequest's to the block node through async bidi stream. */ - ACTIVE, - CLOSING, + ACTIVE(false), + /** + * The connection is being closed. Once in this state, only cleanup operations should be permitted. + */ + CLOSING(true), /** * Connection has been closed and pipeline terminated. This is a terminal state. * No more requests can be sent and no more responses will be received. */ - CLOSED + CLOSED(true); + + private final boolean isTerminal; + + ConnectionState(final boolean isTerminal) { + this.isTerminal = isTerminal; + } + + /** + * @return true if the state represents a terminal or end-state for the connection lifecycle, else false + */ + boolean isTerminal() { + return isTerminal; + } } /** @@ -530,6 +546,13 @@ public void sendRequest(@NonNull final PublishStreamRequest request) { final Pipeline pipeline = requestPipelineRef.get(); if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) { try { + if (logger.isDebugEnabled()) { + logger.debug( + "[{}] Sending request to block node (type={}, bytes={})", + this, + request.request().kind(), + request.protobufSize()); + } pipeline.onNext(request); if (request.hasEndStream()) { @@ -563,7 +586,7 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. */ public void close(final boolean callOnComplete) { final ConnectionState connState = getConnectionState(); - if (connState == ConnectionState.CLOSING || connState == ConnectionState.CLOSED) { + if (connState.isTerminal()) { logger.debug("[{}] Connection already in terminal state ({})", this, connState); return; } @@ -688,18 +711,16 @@ public void onNext(final @NonNull PublishStreamResponse response) { */ @Override public void onError(final Throwable error) { - if (error instanceof final GrpcException grpcException) { - logger.debug("[{}] Error received (grpcStatus={})", this, grpcException.status(), grpcException); - } else { - logger.debug("[{}] Error received", this, error); - } - blockStreamMetrics.recordConnectionOnError(); + // Suppress errors that happen when the connection is in a terminal state + if (!getConnectionState().isTerminal()) { + blockStreamMetrics.recordConnectionOnError(); + + if (error instanceof final GrpcException grpcException) { + logger.debug("[{}] Error received (grpcStatus={})", this, grpcException.status(), grpcException); + } else { + logger.debug("[{}] Error received", this, error); + } - // Check if already in terminal state - if (getConnectionState() == ConnectionState.CLOSED) { - logger.debug("[{}] onError invoked but connection is already closed", this); - } else if (getConnectionState() == ConnectionState.ACTIVE || getConnectionState() == ConnectionState.PENDING) { - logger.warn("[{}] onError being handled", this, error); handleStreamFailure(); } } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 8d3ff60bbe58..56b47c154111 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -722,7 +722,7 @@ private void blockStreamWorkerLoop() { * should NOT sleep) */ private boolean processStreamingToBlockNode(final BlockNodeConnection connection) { - if (connection == null) { + if (connection == null || ConnectionState.ACTIVE != connection.getConnectionState()) { return true; } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 023b4e433181..a22c11742a3f 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -1018,6 +1018,7 @@ void testProcessStreamingToBlockNode_missingBlock_latestBlockAfterCurrentStreami activeConnectionRef.set(connection); final AtomicLong currentStreamingBlock = streamingBlockNumber(); currentStreamingBlock.set(10L); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(node1Config).when(connection).getNodeConfig(); doReturn(null).when(bufferService).getBlockState(10L); doReturn(11L).when(bufferService).getLastBlockNumberProduced(); @@ -1047,6 +1048,7 @@ void testProcessStreamingToBlockNode_missingBlock() { activeConnectionRef.set(connection); final AtomicLong currentStreamingBlock = streamingBlockNumber(); currentStreamingBlock.set(10L); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(null).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); @@ -1070,6 +1072,7 @@ void testProcessStreamingToBlockNode_zeroRequests() { final AtomicLong currentStreamingBlock = streamingBlockNumber(); currentStreamingBlock.set(10L); final BlockState blockState = new BlockState(10L); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(blockState).when(bufferService).getBlockState(10L); doReturn(10L).when(bufferService).getLastBlockNumberProduced(); @@ -1094,6 +1097,7 @@ void testProcessStreamingToBlockNode_requestsReady() { currentStreamingBlock.set(10L); final BlockState blockState = mock(BlockState.class); final PublishStreamRequest req = createRequest(newBlockHeaderItem()); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(req).when(blockState).getRequest(0); doReturn(1).when(blockState).numRequestsCreated(); doReturn(blockState).when(bufferService).getBlockState(10L); @@ -1121,6 +1125,7 @@ void testProcessStreamingToBlockNode_blockEnd_moveToNextBlock() { currentStreamingBlock.set(10L); final BlockState blockState = mock(BlockState.class); final PublishStreamRequest req = createRequest(newBlockHeaderItem()); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(req).when(blockState).getRequest(0); doReturn(1).when(blockState).numRequestsCreated(); doReturn(true).when(blockState).isBlockProofSent(); @@ -1151,6 +1156,7 @@ void testProcessStreamingToBlockNode_moreRequestsAvailable() { currentStreamingBlock.set(10L); final BlockState blockState = mock(BlockState.class); final PublishStreamRequest req = createRequest(newBlockHeaderItem()); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(req).when(blockState).getRequest(0); doReturn(2).when(blockState).numRequestsCreated(); doReturn(false).when(blockState).isBlockProofSent(); @@ -1170,6 +1176,32 @@ void testProcessStreamingToBlockNode_moreRequestsAvailable() { verifyNoInteractions(metrics); } + @Test + void testProcessStreamingToBlockNode_noConnection() { + final boolean shouldSleep = invoke_processStreamingToBlockNode(null); + assertThat(shouldSleep).isTrue(); + + verifyNoInteractions(bufferService); + verifyNoInteractions(executorService); + verifyNoInteractions(metrics); + } + + @Test + void testProcessStreamingToBlockNode_connectionNotActive() { + final BlockNodeConnection connection = mock(BlockNodeConnection.class); + doReturn(ConnectionState.CLOSING).when(connection).getConnectionState(); + + final boolean shouldSleep = invoke_processStreamingToBlockNode(connection); + assertThat(shouldSleep).isTrue(); + + verify(connection).getConnectionState(); + + verifyNoMoreInteractions(connection); + verifyNoInteractions(bufferService); + verifyNoInteractions(executorService); + verifyNoInteractions(metrics); + } + @Test void testBlockStreamWorkerLoop_managerNotActive() { final AtomicBoolean isActive = isActiveFlag(); @@ -1193,6 +1225,7 @@ void testBlockStreamWorkerLoop() throws InterruptedException { final BlockState blockState = mock(BlockState.class); final PublishStreamRequest req1 = createRequest(newBlockHeaderItem()); final PublishStreamRequest req2 = createRequest(newBlockProofItem()); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(req1).when(blockState).getRequest(0); doReturn(req2).when(blockState).getRequest(1); doReturn(2).when(blockState).numRequestsCreated(); @@ -1247,6 +1280,7 @@ void testBlockStreamWorkerLoop_failure() throws InterruptedException { final BlockState blockState = mock(BlockState.class); final PublishStreamRequest req1 = createRequest(newBlockHeaderItem()); final PublishStreamRequest req2 = createRequest(newBlockProofItem()); + doReturn(ConnectionState.ACTIVE).when(connection).getConnectionState(); doReturn(req1).when(blockState).getRequest(0); doReturn(req2).when(blockState).getRequest(1); doReturn(2).when(blockState).numRequestsCreated(); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index dc46023c35cf..c02d5da47a5f 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -624,7 +624,7 @@ void testClose_alreadyClosing() { } @Test - void testOnError() { + void testOnError_activeConnection() { openConnectionAndResetMocks(); connection.updateConnectionState(ConnectionState.ACTIVE); @@ -644,6 +644,19 @@ void testOnError() { verifyNoInteractions(bufferService); } + @Test + void testOnError_terminalConnection() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.CLOSING); + + connection.onError(new RuntimeException("oh bother")); + + verifyNoInteractions(metrics); + verifyNoInteractions(requestPipeline); + verifyNoInteractions(connectionManager); + verifyNoInteractions(bufferService); + } + @Test void testOnCompleted_streamClosingInProgress() { openConnectionAndResetMocks(); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index beaa1eede870..0366bbb1bf87 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -189,7 +189,6 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { connectionDropTime::get, Duration.ofMinutes(1), Duration.of(45, SECONDS), - "onError invoked", String.format("Selected block node localhost:%s for connection attempt", portNumbers.get(1)), String.format( "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING", From fb14a421c907844e43ed787ec57042ff015fc106 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Fri, 26 Sep 2025 13:32:29 -0500 Subject: [PATCH 6/6] log payload size at trace level Signed-off-by: Tim Farber-Newman --- .../app/blocks/impl/streaming/BlockNodeConnection.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 01771a838691..76a2e5743b0b 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -548,6 +548,15 @@ public void sendRequest(@NonNull final PublishStreamRequest request) { try { if (logger.isDebugEnabled()) { logger.debug( + "[{}] Sending request to block node (type={})", + this, + request.request().kind()); + } else if (logger.isTraceEnabled()) { + /* + PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur + a performance penality. Therefore, we only want to log the byte size at trace level. + */ + logger.trace( "[{}] Sending request to block node (type={}, bytes={})", this, request.request().kind(),