Skip to content

Commit f97b95c

Browse files
authored
KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221)
This patch adds the include argument to ConsumerPerformance tool. ConsoleConsumer and ConsumerPerformance serve different purposes but share common functionality for message consumption. Currently, there's an inconsistency in their command-line interfaces: - ConsoleConsumer supports an --include argument that allows users to specify a regular expression pattern to filter topics for consumption - ConsumerPerformance lacks this topic filtering capability, requiring users to specify a single topic explicitly via --topic argument This inconsistency creates two problems: - Similar tools should provide similar topic selection capabilities for better user experience - Users cannot test consumer performance across multiple topics or dynamically matching topic sets, making it difficult to test realistic scenarios Reviewers: Chia-Ping Tsai <[email protected]>
1 parent ecd5b4c commit f97b95c

File tree

6 files changed

+206
-15
lines changed

6 files changed

+206
-15
lines changed

docs/upgrade.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
4040
<li>
4141
The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in <code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> instead.
4242
</li>
43+
<li>
44+
The <code>ConsumerPerformance</code> command line tool has a new <code>include</code> option that is alternative to the <code>topic</code> option.
45+
This new option allows to pass a regular expression specifying a list of topics to include for consumption, which is useful to test consumer performance across multiple topics or dynamically matching topic sets.
46+
</li>
4347
</ul>
4448

4549
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323

2424
import java.io.IOException;
2525
import java.util.ArrayList;
26+
import java.util.Arrays;
2627
import java.util.List;
2728
import java.util.Optional;
2829
import java.util.Properties;
2930
import java.util.Set;
31+
import java.util.stream.Collectors;
3032

3133
import joptsimple.OptionParser;
3234
import joptsimple.OptionSet;
@@ -135,6 +137,29 @@ public static void printErrorAndExit(String message) {
135137
Exit.exit(1, message);
136138
}
137139

