From 90ab0f0ca341a2eaf43cc662d086fc19935f36a2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 00:17:34 +0800 Subject: [PATCH 01/14] rename fun --- .../main/java/org/apache/kafka/raft/KafkaRaftClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 921bd72ecf25e..3db84877c1020 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2856,7 +2856,7 @@ private long maybeSendRequests( return minBackoffMs; } - private long maybeSendRequest( + private long maybeSendRequests( long currentTimeMs, Set remoteVoters, Function destinationSupplier, @@ -3043,7 +3043,7 @@ private long maybeSendBeginQuorumEpochRequests( ) ); - timeUntilNextBeginQuorumSend = maybeSendRequest( + timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() @@ -3131,7 +3131,7 @@ private long maybeSendVoteRequests( if (!state.epochElection().isVoteRejected()) { VoterSet voters = partitionState.lastVoterSet(); boolean preVote = quorum.isProspective(); - return maybeSendRequest( + return maybeSendRequests( currentTimeMs, state.epochElection().unrecordedVoters(), voterId -> voters From 7d13fb307de1aa020789d12090e4634828e67634 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 01:28:25 +0800 Subject: [PATCH 02/14] test --- .../apache/kafka/raft/KafkaRaftClient.java | 5 +++++ .../org/apache/kafka/raft/LeaderState.java | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 3db84877c1020..3ecab18eca4aa 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1512,6 +1512,9 @@ private CompletableFuture handleFetchRequest( FetchRequest.replicaId(request), fetchPartition.replicaDirectoryId() ); + if (quorum.isLeader()) { + quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); + } FetchResponseData response = tryCompleteFetchRequest( requestMetadata.listenerName(), requestMetadata.apiVersion(), @@ -3043,12 +3046,14 @@ private long maybeSendBeginQuorumEpochRequests( ) ); + Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestNodes(currentTimeMs); timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) + .filter(key -> !needToSendBeginQuorumRequest.contains(key)) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 7c379275a0e50..3815dc3ff1447 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -88,6 +88,7 @@ public class LeaderState implements EpochState { private final Timer beginQuorumEpochTimer; private final int beginQuorumEpochTimeoutMs; private final KafkaRaftMetrics kafkaRaftMetrics; + private final Map lastFetchRequestMs = new HashMap<>(); // This is volatile because resignation can be requested from an external thread. private volatile boolean resignRequested = false; @@ -188,6 +189,26 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) { beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs); } + public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTimeMs) { + beginQuorumEpochTimer.update(currentTimeMs); + lastFetchRequestMs.put(replicaKey, currentTimeMs); + } + + public Set needSendBeginQuorumRequestNodes(long currentTimeMs) { + Set replicaKeys = new HashSet<>(); + beginQuorumEpochTimer.update(currentTimeMs); + for (Map.Entry entry : lastFetchRequestMs.entrySet()) { + if (entry.getValue() - currentTimeMs >= beginQuorumEpochTimeoutMs) { + replicaKeys.add(entry.getKey()); + } + } + return replicaKeys; + } + + public long getLastFetchRequestTimeMs(ReplicaKey replicaKey) { + return lastFetchRequestMs.get(replicaKey); + } + /** * Get the remaining time in milliseconds until the checkQuorumTimer expires. * From 4b9d29d65d90fb44b8bf22bd626755a184cc0ef8 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 01:29:28 +0800 Subject: [PATCH 03/14] fix build --- raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 3ecab18eca4aa..783fe8985f8da 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1476,6 +1476,7 @@ private boolean hasValidClusterId(String requestClusterId) { * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch * or if either the fetch offset or the last fetched epoch is invalid */ + @SuppressWarnings("CyclomaticComplexity") private CompletableFuture handleFetchRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs From 9a61c33068b307d5cf3a133318d5d82ae551f2ab Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 13:34:39 +0800 Subject: [PATCH 04/14] add test --- .../kafka/raft/KafkaRaftClientTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 4687fd3d90376..09d6d73bd545e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -633,6 +633,38 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); } + @ParameterizedTest + @ValueSource(booleans = { false }) + public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int remoteId1 = localId + 1; + int remoteId2 = localId + 2; + Set voters = Set.of(localId, remoteId1, remoteId2); + ReplicaKey replicaKey = replicaKey(localId + 1, withKip853Rpc); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withKip853Rpc(withKip853Rpc) + .build(); + + context.unattachedToLeader(); + int epoch = context.currentEpoch(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); + + context.deliverRequest(context.fetchRequest(epoch, replicaKey, 0, 0, 0)); + + int partialDelay = context.beginQuorumEpochTimeoutMs / 2; + context.time.sleep(partialDelay); + context.client.poll(); + // don't send BeginQuorumEpochRequest again if fetchRequest is sent. + context.assertSentBeginQuorumEpochRequest(epoch, Set.of()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception { From beac7b4469fc01371ceb9d5119635ff10f4824c1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 14:03:20 +0800 Subject: [PATCH 05/14] fix test --- .../java/org/apache/kafka/raft/KafkaRaftClientTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 09d6d73bd545e..031ea55030d92 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -634,7 +634,7 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio } @ParameterizedTest - @ValueSource(booleans = { false }) + @ValueSource(booleans = { true, false }) public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception { int localId = randomReplicaId(); int remoteId1 = localId + 1; @@ -657,11 +657,10 @@ public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) context.deliverRequest(context.fetchRequest(epoch, replicaKey, 0, 0, 0)); - int partialDelay = context.beginQuorumEpochTimeoutMs / 2; - context.time.sleep(partialDelay); + context.time.sleep(context.beginQuorumEpochTimeoutMs); context.client.poll(); // don't send BeginQuorumEpochRequest again if fetchRequest is sent. - context.assertSentBeginQuorumEpochRequest(epoch, Set.of()); + context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2)); } From 07773c67f5262a603ee1bbb0a86a397a45affe81 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 5 Aug 2025 16:55:12 +0800 Subject: [PATCH 06/14] fix partital test --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 8 ++------ .../java/org/apache/kafka/raft/KafkaRaftClientTest.java | 2 ++ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 3815dc3ff1447..1fd3ed12a58f7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -191,24 +191,20 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) { public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTimeMs) { beginQuorumEpochTimer.update(currentTimeMs); - lastFetchRequestMs.put(replicaKey, currentTimeMs); + lastFetchRequestMs.put(replicaKey, beginQuorumEpochTimer.currentTimeMs()); } public Set needSendBeginQuorumRequestNodes(long currentTimeMs) { Set replicaKeys = new HashSet<>(); beginQuorumEpochTimer.update(currentTimeMs); for (Map.Entry entry : lastFetchRequestMs.entrySet()) { - if (entry.getValue() - currentTimeMs >= beginQuorumEpochTimeoutMs) { + if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) { replicaKeys.add(entry.getKey()); } } return replicaKeys; } - public long getLastFetchRequestTimeMs(ReplicaKey replicaKey) { - return lastFetchRequestMs.get(replicaKey); - } - /** * Get the remaining time in milliseconds until the checkQuorumTimer expires. * diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 031ea55030d92..cd96350316a8f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -655,7 +655,9 @@ public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) context.client.poll(); context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); + context.time.sleep(context.beginQuorumEpochTimeoutMs / 3); context.deliverRequest(context.fetchRequest(epoch, replicaKey, 0, 0, 0)); + context.pollUntilResponse(); context.time.sleep(context.beginQuorumEpochTimeoutMs); context.client.poll(); From 742a20a70833484d5e4b49f862e728d8612dda10 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 6 Aug 2025 07:18:32 +0800 Subject: [PATCH 07/14] fix error from kip-853 --- .../main/java/org/apache/kafka/raft/KafkaRaftClient.java | 8 ++++++-- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 6 ++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 783fe8985f8da..fa99d8c286c55 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1514,6 +1514,7 @@ private CompletableFuture handleFetchRequest( fetchPartition.replicaDirectoryId() ); if (quorum.isLeader()) { + System.err.println("ZZZ update " + replicaKey + " time: " + currentTimeMs); quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); } FetchResponseData response = tryCompleteFetchRequest( @@ -3047,14 +3048,17 @@ private long maybeSendBeginQuorumEpochRequests( ) ); - Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestNodes(currentTimeMs); + Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestReplicaKey(currentTimeMs); + Set needToSendBeginQuorumRequestNode = needToSendBeginQuorumRequest.stream(). + map(ReplicaKey::id).collect(Collectors.toSet()); + timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) - .filter(key -> !needToSendBeginQuorumRequest.contains(key)) + .filter(key -> !needToSendBeginQuorumRequestNode.contains(key.id())) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 1fd3ed12a58f7..9f505c6ef33bf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -54,6 +54,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.raft.ReplicaKey.NO_DIRECTORY_ID; + /** * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests. @@ -194,12 +196,12 @@ public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTi lastFetchRequestMs.put(replicaKey, beginQuorumEpochTimer.currentTimeMs()); } - public Set needSendBeginQuorumRequestNodes(long currentTimeMs) { + public Set needSendBeginQuorumRequestReplicaKey(long currentTimeMs) { Set replicaKeys = new HashSet<>(); beginQuorumEpochTimer.update(currentTimeMs); for (Map.Entry entry : lastFetchRequestMs.entrySet()) { if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) { - replicaKeys.add(entry.getKey()); + replicaKeys.add(ReplicaKey.of(entry.getKey().id(), entry.getKey().directoryId().orElse(NO_DIRECTORY_ID))); } } return replicaKeys; From d758963c6f554d425f27a2b2ab4cda816756c9b9 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 6 Aug 2025 12:06:15 +0800 Subject: [PATCH 08/14] remove print --- raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index fa99d8c286c55..5e8ab9afcf38f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1514,7 +1514,6 @@ private CompletableFuture handleFetchRequest( fetchPartition.replicaDirectoryId() ); if (quorum.isLeader()) { - System.err.println("ZZZ update " + replicaKey + " time: " + currentTimeMs); quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); } FetchResponseData response = tryCompleteFetchRequest( From bc0858a94cb79f52985a0e5ee56f938351e34e01 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 26 Aug 2025 13:09:43 +0800 Subject: [PATCH 09/14] Revert "fix error from kip-853" This reverts commit 742a20a70833484d5e4b49f862e728d8612dda10. --- .../main/java/org/apache/kafka/raft/KafkaRaftClient.java | 7 ++----- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 6 ++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 5e8ab9afcf38f..783fe8985f8da 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -3047,17 +3047,14 @@ private long maybeSendBeginQuorumEpochRequests( ) ); - Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestReplicaKey(currentTimeMs); - Set needToSendBeginQuorumRequestNode = needToSendBeginQuorumRequest.stream(). - map(ReplicaKey::id).collect(Collectors.toSet()); - + Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestNodes(currentTimeMs); timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) - .filter(key -> !needToSendBeginQuorumRequestNode.contains(key.id())) + .filter(key -> !needToSendBeginQuorumRequest.contains(key)) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9f505c6ef33bf..1fd3ed12a58f7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -54,8 +54,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.kafka.raft.ReplicaKey.NO_DIRECTORY_ID; - /** * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests. @@ -196,12 +194,12 @@ public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTi lastFetchRequestMs.put(replicaKey, beginQuorumEpochTimer.currentTimeMs()); } - public Set needSendBeginQuorumRequestReplicaKey(long currentTimeMs) { + public Set needSendBeginQuorumRequestNodes(long currentTimeMs) { Set replicaKeys = new HashSet<>(); beginQuorumEpochTimer.update(currentTimeMs); for (Map.Entry entry : lastFetchRequestMs.entrySet()) { if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) { - replicaKeys.add(ReplicaKey.of(entry.getKey().id(), entry.getKey().directoryId().orElse(NO_DIRECTORY_ID))); + replicaKeys.add(entry.getKey()); } } return replicaKeys; From 26a3f94b88b0a638a23a31b5c69411b628e372c5 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 26 Aug 2025 16:42:41 +0800 Subject: [PATCH 10/14] add EquivalentTo method --- .../apache/kafka/raft/KafkaRaftClient.java | 11 +++++++++- .../org/apache/kafka/raft/ReplicaKey.java | 20 +++++++++++++++++++ .../kafka/raft/KafkaRaftClientTest.java | 6 +++--- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 783fe8985f8da..369a3c97b6950 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1513,9 +1513,11 @@ private CompletableFuture handleFetchRequest( FetchRequest.replicaId(request), fetchPartition.replicaDirectoryId() ); + if (quorum.isLeader()) { quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); } + FetchResponseData response = tryCompleteFetchRequest( requestMetadata.listenerName(), requestMetadata.apiVersion(), @@ -3054,7 +3056,14 @@ private long maybeSendBeginQuorumEpochRequests( .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) - .filter(key -> !needToSendBeginQuorumRequest.contains(key)) + .filter(key -> { + for (ReplicaKey needToSend : needToSendBeginQuorumRequest) { + if (needToSend.equivalentTo(key)) { + return false; + } + } + return true; + }) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java b/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java index f25a1d55ba4a7..246cea7fb2ce5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java @@ -52,6 +52,26 @@ public int compareTo(ReplicaKey that) { } } + /** + * Determines whether this {@code ReplicaKey} is considered equivalent to the given one, + * based on the following rules: + *
    + *
  • The {@code id} fields must be equal.
  • + *
  • If both instances have a {@code directoryId} present, those must also be equal.
  • + *
  • If either instance does not have a {@code directoryId}, the comparison + * ignores {@code directoryId} and considers them equivalent.
  • + *
+ */ + public boolean equivalentTo(ReplicaKey that) { + if (id() != that.id()) return false; + + if (directoryId.isPresent() && that.directoryId.isPresent()) { + return directoryId.get().equals(that.directoryId().get()); + } else { + return true; + } + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index cd96350316a8f..17d3867c09efd 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -640,7 +640,7 @@ public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) int remoteId1 = localId + 1; int remoteId2 = localId + 2; Set voters = Set.of(localId, remoteId1, remoteId2); - ReplicaKey replicaKey = replicaKey(localId + 1, withKip853Rpc); + ReplicaKey replicaKey1 = replicaKey(localId + 1, withKip853Rpc); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withKip853Rpc(withKip853Rpc) @@ -656,12 +656,12 @@ public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); context.time.sleep(context.beginQuorumEpochTimeoutMs / 3); - context.deliverRequest(context.fetchRequest(epoch, replicaKey, 0, 0, 0)); + context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0)); context.pollUntilResponse(); context.time.sleep(context.beginQuorumEpochTimeoutMs); context.client.poll(); - // don't send BeginQuorumEpochRequest again if fetchRequest is sent. + // don't send BeginQuorumEpochRequest again for replicaKey1 since fetchRequest is sent. context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2)); } From 43e868d60de29bfa5dc948c606c83860beff81e5 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 26 Aug 2025 18:09:45 +0800 Subject: [PATCH 11/14] assume all node have directoryId --- .../apache/kafka/raft/KafkaRaftClient.java | 9 +------ .../org/apache/kafka/raft/ReplicaKey.java | 20 --------------- .../kafka/raft/KafkaRaftClientTest.java | 25 ++++++++++--------- 3 files changed, 14 insertions(+), 40 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 369a3c97b6950..abf717419d03b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -3056,14 +3056,7 @@ private long maybeSendBeginQuorumEpochRequests( .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) - .filter(key -> { - for (ReplicaKey needToSend : needToSendBeginQuorumRequest) { - if (needToSend.equivalentTo(key)) { - return false; - } - } - return true; - }) + .filter(key -> !needToSendBeginQuorumRequest.contains(key)) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java b/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java index 246cea7fb2ce5..f25a1d55ba4a7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicaKey.java @@ -52,26 +52,6 @@ public int compareTo(ReplicaKey that) { } } - /** - * Determines whether this {@code ReplicaKey} is considered equivalent to the given one, - * based on the following rules: - *
    - *
  • The {@code id} fields must be equal.
  • - *
  • If both instances have a {@code directoryId} present, those must also be equal.
  • - *
  • If either instance does not have a {@code directoryId}, the comparison - * ignores {@code directoryId} and considers them equivalent.
  • - *
- */ - public boolean equivalentTo(ReplicaKey that) { - if (id() != that.id()) return false; - - if (directoryId.isPresent() && that.directoryId.isPresent()) { - return directoryId.get().equals(that.directoryId().get()); - } else { - return true; - } - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 17d3867c09efd..b89dd582b859f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; @@ -633,22 +634,22 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); } - @ParameterizedTest - @ValueSource(booleans = { true, false }) - public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - int remoteId1 = localId + 1; - int remoteId2 = localId + 2; - Set voters = Set.of(localId, remoteId1, remoteId2); - ReplicaKey replicaKey1 = replicaKey(localId + 1, withKip853Rpc); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) + @Test + public void testBeginQuorumShouldNotSendAfterFetchRequest() throws Exception { + ReplicaKey localId = replicaKey(randomReplicaId(), true); + int remoteId1 = localId.id() + 1; + int remoteId2 = localId.id() + 2; + ReplicaKey replicaKey1 = replicaKey(remoteId1, true); + ReplicaKey replicaKey2 = replicaKey(remoteId2, true); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId.id(), localId.directoryId().get()) + .withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters(VoterSetTest.voterSet(Stream.of(localId, replicaKey1, replicaKey2)), KRaftVersion.KRAFT_VERSION_1) .build(); context.unattachedToLeader(); int epoch = context.currentEpoch(); - assertEquals(OptionalInt.of(localId), context.currentLeader()); + assertEquals(OptionalInt.of(localId.id()), context.currentLeader()); // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs context.time.sleep(context.beginQuorumEpochTimeoutMs); From 49c746c671950c353e3b915f3fc234d9df133df2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 27 Aug 2025 23:02:38 +0800 Subject: [PATCH 12/14] Use ReplicaState and rewrite test for edge case --- .../org/apache/kafka/raft/KafkaRaftClient.java | 11 +++-------- .../java/org/apache/kafka/raft/LeaderState.java | 14 ++++---------- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 13 +++++++++++-- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 9b4dc4aaea370..aa213471c3aa4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -358,7 +358,6 @@ private void updateLeaderEndOffsetAndTimestamp( long currentTimeMs ) { final LogOffsetMetadata endOffsetMetadata = log.endOffset(); - if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet())) { onUpdateLeaderHighWatermark(state, currentTimeMs); } @@ -1514,10 +1513,6 @@ private CompletableFuture handleFetchRequest( fetchPartition.replicaDirectoryId() ); - if (quorum.isLeader()) { - quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); - } - FetchResponseData response = tryCompleteFetchRequest( requestMetadata.listenerName(), requestMetadata.apiVersion(), @@ -3095,15 +3090,15 @@ private long maybeSendBeginQuorumEpochRequests( ) ); - Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestNodes(currentTimeMs); + Set needToSendBeginQuorumRequests = state.needToSendBeginQuorumRequests(currentTimeMs); timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) - .filter(key -> !needToSendBeginQuorumRequest.contains(key)) - .collect(Collectors.toSet()), + .filter(needToSendBeginQuorumRequests::contains) + .collect(Collectors.toUnmodifiableSet()), nodeSupplier, this::buildBeginQuorumEpochRequest ); diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 1fd3ed12a58f7..3b7d3979dfd40 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -88,7 +88,6 @@ public class LeaderState implements EpochState { private final Timer beginQuorumEpochTimer; private final int beginQuorumEpochTimeoutMs; private final KafkaRaftMetrics kafkaRaftMetrics; - private final Map lastFetchRequestMs = new HashMap<>(); // This is volatile because resignation can be requested from an external thread. private volatile boolean resignRequested = false; @@ -189,17 +188,12 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) { beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs); } - public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTimeMs) { - beginQuorumEpochTimer.update(currentTimeMs); - lastFetchRequestMs.put(replicaKey, beginQuorumEpochTimer.currentTimeMs()); - } - - public Set needSendBeginQuorumRequestNodes(long currentTimeMs) { + public Set needToSendBeginQuorumRequests(long currentTimeMs) { Set replicaKeys = new HashSet<>(); beginQuorumEpochTimer.update(currentTimeMs); - for (Map.Entry entry : lastFetchRequestMs.entrySet()) { - if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) { - replicaKeys.add(entry.getKey()); + for (ReplicaState state : voterStates.values()) { + if (beginQuorumEpochTimer.currentTimeMs() - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs) { + replicaKeys.add(state.replicaKey()); } } return replicaKeys; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 15c8a055717ce..916ebd3a80451 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -636,7 +636,8 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio @Test public void testBeginQuorumShouldNotSendAfterFetchRequest() throws Exception { - ReplicaKey localId = replicaKey(randomReplicaId(), true); +// ReplicaKey localId = replicaKey(randomReplicaId(), true); + ReplicaKey localId = replicaKey(255, true); int remoteId1 = localId.id() + 1; int remoteId2 = localId.id() + 2; ReplicaKey replicaKey1 = replicaKey(remoteId1, true); @@ -656,14 +657,22 @@ public void testBeginQuorumShouldNotSendAfterFetchRequest() throws Exception { context.client.poll(); context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); + long partialDelay = context.beginQuorumEpochTimeoutMs / 3; context.time.sleep(context.beginQuorumEpochTimeoutMs / 3); context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0)); context.pollUntilResponse(); - context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay); context.client.poll(); // don't send BeginQuorumEpochRequest again for replicaKey1 since fetchRequest is sent. context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2)); + + context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0)); + context.pollUntilResponse(); + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + // should send BeginQuorumEpochRequest if sleep time equals beginQuorumEpochTimeoutMs + context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); } From c23bca4ce3f692ee45220e77c6694c7335ac76c2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 28 Aug 2025 00:33:46 +0800 Subject: [PATCH 13/14] remove comment --- .../test/java/org/apache/kafka/raft/KafkaRaftClientTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 916ebd3a80451..2dd8fb06d3530 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -636,8 +636,7 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio @Test public void testBeginQuorumShouldNotSendAfterFetchRequest() throws Exception { -// ReplicaKey localId = replicaKey(randomReplicaId(), true); - ReplicaKey localId = replicaKey(255, true); + ReplicaKey localId = replicaKey(randomReplicaId(), true); int remoteId1 = localId.id() + 1; int remoteId2 = localId.id() + 2; ReplicaKey replicaKey1 = replicaKey(remoteId1, true); From f67e5bdfb8a2e3b02b74e34a28a7fcec7957f2ae Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 10 Sep 2025 18:36:35 +0800 Subject: [PATCH 14/14] make sure beginQuorumEpochRequst send to non acks node --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 3b7d3979dfd40..b97131c2db817 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -192,7 +192,8 @@ public Set needToSendBeginQuorumRequests(long currentTimeMs) { Set replicaKeys = new HashSet<>(); beginQuorumEpochTimer.update(currentTimeMs); for (ReplicaState state : voterStates.values()) { - if (beginQuorumEpochTimer.currentTimeMs() - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs) { + if (beginQuorumEpochTimer.currentTimeMs() - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs + || !state.hasAcknowledgedLeader) { replicaKeys.add(state.replicaKey()); } }