Skip to content

MINOR:Extract public code #20198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 46 additions & 77 deletions raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,49 +23,25 @@

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;

/**
* The textual representation of a KIP-853 voter.
*
* <p>
* Since this is used in command-line tools, format changes to the parsing logic require a KIP,
* and should be backwards compatible.
*/
public final class DynamicVoter {
private final Uuid directoryId;
private final int nodeId;
private final String host;
private final int port;

public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) {
/**
* Create a DynamicVoter object by parsing an input string.
*
* @param input The input string.
*
* @return The DynamicVoter object.
*
* @throws IllegalArgumentException If parsing fails.
* @param input The input string.
* @return The DynamicVoter object.
* @throws IllegalArgumentException If parsing fails.
*/
public static DynamicVoter parse(String input) {
input = input.trim();
int atIndex = input.indexOf("@");
if (atIndex < 0) {
throw new IllegalArgumentException("No @ found in dynamic voter string.");
}
if (atIndex == 0) {
throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string.");
}
String idString = input.substring(0, atIndex);
int nodeId;
try {
nodeId = Integer.parseInt(idString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e);
}
if (nodeId < 0) {
throw new IllegalArgumentException("Invalid negative node id " + nodeId +
" in dynamic voter string.");
}
int nodeId = getNodeId(input, atIndex);
input = input.substring(atIndex + 1);
if (input.isEmpty()) {
throw new IllegalArgumentException("No hostname found after node id.");
Expand All @@ -92,6 +68,18 @@ public static DynamicVoter parse(String input) {
}
input = input.substring(1);
int endColonIndex = input.indexOf(":");
int port = getPort(input, endColonIndex);
String directoryIdString = input.substring(endColonIndex + 1);
Uuid directoryId;
try {
directoryId = Uuid.fromString(directoryIdString);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e);
}
return new DynamicVoter(directoryId, nodeId, host, port);
}

private static int getPort(String input, int endColonIndex) {
if (endColonIndex < 0) {
throw new IllegalArgumentException("No colon following port could be found.");
}
Expand All @@ -105,50 +93,39 @@ public static DynamicVoter parse(String input) {
if (port < 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port + " in dynamic voter string.");
}
String directoryIdString = input.substring(endColonIndex + 1);
Uuid directoryId;
return port;
}

private static int getNodeId(String input, int atIndex) {
if (atIndex < 0) {
throw new IllegalArgumentException("No @ found in dynamic voter string.");
}
if (atIndex == 0) {
throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string.");
}
String idString = input.substring(0, atIndex);
int nodeId;
try {
directoryId = Uuid.fromString(directoryIdString);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e);
nodeId = Integer.parseInt(idString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e);
}
return new DynamicVoter(directoryId, nodeId, host, port);
if (nodeId < 0) {
throw new IllegalArgumentException("Invalid negative node id " + nodeId +
" in dynamic voter string.");
}
return nodeId;
}

/**
* Create a new KIP-853 voter.
*
* @param directoryId The directory ID.
* @param nodeId The voter ID.
* @param host The voter hostname or IP address.
* @param port The voter port.
* @param directoryId The directory ID.
* @param nodeId The voter ID.
* @param host The voter hostname or IP address.
* @param port The voter port.
*/
public DynamicVoter(
Uuid directoryId,
int nodeId,
String host,
int port
) {
this.directoryId = directoryId;
this.nodeId = nodeId;
this.host = host;
this.port = port;
}

public Uuid directoryId() {
return directoryId;
}

public int nodeId() {
return nodeId;
}

public String host() {
return host;
}

public int port() {
return port;
public DynamicVoter {
}

public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
Expand All @@ -166,17 +143,9 @@ public boolean equals(Object o) {
if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false;
DynamicVoter other = (DynamicVoter) o;
return directoryId.equals(other.directoryId) &&
nodeId == other.nodeId &&
host.equals(other.host) &&
port == other.port;
}

@Override
public int hashCode() {
return Objects.hash(directoryId,
nodeId,
host,
port);
nodeId == other.nodeId &&
host.equals(other.host) &&
port == other.port;
}

@Override
Expand Down
60 changes: 29 additions & 31 deletions raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@

/**
* A replica is "unattached" when it doesn't know the leader or the leader's endpoint.
*
* <p>
* Typically, a replica doesn't know the leader if the KRaft topic is undergoing an election cycle.
*
* <p>
* It is also possible for a replica to be unattached if it doesn't know the leader's endpoint.
* This typically happens when a replica starts up and the known leader id is not part of the local
* voter set. In that case, during startup the replica transitions to unattached instead of
Expand All @@ -52,14 +52,14 @@ public class UnattachedState implements EpochState {
private final Logger log;

public UnattachedState(
Time time,
int epoch,
OptionalInt leaderId,
Optional<ReplicaKey> votedKey,
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
long electionTimeoutMs,
LogContext logContext
Time time,
int epoch,
OptionalInt leaderId,
Optional<ReplicaKey> votedKey,
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
long electionTimeoutMs,
LogContext logContext
) {
this.epoch = epoch;
this.leaderId = leaderId;
Expand All @@ -75,11 +75,8 @@ public UnattachedState(
public ElectionState election() {
if (leaderId.isPresent()) {
return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), votedKey, voters);
} else if (votedKey.isPresent()) {
return ElectionState.withVotedCandidate(epoch, votedKey.get(), voters);
} else {
return ElectionState.withUnknownLeader(epoch, voters);
}
} else
return votedKey.map(replicaKey -> ElectionState.withVotedCandidate(epoch, replicaKey, voters)).orElseGet(() -> ElectionState.withUnknownLeader(epoch, voters));
}

@Override
Expand Down Expand Up @@ -123,30 +120,31 @@ public Optional<LogOffsetMetadata> highWatermark() {
@Override
public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
return unattachedOrProspectiveCanGrantVote(
leaderId,
votedKey,
epoch,
replicaKey,
isLogUpToDate,
isPreVote,
log
leaderId,
votedKey,
epoch,
replicaKey,
isLogUpToDate,
isPreVote,
log
);
}

@Override
public String toString() {
return String.format(
"UnattachedState(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " +
"electionTimeoutMs=%d, highWatermark=%s)",
epoch,
leaderId,
votedKey,
voters,
electionTimeoutMs,
highWatermark
"UnattachedState(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " +
"electionTimeoutMs=%d, highWatermark=%s)",
epoch,
leaderId,
votedKey,
voters,
electionTimeoutMs,
highWatermark
);
}

@Override
public void close() {}
public void close() {
}
}
Loading