Skip to content

Commit 58476d9

Browse files
authored
feat: quiescence controller (#21235)
Signed-off-by: Lazar Petrovic <[email protected]>
1 parent 62eb0f1 commit 58476d9

File tree

6 files changed

+649
-0
lines changed

6 files changed

+649
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.node.app.quiescence;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import org.hiero.consensus.model.transaction.Transaction;
6+
7+
public class BadMetadataException extends Exception {
8+
public BadMetadataException(@NonNull final Transaction txn) {
9+
super("Failed to find PreHandleResult in transaction metadata (%s)".formatted(txn.getMetadata()));
10+
}
11+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.node.app.quiescence;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import java.time.Instant;
6+
import org.hiero.base.CompareTo;
7+
import org.hiero.consensus.model.transaction.Transaction;
8+
9+
/**
10+
* Tracks all the information needed for quiescence for a specific block. This class is NOT thread-safe, it is expected
11+
* that all methods will be called from the same thread.
12+
*/
13+
public class QuiescenceBlockTracker {
14+
private final long blockNumber;
15+
private final QuiescenceController controller;
16+
private long relevantTransactionCount = 0;
17+
private Instant maxConsensusTime = Instant.EPOCH;
18+
private boolean blockFinalized = false;
19+
20+
/**
21+
* Constructs a new block tracker.
22+
*
23+
* @param blockNumber the block number
24+
* @param controller the quiescence controller
25+
*/
26+
QuiescenceBlockTracker(final long blockNumber, @NonNull final QuiescenceController controller) {
27+
this.blockNumber = blockNumber;
28+
this.controller = controller;
29+
}
30+
31+
/**
32+
* Notifies the block tracker that a transaction has been included in the block.
33+
*
34+
* @param txn the transaction included in the block
35+
*/
36+
public void blockTransaction(@NonNull final Transaction txn) {
37+
if (controller.isDisabled()) {
38+
// If quiescence is not enabled, ignore these calls
39+
return;
40+
}
41+
if (blockFinalized) {
42+
controller.disableQuiescence("Block already finalized but received more transactions");
43+
return;
44+
}
45+
try {
46+
if (QuiescenceUtils.isRelevantTransaction(txn)) {
47+
relevantTransactionCount++;
48+
}
49+
} catch (final BadMetadataException e) {
50+
controller.disableQuiescence(e);
51+
}
52+
}
53+
54+
/**
55+
* Notifies the block tracker that the consensus time has advanced. This is used to track the maximum consensus time
56+
* of a block. Note that consensus time can advance even when there are no transactions.
57+
*
58+
* @param newConsensusTime the new consensus time
59+
*/
60+
public void consensusTimeAdvanced(@NonNull final Instant newConsensusTime) {
61+
if (controller.isDisabled()) {
62+
// If quiescence is not enabled, ignore these calls
63+
return;
64+
}
65+
if (blockFinalized) {
66+
controller.disableQuiescence("Block already finalized");
67+
}
68+
maxConsensusTime = CompareTo.max(maxConsensusTime, newConsensusTime);
69+
}
70+
71+
/**
72+
* Notifies the block tracker that all transactions have been handled and the block is finalized. After this call,
73+
* no more transactions or consensus time updates should be sent to this block tracker.
74+
*/
75+
public void finishedHandlingTransactions() {
76+
if (controller.isDisabled()) {
77+
// If quiescence is not enabled, ignore these calls
78+
return;
79+
}
80+
blockFinalized = true;
81+
controller.blockFinalized(this);
82+
}
83+
84+
/**
85+
* Gets the block number.
86+
* @return the block number
87+
*/
88+
public long getBlockNumber() {
89+
return blockNumber;
90+
}
91+
92+
/**
93+
* Gets the number of relevant transactions in this block. Relevant transactions are explained in
94+
* {@link QuiescenceUtils#isRelevantTransaction(Transaction)}.
95+
*
96+
* @return the number of relevant transactions
97+
*/
98+
public long getRelevantTransactionCount() {
99+
return relevantTransactionCount;
100+
}
101+
102+
/**
103+
* Gets the maximum consensus time of this block.
104+
*
105+
* @return the maximum consensus time
106+
*/
107+
@NonNull
108+
public Instant getMaxConsensusTime() {
109+
return maxConsensusTime;
110+
}
111+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.node.app.quiescence;
3+
4+
import com.swirlds.config.api.ConfigData;
5+
import com.swirlds.config.api.ConfigProperty;
6+
import java.time.Duration;
7+
8+
/**
9+
* Configuration for quiescence.
10+
*
11+
* @param enabled indicates if quiescence is enabled
12+
* @param tctDuration the amount of time before the target consensus timestamp (TCT) when quiescence should not be
13+
* active
14+
*/
15+
@ConfigData("quiescence")
16+
public record QuiescenceConfig(
17+
@ConfigProperty(defaultValue = "false") boolean enabled,
18+
@ConfigProperty(defaultValue = "5s") Duration tctDuration) {}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.node.app.quiescence;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import edu.umd.cs.findbugs.annotations.Nullable;
6+
import java.time.Instant;
7+
import java.time.InstantSource;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Objects;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
import java.util.concurrent.atomic.AtomicReference;
14+
import java.util.function.LongSupplier;
15+
import org.apache.logging.log4j.LogManager;
16+
import org.apache.logging.log4j.Logger;
17+
import org.hiero.consensus.model.event.Event;
18+
import org.hiero.consensus.model.quiescence.QuiescenceCommand;
19+
import org.hiero.consensus.model.status.PlatformStatus;
20+
import org.hiero.consensus.model.transaction.Transaction;
21+
22+
/**
23+
* Tracks all the information needed to determine if the system is quiescent or not. This class is thread-safe, it is
24+
* expected that all methods may be called concurrently from different threads.
25+
*/
26+
public class QuiescenceController {
27+
private static final Logger logger = LogManager.getLogger(QuiescenceController.class);
28+
29+
private final QuiescenceConfig config;
30+
private final InstantSource time;
31+
private final LongSupplier pendingTransactionCount;
32+
33+
private final AtomicReference<Instant> nextTct;
34+
private final AtomicLong pipelineTransactionCount;
35+
private final Map<Long, QuiescenceBlockTracker> blockTrackers;
36+
37+
/**
38+
* Constructs a new quiescence controller.
39+
*
40+
* @param config the quiescence configuration
41+
* @param time the time source
42+
* @param pendingTransactionCount a supplier that provides the number of transactions submitted to the node but not
43+
* yet included put into an event
44+
*/
45+
public QuiescenceController(
46+
@NonNull final QuiescenceConfig config,
47+
@NonNull final InstantSource time,
48+
@NonNull final LongSupplier pendingTransactionCount) {
49+
this.config = Objects.requireNonNull(config);
50+
this.time = Objects.requireNonNull(time);
51+
this.pendingTransactionCount = Objects.requireNonNull(pendingTransactionCount);
52+
nextTct = new AtomicReference<>();
53+
pipelineTransactionCount = new AtomicLong(0);
54+
blockTrackers = new ConcurrentHashMap<>();
55+
}
56+
57+
/**
58+
* Notifies the controller that a list of transactions have been sent to be pre-handled. There transactions will be
59+
* handled soon or will become stale.
60+
*
61+
* @param transactions the transactions are being pre-handled
62+
*/
63+
public void onPreHandle(@NonNull final List<Transaction> transactions) {
64+
// Should be called at the end of Hedera.onPreHandle() when all transactions have been parsed
65+
if (isDisabled()) {
66+
return;
67+
}
68+
try {
69+
pipelineTransactionCount.addAndGet(QuiescenceUtils.countRelevantTransactions(transactions.iterator()));
70+
} catch (final BadMetadataException e) {
71+
disableQuiescence(e);
72+
}
73+
}
74+
75+
/**
76+
* This method should be called when starting to handle a new block. It returns a block tracker that should be
77+
* updated with transactions and consensus time and then notified when the block is finalized. Although this class
78+
* is thread-safe, the returned block tracker is not thread-safe and should only be used from a single thread.
79+
*
80+
* @param blockNumber the block number being started
81+
* @return the block tracker for the new block
82+
*/
83+
public @NonNull QuiescenceBlockTracker startingBlock(final long blockNumber) {
84+
// This should be called from HandleWorkflow when starting to handle a new block
85+
// This should return an object even if quiescence is disabled, so that the caller does not need to check
86+
// if quiescence is enabled or not. We will later ignore the object if quiescence is disabled.
87+
return new QuiescenceBlockTracker(blockNumber, this);
88+
}
89+
90+
/**
91+
* Called by a block tracker when the block has been finalized.
92+
*
93+
* @param blockTracker the block tracker that has been finalized
94+
*/
95+
void blockFinalized(@NonNull final QuiescenceBlockTracker blockTracker) {
96+
if (isDisabled()) {
97+
return;
98+
}
99+
final QuiescenceBlockTracker prevValue = blockTrackers.put(blockTracker.getBlockNumber(), blockTracker);
100+
if (prevValue != null) {
101+
disableQuiescence("Block %d was already finalized".formatted(blockTracker.getBlockNumber()));
102+
}
103+
}
104+
105+
/**
106+
* Notifies the controller that a block has been fully signed.
107+
*
108+
* @param blockNumber the fully signed block number
109+
*/
110+
public void blockFullySigned(final long blockNumber) {
111+
final QuiescenceBlockTracker blockTracker = blockTrackers.remove(blockNumber);
112+
if (blockTracker == null) {
113+
disableQuiescence("Cannot find block tracker for block %d".formatted(blockNumber));
114+
return;
115+
}
116+
updateTransactionCount(-blockTracker.getRelevantTransactionCount());
117+
nextTct.accumulateAndGet(blockTracker.getMaxConsensusTime(), QuiescenceController::tctUpdate);
118+
}
119+
120+
/**
121+
* Notifies the controller that an event has become stale and will not be handled.
122+
*
123+
* @param event the event that has become stale
124+
*/
125+
public void staleEvent(@NonNull final Event event) {
126+
if (isDisabled()) {
127+
return;
128+
}
129+
try {
130+
pipelineTransactionCount.addAndGet(-QuiescenceUtils.countRelevantTransactions(event.transactionIterator()));
131+
} catch (final BadMetadataException e) {
132+
disableQuiescence(e);
133+
}
134+
}
135+
136+
/**
137+
* Notifies the controller of the next target consensus time.
138+
*
139+
* @param targetConsensusTime the next target consensus time
140+
*/
141+
public void setNextTargetConsensusTime(@Nullable final Instant targetConsensusTime) {
142+
if (isDisabled()) {
143+
return;
144+
}
145+
nextTct.set(targetConsensusTime);
146+
}
147+
148+
/**
149+
* Notifies the controller that the platform status has changed.
150+
*
151+
* @param platformStatus the new platform status
152+
*/
153+
public void platformStatusUpdate(@NonNull final PlatformStatus platformStatus) {
154+
if (isDisabled()) {
155+
return;
156+
}
157+
if (platformStatus == PlatformStatus.RECONNECT_COMPLETE) {
158+
pipelineTransactionCount.set(0);
159+
blockTrackers.clear();
160+
}
161+
}
162+
163+
/**
164+
* Returns the current quiescence command.
165+
*
166+
* @return the current quiescence command
167+
*/
168+
public @NonNull QuiescenceCommand getQuiescenceStatus() {
169+
if (isDisabled()) {
170+
return QuiescenceCommand.DONT_QUIESCE;
171+
}
172+
if (pipelineTransactionCount.get() > 0) {
173+
return QuiescenceCommand.DONT_QUIESCE;
174+
}
175+
final Instant tct = nextTct.get();
176+
if (tct != null && tct.minus(config.tctDuration()).isBefore(time.instant())) {
177+
return QuiescenceCommand.DONT_QUIESCE;
178+
}
179+
if (pendingTransactionCount.getAsLong() > 0) {
180+
return QuiescenceCommand.BREAK_QUIESCENCE;
181+
}
182+
return QuiescenceCommand.QUIESCE;
183+
}
184+
185+
/**
186+
* Disables quiescence, logging the reason.
187+
*
188+
* @param reason the reason quiescence is being disabled
189+
*/
190+
void disableQuiescence(@NonNull final String reason) {
191+
disableQuiescence();
192+
logger.error("Disabling quiescence, reason: {}", reason);
193+
}
194+
195+
/**
196+
* Disables quiescence, logging the exception.
197+
*
198+
* @param exception the exception that caused quiescence to be disabled
199+
*/
200+
void disableQuiescence(@NonNull final Exception exception) {
201+
disableQuiescence();
202+
logger.error("Disabling quiescence due to exception:", exception);
203+
}
204+
205+
/**
206+
* Indicates if quiescence is disabled.
207+
*
208+
* @return true if quiescence is disabled, false otherwise
209+
*/
210+
boolean isDisabled() {
211+
return !config.enabled() || pipelineTransactionCount.get() < 0;
212+
}
213+
214+
private void disableQuiescence() {
215+
// During normal operation the count should never be negative, so we use that to indicate disabled.
216+
// We use Long.MIN_VALUE/2 to avoid any concurrent updates from overflowing and wrapping around to positive.
217+
pipelineTransactionCount.set(Long.MIN_VALUE / 2);
218+
}
219+
220+
private static Instant tctUpdate(@Nullable final Instant currentTct, @NonNull final Instant currentConsensusTime) {
221+
if (currentTct == null) {
222+
return null;
223+
}
224+
// once consensus time passes the TCT, we want to return null to indicate that there is no TCT
225+
return currentConsensusTime.isAfter(currentTct) ? null : currentTct;
226+
}
227+
228+
private void updateTransactionCount(final long delta) {
229+
final long updatedValue = pipelineTransactionCount.addAndGet(delta);
230+
if (updatedValue < 0) {
231+
disableQuiescence("Quiescence transaction count is negative, this indicates a bug");
232+
}
233+
}
234+
}

0 commit comments

Comments
 (0)