File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
raft/src/main/java/org/apache/kafka/raft Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -192,7 +192,8 @@ public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
192
192
Set <ReplicaKey > replicaKeys = new HashSet <>();
193
193
beginQuorumEpochTimer .update (currentTimeMs );
194
194
for (ReplicaState state : voterStates .values ()) {
195
- if (beginQuorumEpochTimer .currentTimeMs () - state .lastFetchTimestamp >= beginQuorumEpochTimeoutMs ) {
195
+ if (beginQuorumEpochTimer .currentTimeMs () - state .lastFetchTimestamp >= beginQuorumEpochTimeoutMs
196
+ || !state .hasAcknowledgedLeader ) {
196
197
replicaKeys .add (state .replicaKey ());
197
198
}
198
199
}
@@ -963,6 +964,10 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
963
964
kafkaRaftMetrics .updateNumObservers (observerStates .size ());
964
965
}
965
966
967
+ private static class VoterState {
968
+
969
+ }
970
+
966
971
public static class ReplicaState implements Comparable <ReplicaState > {
967
972
private ReplicaKey replicaKey ;
968
973
private Endpoints listeners ;
You can’t perform that action at this time.
0 commit comments