Skip to content

Commit 18045c6

Browse files
authored
KAFKA-19592: testGenerateAssignmentWithBootstrapServer uses wrong JSON format (#20336)
This PR do following: 1. Use correct json format to test. 2. make `PartitionReassignmentState` and `VerifyAssignmentResult.java` become record Reviewers: TengYao Chi <[email protected]>, Ken Huang <[email protected]>, PoAn Yang <[email protected]>
1 parent 7d2ad18 commit 18045c6

File tree

4 files changed

+57
-86
lines changed

4 files changed

+57
-86
lines changed

tools/src/main/java/org/apache/kafka/tools/reassign/PartitionReassignmentState.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,17 @@
1818
package org.apache.kafka.tools.reassign;
1919

2020
import java.util.List;
21-
import java.util.Objects;
2221

2322
/**
2423
* The state of a partition reassignment. The current replicas and target replicas
2524
* may overlap.
25+
*
26+
* @param currentReplicas The current replicas.
27+
* @param targetReplicas The target replicas.
28+
* @param done True if the reassignment is done.
2629
*/
27-
final class PartitionReassignmentState {
28-
public final List<Integer> currentReplicas;
29-
30-
public final List<Integer> targetReplicas;
31-
32-
public final boolean done;
33-
34-
/**
35-
* @param currentReplicas The current replicas.
36-
* @param targetReplicas The target replicas.
37-
* @param done True if the reassignment is done.
38-
*/
39-
public PartitionReassignmentState(List<Integer> currentReplicas, List<Integer> targetReplicas, boolean done) {
40-
this.currentReplicas = currentReplicas;
41-
this.targetReplicas = targetReplicas;
42-
this.done = done;
43-
}
44-
45-
@Override
46-
public boolean equals(Object o) {
47-
if (this == o) return true;
48-
if (o == null || getClass() != o.getClass()) return false;
49-
PartitionReassignmentState state = (PartitionReassignmentState) o;
50-
return done == state.done && Objects.equals(currentReplicas, state.currentReplicas) && Objects.equals(targetReplicas, state.targetReplicas);
51-
}
52-
53-
@Override
54-
public int hashCode() {
55-
return Objects.hash(currentReplicas, targetReplicas, done);
56-
}
57-
}
30+
record PartitionReassignmentState(
31+
List<Integer> currentReplicas,
32+
List<Integer> targetReplicas,
33+
boolean done
34+
) { }

tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,12 @@ static String partitionReassignmentStatesToString(Map<TopicPartition, PartitionR
279279
bld.add("Status of partition reassignment:");
280280
states.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).forEach(topicPartition -> {
281281
PartitionReassignmentState state = states.get(topicPartition);
282-
if (state.done) {
283-
if (state.currentReplicas.equals(state.targetReplicas)) {
282+
if (state.done()) {
283+
if (state.currentReplicas().equals(state.targetReplicas())) {
284284
bld.add(String.format("Reassignment of partition %s is completed.", topicPartition));
285285
} else {
286-
String currentReplicaStr = state.currentReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
287-
String targetReplicaStr = state.targetReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
286+
String currentReplicaStr = state.currentReplicas().stream().map(String::valueOf).collect(Collectors.joining(","));
287+
String targetReplicaStr = state.targetReplicas().stream().map(String::valueOf).collect(Collectors.joining(","));
288288

289289
bld.add("There is no active reassignment of partition " + topicPartition + ", " +
290290
"but replica set is " + currentReplicaStr + " rather than " +

tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,49 +21,21 @@
2121
import org.apache.kafka.common.TopicPartitionReplica;
2222

2323
import java.util.Map;
24-
import java.util.Objects;
2524

2625
/**
2726
* A result returned from verifyAssignment.
27+
* @param partStates A map from partitions to reassignment states.
28+
* @param partsOngoing True if there are any ongoing partition reassignments.
29+
* @param moveStates A map from log directories to movement states.
30+
* @param movesOngoing True if there are any ongoing moves that we know about.
2831
*/
29-
public final class VerifyAssignmentResult {
30-
public final Map<TopicPartition, PartitionReassignmentState> partStates;
31-
public final boolean partsOngoing;
32-
public final Map<TopicPartitionReplica, LogDirMoveState> moveStates;
33-
public final boolean movesOngoing;
34-
32+
public record VerifyAssignmentResult(
33+
Map<TopicPartition, PartitionReassignmentState> partStates,
34+
boolean partsOngoing,
35+
Map<TopicPartitionReplica, LogDirMoveState> moveStates,
36+
boolean movesOngoing
37+
) {
3538
public VerifyAssignmentResult(Map<TopicPartition, PartitionReassignmentState> partStates) {
3639
this(partStates, false, Map.of(), false);
3740
}
38-
39-
/**
40-
* @param partStates A map from partitions to reassignment states.
41-
* @param partsOngoing True if there are any ongoing partition reassignments.
42-
* @param moveStates A map from log directories to movement states.
43-
* @param movesOngoing True if there are any ongoing moves that we know about.
44-
*/
45-
public VerifyAssignmentResult(
46-
Map<TopicPartition, PartitionReassignmentState> partStates,
47-
boolean partsOngoing,
48-
Map<TopicPartitionReplica, LogDirMoveState> moveStates,
49-
boolean movesOngoing
50-
) {
51-
this.partStates = partStates;
52-
this.partsOngoing = partsOngoing;
53-
this.moveStates = moveStates;
54-
this.movesOngoing = movesOngoing;
55-
}
56-
57-
@Override
58-
public boolean equals(Object o) {
59-
if (this == o) return true;
60-
if (o == null || getClass() != o.getClass()) return false;
61-
VerifyAssignmentResult that = (VerifyAssignmentResult) o;
62-
return partsOngoing == that.partsOngoing && movesOngoing == that.movesOngoing && Objects.equals(partStates, that.partStates) && Objects.equals(moveStates, that.moveStates);
63-
}
64-
65-
@Override
66-
public int hashCode() {
67-
return Objects.hash(partStates, partsOngoing, moveStates, movesOngoing);
68-
}
6941
}

tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,35 @@ public void testGenerateAssignmentWithBootstrapServer() throws Exception {
153153
produceMessages(foo0.topic(), foo0.partition(), 100);
154154

155155
try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
156-
String assignment = "{\"version\":1,\"partitions\":" +
157-
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
158-
"]}";
159-
generateAssignment(admin, assignment, "1,2,3", false);
156+
String topicsToMoveJson = """
157+
{
158+
"topics": [
159+
{ "topic": "foo" }
160+
],
161+
"version": 1
162+
}
163+
""";
164+
var assignment = generateAssignment(admin, topicsToMoveJson, "1,2,3", false);
165+
Map<TopicPartition, List<Integer>> proposedAssignments = assignment.getKey();
166+
String assignmentJson = String.format("""
167+
{
168+
"version": 1,
169+
"partitions": [
170+
{
171+
"topic": "foo",
172+
"partition": 0,
173+
"replicas": %s,
174+
"log_dirs": ["any", "any", "any"]
175+
}
176+
]
177+
}
178+
""", proposedAssignments.get(foo0));
179+
180+
runExecuteAssignment(false, assignmentJson, -1L, -1L);
181+
160182
Map<TopicPartition, PartitionReassignmentState> finalAssignment = Map.of(foo0,
161-
new PartitionReassignmentState(List.of(0, 1, 2), List.of(3, 1, 2), true));
162-
waitForVerifyAssignment(admin, assignment, false,
183+
new PartitionReassignmentState(proposedAssignments.get(foo0), proposedAssignments.get(foo0), true));
184+
waitForVerifyAssignment(admin, assignmentJson, false,
163185
new VerifyAssignmentResult(finalAssignment));
164186
}
165187
}
@@ -237,15 +259,15 @@ public void testThrottledReassignment() throws Exception {
237259
// Check the reassignment status.
238260
VerifyAssignmentResult result = runVerifyAssignment(admin, assignment, true);
239261

240-
if (!result.partsOngoing) {
262+
if (!result.partsOngoing()) {
241263
return true;
242264
} else {
243265
assertFalse(
244-
result.partStates.values().stream().allMatch(state -> state.done),
266+
result.partStates().values().stream().allMatch(PartitionReassignmentState::done),
245267
"Expected at least one partition reassignment to be ongoing when result = " + result
246268
);
247-
assertEquals(List.of(0, 3, 2), result.partStates.get(new TopicPartition("foo", 0)).targetReplicas);
248-
assertEquals(List.of(3, 2, 1), result.partStates.get(new TopicPartition("baz", 2)).targetReplicas);
269+
assertEquals(List.of(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).targetReplicas());
270+
assertEquals(List.of(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).targetReplicas());
249271
waitForInterBrokerThrottle(admin, List.of(0, 1, 2, 3), interBrokerThrottle);
250272
return false;
251273
}
@@ -540,7 +562,7 @@ private void executeAndVerifyReassignment() throws InterruptedException {
540562
finalAssignment.put(bar0, new PartitionReassignmentState(List.of(3, 2, 0), List.of(3, 2, 0), true));
541563

542564
VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(admin, assignment, false);
543-
assertFalse(verifyAssignmentResult.movesOngoing);
565+
assertFalse(verifyAssignmentResult.movesOngoing());
544566

545567
// Wait for the assignment to complete
546568
waitForVerifyAssignment(admin, assignment, false,
@@ -786,7 +808,7 @@ private void testCancellationAction(boolean useBootstrapServer) throws Interrupt
786808
// This time, the broker throttles were removed.
787809
waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
788810
// Verify that there are no ongoing reassignments.
789-
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing);
811+
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing());
790812
}
791813
// Verify that the partition is removed from cancelled replicas
792814
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));

0 commit comments

Comments
 (0)