Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import com.hedera.node.config.ConfigProvider;
import com.hedera.node.config.data.BlockNodeConnectionConfig;
import com.hedera.node.internal.network.BlockNodeConfig;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.Pipeline;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -46,6 +49,10 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {

private static final Logger logger = LogManager.getLogger(BlockNodeConnection.class);

/**
* Counter used to get unique identities for connection instances.
*/
private static final AtomicLong connectionIdCounter = new AtomicLong(0);
/**
* A longer retry delay for when the connection encounters an error.
*/
Expand Down Expand Up @@ -80,7 +87,7 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
*/
private final BlockStreamPublishServiceClient blockStreamPublishServiceClient;

private Pipeline<? super PublishStreamRequest> requestPipeline;
private final AtomicReference<Pipeline<? super PublishStreamRequest>> requestPipelineRef = new AtomicReference<>();
/**
* Reference to the current state of this connection.
*/
Expand All @@ -95,6 +102,10 @@ public class BlockNodeConnection implements Pipeline<PublishStreamResponse> {
* When the connection is closed or reset, this task is cancelled.
*/
private ScheduledFuture<?> streamResetTask;
/**
* The unique ID of this connection instance.
*/
private final String connectionId;

/**
* Represents the possible states of a Block Node connection.
Expand All @@ -103,20 +114,37 @@ public enum ConnectionState {
/**
* bidi RequestObserver needs to be created.
*/
UNINITIALIZED,
UNINITIALIZED(false),
/**
* bidi RequestObserver is established but this connection has not been chosen as the active one (priority based).
*/
PENDING,
PENDING(false),
/**
* Connection is active. Block Stream Worker Thread is sending PublishStreamRequest's to the block node through async bidi stream.
*/
ACTIVE,
ACTIVE(false),
/**
* The connection is being closed. Once in this state, only cleanup operations should be permitted.
*/
CLOSING(true),
/**
* Connection has been closed and pipeline terminated. This is a terminal state.
* No more requests can be sent and no more responses will be received.
*/
CLOSED
CLOSED(true);

private final boolean isTerminal;

/**
* Creates a connection state.
*
* @param isTerminal true if this state is a terminal or end-state of the connection lifecycle, else false
*/
ConnectionState(final boolean isTerminal) {
this.isTerminal = isTerminal;
}

/**
* Indicates whether this state is a terminal state. In this enum only {@code CLOSING} and {@code CLOSED} are
* declared as terminal.
*
* @return true if the state represents a terminal or end-state for the connection lifecycle, else false
*/
boolean isTerminal() {
return isTerminal;
}
}

/**
Expand Down Expand Up @@ -150,14 +178,17 @@ public BlockNodeConnection(
final var blockNodeConnectionConfig =
configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod();
connectionId = String.format("%04d", connectionIdCounter.incrementAndGet());
}

/**
* Creates a new bidi request pipeline for this block node connection.
*/
public void createRequestPipeline() {
if (requestPipeline == null) {
requestPipeline = blockStreamPublishServiceClient.publishBlockStream(this);
public synchronized void createRequestPipeline() {
if (requestPipelineRef.get() == null) {
final Pipeline<? super PublishStreamRequest> pipeline =
blockStreamPublishServiceClient.publishBlockStream(this);
requestPipelineRef.set(pipeline);
updateConnectionState(ConnectionState.PENDING);
blockStreamMetrics.recordConnectionOpened();
}
Expand All @@ -166,17 +197,46 @@ public void createRequestPipeline() {
/**
* Updates the connection's state unconditionally, i.e. without checking what the current state is.
*
* @param newState the new state to transition to
* @return true if the state was successfully updated to the new state, else false
*/
public boolean updateConnectionState(@NonNull final ConnectionState newState) {
// a null expected state means no check of the current state is performed
return updateConnectionState(null, newState);
}

/**
* Updates this connection's state if the current state matches the expected state (if specified).
*
* @param expectedCurrentState the expected current connection state (optional)
* @param newState the new state to transition to
* @return true if the state was successfully updated to the new state, else false
*/
public void updateConnectionState(@NonNull final ConnectionState newState) {
private boolean updateConnectionState(
@Nullable final ConnectionState expectedCurrentState, @NonNull final ConnectionState newState) {
requireNonNull(newState, "newState must not be null");
final ConnectionState oldState = connectionState.getAndSet(newState);
logger.debug("[{}] Connection state transitioned from {} to {}", this, oldState, newState);

if (expectedCurrentState != null) {
if (connectionState.compareAndSet(expectedCurrentState, newState)) {
logger.debug("[{}] Connection state transitioned from {} to {}", this, expectedCurrentState, newState);
} else {
logger.warn(
"[{}] Failed to transition state from {} to {} because current state does not match expected state",
this,
expectedCurrentState,
newState);
return false;
}
} else {
final ConnectionState oldState = connectionState.getAndSet(newState);
logger.debug("[{}] Connection state transitioned from {} to {}", this, oldState, newState);
}

if (newState == ConnectionState.ACTIVE) {
scheduleStreamReset();
} else {
cancelStreamReset();
}

return true;
}

/**
Expand Down Expand Up @@ -256,7 +316,6 @@ private void closeAndRestart(final long blockNumber, final boolean callOnComplet
* notifying the connection manager and calling onComplete on the request pipeline.
*/
public void handleStreamFailure() {
logger.debug("[{}] handleStreamFailure", this);
closeAndReschedule(LONGER_RETRY_DELAY, true);
}

Expand All @@ -265,7 +324,6 @@ public void handleStreamFailure() {
* notifying the connection manager without calling onComplete on the request pipeline.
*/
public void handleStreamFailureWithoutOnComplete() {
logger.debug("[{}] handleStreamFailureWithoutOnComplete", this);
closeAndReschedule(LONGER_RETRY_DELAY, false);
}

Expand All @@ -286,7 +344,7 @@ private void handleAcknowledgement(@NonNull final BlockAcknowledgement acknowled
* Acknowledges the blocks up to the specified block number.
* @param acknowledgedBlockNumber the block number that has been known to be persisted and verified by the block node
*/
private void acknowledgeBlocks(long acknowledgedBlockNumber, boolean maybeJumpToBlock) {
private void acknowledgeBlocks(final long acknowledgedBlockNumber, final boolean maybeJumpToBlock) {
final long currentBlockStreaming = blockNodeConnectionManager.currentStreamingBlockNumber();
final long currentBlockProducing = blockBufferService.getLastBlockNumberProduced();

Expand Down Expand Up @@ -461,7 +519,7 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
*
* @param code the code on why stream was ended
*/
public void endTheStreamWith(PublishStreamRequest.EndStream.Code code) {
public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
final var earliestBlockNumber = blockBufferService.getEarliestAvailableBlockNumber();
final var highestAckedBlockNumber = blockBufferService.getHighestAckedBlockNumber();

Expand All @@ -485,9 +543,27 @@ public void endTheStreamWith(PublishStreamRequest.EndStream.Code code) {
public void sendRequest(@NonNull final PublishStreamRequest request) {
requireNonNull(request, "request must not be null");

if (getConnectionState() == ConnectionState.ACTIVE && requestPipeline != null) {
final Pipeline<? super PublishStreamRequest> pipeline = requestPipelineRef.get();
if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) {
try {
requestPipeline.onNext(request);
if (logger.isTraceEnabled()) {
/*
PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur
a performance penalty. Therefore, we only want to log the byte size at trace level.

Trace must be checked before debug: a logger that has trace enabled also has debug enabled, so
checking debug first would make this branch unreachable.
*/
logger.trace(
"[{}] Sending request to block node (type={}, bytes={})",
this,
request.request().kind(),
request.protobufSize());
} else if (logger.isDebugEnabled()) {
logger.debug(
"[{}] Sending request to block node (type={})",
this,
request.request().kind());
}
pipeline.onNext(request);

if (request.hasEndStream()) {
blockStreamMetrics.recordRequestEndStreamSent(
request.endStream().endCode());
Expand All @@ -497,8 +573,17 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
request.blockItems().blockItems().size());
}
} catch (final RuntimeException e) {
blockStreamMetrics.recordRequestSendFailure();
throw e;
/*
There is a possible, and somewhat expected, race condition when one thread is attempting to close this
connection while a request is being sent on another thread. Because of this, an exception may get thrown
but depending on the state of the connection it may be expected. Thus, if we do get an exception we only
want to propagate it if the connection is still in an ACTIVE state. If we receive an error while the
connection is in another state (e.g. CLOSING) then we want to ignore the error.
*/
if (getConnectionState() == ConnectionState.ACTIVE) {
blockStreamMetrics.recordRequestSendFailure();
throw e;
}
}
}
}
Expand All @@ -509,42 +594,51 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
* @param callOnComplete whether to call onComplete on the request pipeline
*/
public void close(final boolean callOnComplete) {
if (getConnectionState() == ConnectionState.CLOSED) {
logger.debug("[{}] Connection already closed.", this);
final ConnectionState connState = getConnectionState();
if (connState.isTerminal()) {
logger.debug("[{}] Connection already in terminal state ({})", this, connState);
return;
}

if (!updateConnectionState(connState, ConnectionState.CLOSING)) {
logger.debug("[{}] State changed while trying to close connection; aborting close attempt", this);
return;
}

logger.debug("[{}] Closing connection...", this);

try {
closePipeline(callOnComplete);
updateConnectionState(ConnectionState.CLOSED);
jumpToBlock(-1L);
blockStreamMetrics.recordConnectionClosed();
logger.debug("[{}] Connection successfully closed", this);
} catch (final RuntimeException e) {
logger.warn("[{}] Error occurred while attempting to close connection", this);
} finally {
// regardless of outcome, mark the connection as closed
updateConnectionState(ConnectionState.CLOSED);
}
}

private void closePipeline(final boolean callOnComplete) {
if (requestPipeline != null) {
final Pipeline<? super PublishStreamRequest> pipeline = requestPipelineRef.get();

if (pipeline != null) {
logger.debug("[{}] Closing request pipeline for block node", this);
streamShutdownInProgress.set(true);

try {
if (getConnectionState() == ConnectionState.ACTIVE && callOnComplete) {
requestPipeline.onComplete();
final ConnectionState state = getConnectionState();
if (state == ConnectionState.CLOSING && callOnComplete) {
pipeline.onComplete();
logger.debug("[{}] Request pipeline successfully closed", this);
} else {
logger.debug("[{}] Request pipeline closed without onComplete - connection not active", this);
}
} catch (final Exception e) {
logger.warn("[{}] Error while completing request pipeline", this, e);
}
// Clear the pipeline reference to prevent further use
// pipelineLock ensures no thread is mid-call to requestPipeline.onNext()
requestPipeline = null;
logger.debug("[{}] Request pipeline removed", this);
requestPipelineRef.compareAndSet(pipeline, null);
}
}

Expand Down Expand Up @@ -572,7 +666,7 @@ private void jumpToBlock(final long blockNumber) {
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
public void onSubscribe(final Flow.Subscription subscription) {
logger.debug("[{}] onSubscribe", this);
subscription.request(Long.MAX_VALUE);
}
Expand Down Expand Up @@ -626,14 +720,16 @@ public void onNext(final @NonNull PublishStreamResponse response) {
*/
@Override
public void onError(final Throwable error) {
logger.debug("[{}] onError invoked {}", this, error.getMessage());
blockStreamMetrics.recordConnectionOnError();
// Suppress errors that happen when the connection is in a terminal state
if (!getConnectionState().isTerminal()) {
blockStreamMetrics.recordConnectionOnError();

if (error instanceof final GrpcException grpcException) {
logger.debug("[{}] Error received (grpcStatus={})", this, grpcException.status(), grpcException);
} else {
logger.debug("[{}] Error received", this, error);
}

// Check if already in terminal state
if (getConnectionState() == ConnectionState.CLOSED) {
logger.debug("[{}] onError invoked but connection is already closed", this);
} else if (getConnectionState() == ConnectionState.ACTIVE || getConnectionState() == ConnectionState.PENDING) {
logger.warn("[{}] onError being handled", this, error);
handleStreamFailure();
}
}
Expand Down Expand Up @@ -670,7 +766,8 @@ public ConnectionState getConnectionState() {

@Override
public String toString() {
return blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/" + getConnectionState();
return connectionId + "/" + blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/"
+ getConnectionState();
}

@Override
Expand All @@ -679,11 +776,11 @@ public boolean equals(final Object o) {
return false;
}
final BlockNodeConnection that = (BlockNodeConnection) o;
return Objects.equals(blockNodeConfig, that.blockNodeConfig);
// connectionId is a String: compare by value, since == only tests reference identity
return Objects.equals(connectionId, that.connectionId) && Objects.equals(blockNodeConfig, that.blockNodeConfig);
}

@Override
public int hashCode() {
return Objects.hash(blockNodeConfig);
return Objects.hash(blockNodeConfig, connectionId);
}
}
Loading
Loading