99import com .hedera .node .config .ConfigProvider ;
1010import com .hedera .node .config .data .BlockNodeConnectionConfig ;
1111import com .hedera .node .internal .network .BlockNodeConfig ;
12+ import com .hedera .pbj .runtime .grpc .GrpcException ;
1213import com .hedera .pbj .runtime .grpc .Pipeline ;
1314import edu .umd .cs .findbugs .annotations .NonNull ;
15+ import edu .umd .cs .findbugs .annotations .Nullable ;
1416import java .time .Duration ;
1517import java .util .Objects ;
1618import java .util .concurrent .Flow ;
1719import java .util .concurrent .ScheduledExecutorService ;
1820import java .util .concurrent .ScheduledFuture ;
1921import java .util .concurrent .TimeUnit ;
2022import java .util .concurrent .atomic .AtomicBoolean ;
23+ import java .util .concurrent .atomic .AtomicLong ;
2124import java .util .concurrent .atomic .AtomicReference ;
2225import org .apache .logging .log4j .LogManager ;
2326import org .apache .logging .log4j .Logger ;
@@ -46,6 +49,10 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
4649
4750 private static final Logger logger = LogManager .getLogger (BlockNodeConnection .class );
4851
52+ /**
53+ * Counter used to get unique identities for connection instances.
54+ */
55+ private static final AtomicLong connectionIdCounter = new AtomicLong (0 );
4956 /**
5057 * A longer retry delay for when the connection encounters an error.
5158 */
@@ -80,7 +87,7 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
8087 */
8188 private final BlockStreamPublishServiceClient blockStreamPublishServiceClient ;
8289
83- private Pipeline <? super PublishStreamRequest > requestPipeline ;
90+ private final AtomicReference < Pipeline <? super PublishStreamRequest >> requestPipelineRef = new AtomicReference <>() ;
8491 /**
8592 * Reference to the current state of this connection.
8693 */
@@ -95,6 +102,10 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
95102 * When the connection is closed or reset, this task is cancelled.
96103 */
97104 private ScheduledFuture <?> streamResetTask ;
105+ /**
106+ * The unique ID of this connection instance.
107+ */
108+ private final String connectionId ;
98109
99110 /**
100111 * Represents the possible states of a Block Node connection.
@@ -103,20 +114,37 @@ public enum ConnectionState {
103114 /**
104115 * bidi RequestObserver needs to be created.
105116 */
106- UNINITIALIZED ,
117+ UNINITIALIZED ( false ) ,
107118 /**
108119 * bidi RequestObserver is established but this connection has not been chosen as the active one (priority based).
109120 */
110- PENDING ,
121+ PENDING ( false ) ,
111122 /**
112123 * Connection is active. Block Stream Worker Thread is sending PublishStreamRequest's to the block node through async bidi stream.
113124 */
114- ACTIVE ,
125+ ACTIVE (false ),
126+ /**
127+ * The connection is being closed. Once in this state, only cleanup operations should be permitted.
128+ */
129+ CLOSING (true ),
115130 /**
116131 * Connection has been closed and pipeline terminated. This is a terminal state.
117132 * No more requests can be sent and no more responses will be received.
118133 */
119- CLOSED
134+ CLOSED (true );
135+
136+ private final boolean isTerminal ;
137+
138+ ConnectionState (final boolean isTerminal ) {
139+ this .isTerminal = isTerminal ;
140+ }
141+
142+ /**
143+ * @return true if the state represents a terminal or end-state for the connection lifecycle, else false
144+ */
145+ boolean isTerminal () {
146+ return isTerminal ;
147+ }
120148 }
121149
122150 /**
@@ -150,14 +178,17 @@ public BlockNodeConnection(
150178 final var blockNodeConnectionConfig =
151179 configProvider .getConfiguration ().getConfigData (BlockNodeConnectionConfig .class );
152180 this .streamResetPeriod = blockNodeConnectionConfig .streamResetPeriod ();
181+ connectionId = String .format ("%04d" , connectionIdCounter .incrementAndGet ());
153182 }
154183
155184 /**
156185 * Creates a new bidi request pipeline for this block node connection.
157186 */
158- public void createRequestPipeline () {
159- if (requestPipeline == null ) {
160- requestPipeline = blockStreamPublishServiceClient .publishBlockStream (this );
187+ public synchronized void createRequestPipeline () {
188+ if (requestPipelineRef .get () == null ) {
189+ final Pipeline <? super PublishStreamRequest > pipeline =
190+ blockStreamPublishServiceClient .publishBlockStream (this );
191+ requestPipelineRef .set (pipeline );
161192 updateConnectionState (ConnectionState .PENDING );
162193 blockStreamMetrics .recordConnectionOpened ();
163194 }
@@ -166,17 +197,46 @@ public void createRequestPipeline() {
166197 /**
167198 * Updates the connection's state.
168199 * @param newState the new state to transition to
200+ * @return true if the state was successfully updated to the new state, else false
201+ */
202+ public boolean updateConnectionState (@ NonNull final ConnectionState newState ) {
203+ return updateConnectionState (null , newState );
204+ }
205+
206+ /**
207+ * Updates this connection's state if the current state matches the expected states (if specified).
208+ *
209+ * @param expectedCurrentState the expected current connection state (optional)
210+ * @param newState the new state to transition to
211+ * @return true if the state was successfully updated to the new state, else false
169212 */
170- public void updateConnectionState (@ NonNull final ConnectionState newState ) {
213+ private boolean updateConnectionState (
214+ @ Nullable final ConnectionState expectedCurrentState , @ NonNull final ConnectionState newState ) {
171215 requireNonNull (newState , "newState must not be null" );
172- final ConnectionState oldState = connectionState .getAndSet (newState );
173- logger .debug ("[{}] Connection state transitioned from {} to {}" , this , oldState , newState );
216+
217+ if (expectedCurrentState != null ) {
218+ if (connectionState .compareAndSet (expectedCurrentState , newState )) {
219+ logger .debug ("[{}] Connection state transitioned from {} to {}" , this , expectedCurrentState , newState );
220+ } else {
221+ logger .warn (
222+ "[{}] Failed to transition state from {} to {} because current state does not match expected state" ,
223+ this ,
224+ expectedCurrentState ,
225+ newState );
226+ return false ;
227+ }
228+ } else {
229+ final ConnectionState oldState = connectionState .getAndSet (newState );
230+ logger .debug ("[{}] Connection state transitioned from {} to {}" , this , oldState , newState );
231+ }
174232
175233 if (newState == ConnectionState .ACTIVE ) {
176234 scheduleStreamReset ();
177235 } else {
178236 cancelStreamReset ();
179237 }
238+
239+ return true ;
180240 }
181241
182242 /**
@@ -256,7 +316,6 @@ private void closeAndRestart(final long blockNumber, final boolean callOnComplet
256316 * notifying the connection manager and calling onComplete on the request pipeline.
257317 */
258318 public void handleStreamFailure () {
259- logger .debug ("[{}] handleStreamFailure" , this );
260319 closeAndReschedule (LONGER_RETRY_DELAY , true );
261320 }
262321
@@ -265,7 +324,6 @@ public void handleStreamFailure() {
265324 * notifying the connection manager without calling onComplete on the request pipeline.
266325 */
267326 public void handleStreamFailureWithoutOnComplete () {
268- logger .debug ("[{}] handleStreamFailureWithoutOnComplete" , this );
269327 closeAndReschedule (LONGER_RETRY_DELAY , false );
270328 }
271329
@@ -286,7 +344,7 @@ private void handleAcknowledgement(@NonNull final BlockAcknowledgement acknowled
286344 * Acknowledges the blocks up to the specified block number.
287345 * @param acknowledgedBlockNumber the block number that has been known to be persisted and verified by the block node
288346 */
289- private void acknowledgeBlocks (long acknowledgedBlockNumber , boolean maybeJumpToBlock ) {
347+ private void acknowledgeBlocks (final long acknowledgedBlockNumber , final boolean maybeJumpToBlock ) {
290348 final long currentBlockStreaming = blockNodeConnectionManager .currentStreamingBlockNumber ();
291349 final long currentBlockProducing = blockBufferService .getLastBlockNumberProduced ();
292350
@@ -461,7 +519,7 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
461519 *
462520 * @param code the code on why stream was ended
463521 */
464- public void endTheStreamWith (PublishStreamRequest .EndStream .Code code ) {
522+ public void endTheStreamWith (final PublishStreamRequest .EndStream .Code code ) {
465523 final var earliestBlockNumber = blockBufferService .getEarliestAvailableBlockNumber ();
466524 final var highestAckedBlockNumber = blockBufferService .getHighestAckedBlockNumber ();
467525
@@ -485,9 +543,27 @@ public void endTheStreamWith(PublishStreamRequest.EndStream.Code code) {
485543 public void sendRequest (@ NonNull final PublishStreamRequest request ) {
486544 requireNonNull (request , "request must not be null" );
487545
488- if (getConnectionState () == ConnectionState .ACTIVE && requestPipeline != null ) {
546+ final Pipeline <? super PublishStreamRequest > pipeline = requestPipelineRef .get ();
547+ if (getConnectionState () == ConnectionState .ACTIVE && pipeline != null ) {
489548 try {
490- requestPipeline .onNext (request );
549+ if (logger .isDebugEnabled ()) {
550+ logger .debug (
551+ "[{}] Sending request to block node (type={})" ,
552+ this ,
553+ request .request ().kind ());
554+ } else if (logger .isTraceEnabled ()) {
555+ /*
556+ PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur
557+ a performance penality. Therefore, we only want to log the byte size at trace level.
558+ */
559+ logger .trace (
560+ "[{}] Sending request to block node (type={}, bytes={})" ,
561+ this ,
562+ request .request ().kind (),
563+ request .protobufSize ());
564+ }
565+ pipeline .onNext (request );
566+
491567 if (request .hasEndStream ()) {
492568 blockStreamMetrics .recordRequestEndStreamSent (
493569 request .endStream ().endCode ());
@@ -497,8 +573,17 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
497573 request .blockItems ().blockItems ().size ());
498574 }
499575 } catch (final RuntimeException e ) {
500- blockStreamMetrics .recordRequestSendFailure ();
501- throw e ;
576+ /*
577+ There is a possible, and somewhat expected, race condition when one thread is attempting to close this
578+ connection while a request is being sent on another thread. Because of this, an exception may get thrown
579+ but depending on the state of the connection it may be expected. Thus, if we do get an exception we only
580+ want to propagate it if the connection is still in an ACTIVE state. If we receive an error while the
581+ connection is in another state (e.g. CLOSING) then we want to ignore the error.
582+ */
583+ if (getConnectionState () == ConnectionState .ACTIVE ) {
584+ blockStreamMetrics .recordRequestSendFailure ();
585+ throw e ;
586+ }
502587 }
503588 }
504589 }
@@ -509,42 +594,51 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
509594 * @param callOnComplete whether to call onComplete on the request pipeline
510595 */
511596 public void close (final boolean callOnComplete ) {
512- if (getConnectionState () == ConnectionState .CLOSED ) {
513- logger .debug ("[{}] Connection already closed." , this );
597+ final ConnectionState connState = getConnectionState ();
598+ if (connState .isTerminal ()) {
599+ logger .debug ("[{}] Connection already in terminal state ({})" , this , connState );
600+ return ;
601+ }
602+
603+ if (!updateConnectionState (connState , ConnectionState .CLOSING )) {
604+ logger .debug ("[{}] State changed while trying to close connection; aborting close attempt" , this );
514605 return ;
515606 }
516607
517608 logger .debug ("[{}] Closing connection..." , this );
518609
519610 try {
520611 closePipeline (callOnComplete );
521- updateConnectionState (ConnectionState .CLOSED );
522612 jumpToBlock (-1L );
523613 blockStreamMetrics .recordConnectionClosed ();
524614 logger .debug ("[{}] Connection successfully closed" , this );
525615 } catch (final RuntimeException e ) {
526616 logger .warn ("[{}] Error occurred while attempting to close connection" , this );
617+ } finally {
618+ // regardless of outcome, mark the connection as closed
619+ updateConnectionState (ConnectionState .CLOSED );
527620 }
528621 }
529622
530623 private void closePipeline (final boolean callOnComplete ) {
531- if (requestPipeline != null ) {
624+ final Pipeline <? super PublishStreamRequest > pipeline = requestPipelineRef .get ();
625+
626+ if (pipeline != null ) {
532627 logger .debug ("[{}] Closing request pipeline for block node" , this );
533628 streamShutdownInProgress .set (true );
534629
535630 try {
536- if (getConnectionState () == ConnectionState .ACTIVE && callOnComplete ) {
537- requestPipeline .onComplete ();
631+ final ConnectionState state = getConnectionState ();
632+ if (state == ConnectionState .CLOSING && callOnComplete ) {
633+ pipeline .onComplete ();
538634 logger .debug ("[{}] Request pipeline successfully closed" , this );
539- } else {
540- logger .debug ("[{}] Request pipeline closed without onComplete - connection not active" , this );
541635 }
542636 } catch (final Exception e ) {
543637 logger .warn ("[{}] Error while completing request pipeline" , this , e );
544638 }
545639 // Clear the pipeline reference to prevent further use
546- // pipelineLock ensures no thread is mid-call to requestPipeline.onNext()
547- requestPipeline = null ;
640+ logger . debug ( "[{}] Request pipeline removed" , this );
641+ requestPipelineRef . compareAndSet ( pipeline , null ) ;
548642 }
549643 }
550644
@@ -572,7 +666,7 @@ private void jumpToBlock(final long blockNumber) {
572666 }
573667
574668 @ Override
575- public void onSubscribe (Flow .Subscription subscription ) {
669+ public void onSubscribe (final Flow .Subscription subscription ) {
576670 logger .debug ("[{}] onSubscribe" , this );
577671 subscription .request (Long .MAX_VALUE );
578672 }
@@ -626,14 +720,16 @@ public void onNext(final @NonNull PublishStreamResponse response) {
626720 */
627721 @ Override
628722 public void onError (final Throwable error ) {
629- logger .debug ("[{}] onError invoked {}" , this , error .getMessage ());
630- blockStreamMetrics .recordConnectionOnError ();
723+ // Suppress errors that happen when the connection is in a terminal state
724+ if (!getConnectionState ().isTerminal ()) {
725+ blockStreamMetrics .recordConnectionOnError ();
726+
727+ if (error instanceof final GrpcException grpcException ) {
728+ logger .debug ("[{}] Error received (grpcStatus={})" , this , grpcException .status (), grpcException );
729+ } else {
730+ logger .debug ("[{}] Error received" , this , error );
731+ }
631732
632- // Check if already in terminal state
633- if (getConnectionState () == ConnectionState .CLOSED ) {
634- logger .debug ("[{}] onError invoked but connection is already closed" , this );
635- } else if (getConnectionState () == ConnectionState .ACTIVE || getConnectionState () == ConnectionState .PENDING ) {
636- logger .warn ("[{}] onError being handled" , this , error );
637733 handleStreamFailure ();
638734 }
639735 }
@@ -670,7 +766,8 @@ public ConnectionState getConnectionState() {
670766
671767 @ Override
672768 public String toString () {
673- return blockNodeConfig .address () + ":" + blockNodeConfig .port () + "/" + getConnectionState ();
769+ return connectionId + "/" + blockNodeConfig .address () + ":" + blockNodeConfig .port () + "/"
770+ + getConnectionState ();
674771 }
675772
676773 @ Override
@@ -679,11 +776,11 @@ public boolean equals(final Object o) {
679776 return false ;
680777 }
681778 final BlockNodeConnection that = (BlockNodeConnection ) o ;
682- return Objects .equals (blockNodeConfig , that .blockNodeConfig );
779+ return connectionId == that . connectionId && Objects .equals (blockNodeConfig , that .blockNodeConfig );
683780 }
684781
685782 @ Override
686783 public int hashCode () {
687- return Objects .hash (blockNodeConfig );
784+ return Objects .hash (blockNodeConfig , connectionId );
688785 }
689786}
0 commit comments