Collaborator

@TaiJuWu TaiJuWu commented Aug 7, 2025

Instead of sending out BeginQuorum requests to every voter on a cadence,
we can save on some requests by only sending to those which have not
fetched within the BeginQuorumEpoch timeout.
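
To illustrate the idea only (this is not the code in the patch), here is a minimal sketch that picks the voters whose last fetch is at least the timeout old. The nested `ReplicaKey` record is a simplified stand-in for the real class, and `votersNeedingBeginQuorum` and its timestamp map are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class BeginQuorumFilterSketch {
    // Simplified stand-in for Kafka's ReplicaKey (assumption, for illustration only).
    record ReplicaKey(int id, Optional<String> directoryId) { }

    // Hypothetical helper: returns the voters whose last fetch is at least
    // timeoutMs old, i.e. the only ones that still need a BeginQuorumEpoch request.
    static Set<ReplicaKey> votersNeedingBeginQuorum(
            Map<ReplicaKey, Long> lastFetchMs, long currentTimeMs, long timeoutMs) {
        Set<ReplicaKey> result = new HashSet<>();
        for (Map.Entry<ReplicaKey, Long> entry : lastFetchMs.entrySet()) {
            if (currentTimeMs - entry.getValue() >= timeoutMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Voters that fetched recently already know the current leader, so leaving them out of the result changes nothing for them.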

@github-actions github-actions bot added triage PRs from the community kraft small Small PRs labels Aug 7, 2025
A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Member

@chia7712 chia7712 left a comment

@TaiJuWu thanks for your patch

beginQuorumEpochTimer.update(currentTimeMs);
for (Map.Entry<ReplicaKey, Long> entry : lastFetchRequestMs.entrySet()) {
    if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) {
        replicaKeys.add(ReplicaKey.of(entry.getKey().id(), entry.getKey().directoryId().orElse(NO_DIRECTORY_ID)));
Member

How about replicaKeys.add(entry.getKey()); instead? The map key is already a ReplicaKey.

Collaborator Author

Fixed.

.voterKeys()
.stream()
.filter(key -> key.id() != quorum.localIdOrThrow())
.filter(key -> !needToSendBeginQuorumRequestNode.contains(key.id()))
Member

Why are we only checking id? The unique key is actually composed of both id and directory id, right?

Collaborator Author

@TaiJuWu TaiJuWu Aug 26, 2025

I have updated the code and added a new method, equivalentTo, to compare two ReplicaKeys, because the directory ID can be empty before KIP-853.

After 4.0 we can assume all nodes have a directory ID, so I rewrote the test to use ReplicaKey.
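
The thread does not show the body of equivalentTo, so the following is only a guess at one plausible shape, under the assumption that an empty directory ID should match any directory ID for the same node ID. The `ReplicaKey` record and the `String` directory ID type are simplified stand-ins, not the real Kafka types.

```java
import java.util.Optional;

class ReplicaKeyEquivalenceSketch {
    // Simplified stand-in for Kafka's ReplicaKey (assumption, for illustration only).
    record ReplicaKey(int id, Optional<String> directoryId) {
        // Assumed semantics: ids must match; directory ids are compared only
        // when both sides have one, since older replicas may not report it.
        boolean equivalentTo(ReplicaKey other) {
            if (id() != other.id()) {
                return false;
            }
            if (directoryId().isEmpty() || other.directoryId().isEmpty()) {
                return true;
            }
            return directoryId().equals(other.directoryId());
        }
    }
}
```

Note that a relation like this is not transitive (two keys with different directory IDs are both equivalent to the same key with an empty one), so it would not be a safe replacement for equals in hash-based collections.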

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Aug 25, 2025
Member

@brandboat brandboat left a comment

Thanks for the patch @TaiJuWu, I left some comments below.

private final Timer beginQuorumEpochTimer;
private final int beginQuorumEpochTimeoutMs;
private final KafkaRaftMetrics kafkaRaftMetrics;
private final Map<ReplicaKey, Long> lastFetchRequestMs = new HashMap<>();
Member

This could be replaced with ReplicaState#lastFetchTimestamp, right?

Collaborator Author

Fixed, thanks.

Set<ReplicaKey> replicaKeys = new HashSet<>();
beginQuorumEpochTimer.update(currentTimeMs);
for (Map.Entry<ReplicaKey, Long> entry : lastFetchRequestMs.entrySet()) {
    if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) {
Member

From the code above at KafkaRaftClient#L3059,

.filter(key -> !needToSendBeginQuorumRequest.contains(key))

I think the condition should be changed to:

Suggested change
- if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) {
+ if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() < beginQuorumEpochTimeoutMs) {

and we should rename the method to skipSendBeginQuorumRequestNodes.

Collaborator Author

@TaiJuWu TaiJuWu Aug 27, 2025

I kept using needToSendBeginQuorumRequest and changed the filter condition instead, because positive logic is easier to understand.
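
For readers comparing the two options, here is a small sketch of why they can select the same voters: filtering with a "need to send" set and `contains` matches filtering with the complementary "skip" set and `!contains`. Plain integers stand in for voter keys, and both method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class FilterPolaritySketch {
    // Positive logic: keep only voters that are in the "need to send" set.
    static Set<Integer> byNeedToSend(List<Integer> voters, Set<Integer> needToSend) {
        return voters.stream()
            .filter(needToSend::contains)
            .collect(Collectors.toSet());
    }

    // Negative logic: drop voters that are in the "skip" set.
    static Set<Integer> bySkip(List<Integer> voters, Set<Integer> skip) {
        return voters.stream()
            .filter(voter -> !skip.contains(voter))
            .collect(Collectors.toSet());
    }
}
```

The two agree only when the sets are exact complements over the voter list, which is why the set's name, the timeout comparison, and the filter polarity have to be changed together.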