140+
/**
141+
* Check that exactly one of a set of mutually exclusive arguments is present.
142+
*/
143+
public static void checkOneOfArgs(OptionParser parser, OptionSet options, OptionSpec<?>... optionSpecs) {
144+
if (optionSpecs == null || optionSpecs.length == 0) {
145+
throw new IllegalArgumentException("At least one option must be provided");
146+
}
147+
148+
int presentCount = 0;
149+
for (OptionSpec<?> spec : optionSpecs) {
150+
if (options.has(spec)) {
151+
presentCount++;
152+
}
153+
}
154+
155+
if (presentCount != 1) {
156+
printUsageAndExit(parser, "Exactly one of the following arguments is required: " +
157+
Arrays.stream(optionSpecs)
158+
.map(Object::toString)
159+
.collect(Collectors.joining(", ")));
160+
}
161+
}
162+
138163
public static void printUsageAndExit(OptionParser parser, String message) {
139164
System.err.println(message);
140165
try {

server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.kafka.server.util;
1818

19+
import org.apache.kafka.common.utils.Exit;
20+
1921
import org.junit.jupiter.api.Test;
2022

2123
import java.util.List;
@@ -26,9 +28,12 @@
2628
import joptsimple.OptionSet;
2729
import joptsimple.OptionSpec;
2830

31+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2932
import static org.junit.jupiter.api.Assertions.assertEquals;
3033
import static org.junit.jupiter.api.Assertions.assertNull;
3134
import static org.junit.jupiter.api.Assertions.assertThrows;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
3237

3338
public class CommandLineUtilsTest {
3439
@Test
@@ -266,4 +271,105 @@ public void testInitializeBootstrapPropertiesWithBothBootstraps() {
266271
() -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
267272
Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage());
268273
}
274+
275+
private OptionSpec<String> createMockOptionSpec(String name) {
276+
OptionSpec<String> spec = mock(OptionSpec.class);
277+
when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + "]");
278+
return spec;
279+
}
280+
281+
@Test
282+
void testCheckOneOfArgsNoOptions() {
283+
OptionParser parser = mock(OptionParser.class);
284+
OptionSet options = mock(OptionSet.class);
285+
286+
IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () ->
287+
CommandLineUtils.checkOneOfArgs(parser, options)
288+
);
289+
290+
assertEquals("At least one option must be provided", e.getMessage());
291+
}
292+
293+
@Test
294+
void testCheckOneOfArgsOnePresent() {
295+
OptionParser parser = mock(OptionParser.class);
296+
OptionSet options = mock(OptionSet.class);
297+
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
298+
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
299+
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
300+
301+
when(options.has(opt1)).thenReturn(true);
302+
when(options.has(opt2)).thenReturn(false);
303+
when(options.has(opt3)).thenReturn(false);
304+
305+
assertDoesNotThrow(() ->
306+
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
307+
);
308+
309+
when(options.has(opt1)).thenReturn(false);
310+
when(options.has(opt2)).thenReturn(true);
311+
312+
assertDoesNotThrow(() ->
313+
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
314+
);
315+
316+
when(options.has(opt2)).thenReturn(false);
317+
when(options.has(opt3)).thenReturn(true);
318+
319+
assertDoesNotThrow(() ->
320+
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
321+
);
322+
}
323+
324+
@Test
325+
void testCheckOneOfArgsNonePresent() {
326+
Exit.setExitProcedure((code, message) -> {
327+
throw new IllegalArgumentException(message);
328+
});
329+
330+
OptionParser parser = mock(OptionParser.class);
331+
OptionSet options = mock(OptionSet.class);
332+
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
333+
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
334+
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
335+
336+
when(options.has(opt1)).thenReturn(false);
337+
when(options.has(opt2)).thenReturn(false);
338+
when(options.has(opt3)).thenReturn(false);
339+
340+
try {
341+
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
342+
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
343+
assertEquals("Exactly one of the following arguments is required: " +
344+
"[first-option], [second-option], [third-option]", e.getMessage());
345+
} finally {
346+
Exit.resetExitProcedure();
347+
}
348+
}
349+
350+
@Test
351+
void testCheckOneOfArgsMultiplePresent() {
352+
Exit.setExitProcedure((code, message) -> {
353+
throw new IllegalArgumentException(message);
354+
});
355+
356+
OptionParser parser = mock(OptionParser.class);
357+
OptionSet options = mock(OptionSet.class);
358+
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
359+
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
360+
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
361+
362+
when(options.has(opt1)).thenReturn(true);
363+
when(options.has(opt2)).thenReturn(true);
364+
when(options.has(opt3)).thenReturn(false);
365+
366+
try {
367+
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
368+
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
369+
assertEquals("Exactly one of the following arguments is required: " +
370+
"[first-option], [second-option], [third-option]", e.getMessage());
371+
} finally {
372+
Exit.resetExitProcedure();
373+
}
374+
}
269375
}

tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@
3737
import java.text.SimpleDateFormat;
3838
import java.time.Duration;
3939
import java.util.Collection;
40+
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Optional;
4143
import java.util.Properties;
4244
import java.util.Random;
43-
import java.util.Set;
4445
import java.util.concurrent.atomic.AtomicLong;
46+
import java.util.regex.Pattern;
4547

4648
import joptsimple.OptionException;
4749
import joptsimple.OptionSpec;
@@ -134,8 +136,13 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
134136
long reportingIntervalMs = options.reportingIntervalMs();
135137
boolean showDetailedStats = options.showDetailedStats();
136138
SimpleDateFormat dateFormat = options.dateFormat();
137-
consumer.subscribe(options.topic(),
138-
new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));
139+
140+
ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
141+
if (options.topic().isPresent()) {
142+
consumer.subscribe(options.topic().get(), listener);
143+
} else {
144+
consumer.subscribe(options.include().get(), listener);
145+
}
139146

140147
// now start the benchmark
141148
long currentTimeMs = System.currentTimeMillis();
@@ -246,6 +253,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
246253
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
247254
private final OptionSpec<String> bootstrapServerOpt;
248255
private final OptionSpec<String> topicOpt;
256+
private final OptionSpec<String> includeOpt;
249257
private final OptionSpec<String> groupIdOpt;
250258
private final OptionSpec<Integer> fetchSizeOpt;
251259
private final OptionSpec<Void> resetBeginningOffsetOpt;
@@ -265,10 +273,14 @@ public ConsumerPerfOptions(String[] args) {
265273
.withRequiredArg()
266274
.describedAs("server to connect to")
267275
.ofType(String.class);
268-
topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
276+
topicOpt = parser.accepts("topic", "The topic to consume from.")
269277
.withRequiredArg()
270278
.describedAs("topic")
271279
.ofType(String.class);
280+
includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.")
281+
.withRequiredArg()
282+
.describedAs("Java regex (String)")
283+
.ofType(String.class);
272284
groupIdOpt = parser.accepts("group", "The group id to consume on.")
273285
.withRequiredArg()
274286
.describedAs("gid")
@@ -323,7 +335,8 @@ public ConsumerPerfOptions(String[] args) {
323335
}
324336
if (options != null) {
325337
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
326-
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
338+
CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt);
339+
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
327340
}
328341
}
329342

@@ -353,8 +366,16 @@ public Properties props() throws IOException {
353366
return props;
354367
}
355368

356-
public Set<String> topic() {
357-
return Set.of(options.valueOf(topicOpt));
369+
public Optional<Collection<String>> topic() {
370+
return options.has(topicOpt)
371+
? Optional.of(List.of(options.valueOf(topicOpt)))
372+
: Optional.empty();
373+
}
374+
375+
public Optional<Pattern> include() {
376+
return options.has(includeOpt)
377+
? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
378+
: Optional.empty();
358379
}
359380

360381
public long numMessages() {

tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@
2525
import org.apache.kafka.server.util.CommandLineUtils;
2626

2727
import java.io.IOException;
28-
import java.util.ArrayList;
2928
import java.util.HashMap;
3029
import java.util.HashSet;
31-
import java.util.List;
3230
import java.util.Locale;
3331
import java.util.Map;
3432
import java.util.Optional;
@@ -185,12 +183,8 @@ public ConsoleConsumerOptions(String[] args) throws IOException {
185183
}
186184

187185
private void checkRequiredArgs() {
188-
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg()));
189-
topicOrFilterArgs.removeIf(Optional::isEmpty);
190186
// user need to specify value for either --topic or --include options
191-
if (topicOrFilterArgs.size() != 1) {
192-
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
193-
}
187+
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
194188

195189
if (partitionArg().isPresent()) {
196190
if (!options.has(topicOpt)) {

tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void testConfigBootStrapServer() {
7575
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
7676

7777
assertEquals("localhost:9092", config.brokerHostsAndPorts());
78-
assertTrue(config.topic().contains("test"));
78+
assertTrue(config.topic().get().contains("test"));
7979
assertEquals(10, config.numMessages());
8080
}
8181

@@ -93,6 +93,47 @@ public void testConfigWithUnrecognizedOption() {
9393
assertTrue(err.contains("new-consumer is not a recognized option"));
9494
}
9595

96+
@Test
97+
public void testConfigWithInclude() {
98+
String[] args = new String[]{
99+
"--bootstrap-server", "localhost:9092",
100+
"--include", "test.*",
101+
"--messages", "10"
102+
};
103+
104+
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
105+
106+
assertEquals("localhost:9092", config.brokerHostsAndPorts());
107+
assertTrue(config.include().get().toString().contains("test.*"));
108+
assertEquals(10, config.numMessages());
109+
}
110+
111+
@Test
112+
public void testConfigWithTopicAndInclude() {
113+
String[] args = new String[]{
114+
"--bootstrap-server", "localhost:9092",
115+
"--topic", "test",
116+
"--include", "test.*",
117+
"--messages", "10"
118+
};
119+
120+
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
121+
122+
assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
123+
}
124+
125+
@Test
126+
public void testConfigWithoutTopicAndInclude() {
127+
String[] args = new String[]{
128+
"--bootstrap-server", "localhost:9092",
129+
"--messages", "10"
130+
};
131+
132+
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
133+
134+
assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
135+
}
136+
96137
@Test
97138
public void testClientIdOverride() throws IOException {
98139
File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();

0 commit comments

Comments
 (0)