From 980d78d26f0a9ea77d24915cca444d20b9fd012e Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Mon, 14 Jul 2025 08:32:23 +0200 Subject: [PATCH 1/5] KAFKA-19498: Add include argument to ConsumerPerformance tool This patch adds the include argument to ConsumerPerformance tool. ConsoleConsumer and ConsumerPerformance serve different purposes but share common functionality for message consumption. Currently, there's an inconsistency in their command-line interfaces: - ConsoleConsumer supports an --include argument that allows users to specify a regular expression pattern to filter topics for consumption - ConsumerPerformance lacks this topic filtering capability, requiring users to specify a single topic explicitly via --topic argument This inconsistency creates two problems: - Similar tools should provide similar topic selection capabilities for better user experience - Users cannot test consumer performance across multiple topics or dynamically matching topic sets, making it difficult to test realistic scenarios Signed-off-by: Federico Valeri --- .../kafka/server/util/CommandLineUtils.java | 27 +++++ .../server/util/CommandLineUtilsTest.java | 105 ++++++++++++++++++ .../kafka/tools/ConsumerPerformance.java | 35 ++++-- .../consumer/ConsoleConsumerOptions.java | 8 +- .../kafka/tools/ConsumerPerformanceTest.java | 43 ++++++- 5 files changed, 203 insertions(+), 15 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java index 6146daeb45cad..40bfbf1bda015 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -23,10 +23,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import joptsimple.OptionParser; import joptsimple.OptionSet; @@ -135,6 +138,30 @@ public static void printErrorAndExit(String message) { Exit.exit(1, message); } + /** + * Check that exactly one of a set of mutually exclusive arguments is present. + */ + public static void checkOneOfArgs(OptionParser parser, OptionSet options, OptionSpec... optionSpecs) { + if (optionSpecs == null || optionSpecs.length == 0) { + throw new IllegalArgumentException("At least one option must be provided"); + } + + int presentCount = 0; + for (OptionSpec spec : optionSpecs) { + if (spec != null && options.has(spec)) { + presentCount++; + } + } + + if (presentCount != 1) { + printUsageAndExit(parser, "Exactly one of the following arguments is required: " + + Arrays.stream(optionSpecs) + .filter(Objects::nonNull) + .map(Object::toString) + .collect(Collectors.joining(", "))); + } + } + public static void printUsageAndExit(OptionParser parser, String message) { System.err.println(message); try { diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java index 8fdc6c89d06b8..5c8d03f3479de 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.server.util; +import org.apache.kafka.common.utils.Exit; import org.junit.jupiter.api.Test; import java.util.List; @@ -26,9 +27,12 @@ import joptsimple.OptionSet; import joptsimple.OptionSpec; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CommandLineUtilsTest { @Test @@ -266,4 +270,105 @@ public void testInitializeBootstrapPropertiesWithBothBootstraps() { () -> CommandLineUtils.initializeBootstrapProperties(createTestProps(), Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage()); } + + private OptionSpec createMockOptionSpec(String name) { + OptionSpec spec = mock(OptionSpec.class); + when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + "]"); + return spec; + } + + @Test + void testCheckOneOfArgsNoOptions() { + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> + CommandLineUtils.checkOneOfArgs(parser, options) + ); + + assertEquals("At least one option must be provided", e.getMessage()); + } + + @Test + void testCheckOneOfArgsOnePresent() { + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec opt1 = createMockOptionSpec("--first-option"); + OptionSpec opt2 = createMockOptionSpec("--second-option"); + OptionSpec opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(true); + when(options.has(opt2)).thenReturn(false); + when(options.has(opt3)).thenReturn(false); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + + when(options.has(opt1)).thenReturn(false); + when(options.has(opt2)).thenReturn(true); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + + when(options.has(opt2)).thenReturn(false); + when(options.has(opt3)).thenReturn(true); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + } + + @Test + void testCheckOneOfArgsNonePresent() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec opt1 = createMockOptionSpec("--first-option"); + OptionSpec opt2 = createMockOptionSpec("--second-option"); + OptionSpec opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(false); + when(options.has(opt2)).thenReturn(false); + when(options.has(opt3)).thenReturn(false); + + try { + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)); + assertEquals("Exactly one of the following arguments is required: " + + "[first-option], [second-option], [third-option]", e.getMessage()); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + void testCheckOneOfArgsMultiplePresent() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec opt1 = createMockOptionSpec("--first-option"); + OptionSpec opt2 = createMockOptionSpec("--second-option"); + OptionSpec opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(true); + when(options.has(opt2)).thenReturn(true); + when(options.has(opt3)).thenReturn(false); + + try { + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)); + assertEquals("Exactly one of the following arguments is required: " + + "[first-option], [second-option], [third-option]", e.getMessage()); + } finally { + Exit.resetExitProcedure(); + } + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 60b4b37abe4b6..0892693801ad3 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -37,11 +37,13 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Random; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import joptsimple.OptionException; import joptsimple.OptionSpec; @@ -134,8 +136,13 @@ private static void consume(KafkaConsumer consumer, long reportingIntervalMs = options.reportingIntervalMs(); boolean showDetailedStats = options.showDetailedStats(); SimpleDateFormat dateFormat = options.dateFormat(); - consumer.subscribe(options.topic(), - new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound)); + + ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound); + if (options.topic().isPresent()) { + consumer.subscribe(options.topic().get(), listener); + } else { + consumer.subscribe(options.include().get(), listener); + } // now start the benchmark long currentTimeMs = System.currentTimeMillis(); @@ -246,6 +253,7 @@ public void onPartitionsAssigned(Collection partitions) { protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec bootstrapServerOpt; private final OptionSpec topicOpt; + private final OptionSpec includeOpt; private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; private final OptionSpec resetBeginningOffsetOpt; @@ -265,10 +273,14 @@ public ConsumerPerfOptions(String[] args) { .withRequiredArg() .describedAs("server to connect to") .ofType(String.class); - topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + topicOpt = parser.accepts("topic", "The topic to consume from.") .withRequiredArg() .describedAs("topic") .ofType(String.class); + includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); groupIdOpt = parser.accepts("group", "The group id to consume on.") .withRequiredArg() .describedAs("gid") @@ -323,7 +335,8 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt); + CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); } } @@ -353,8 +366,16 @@ public Properties props() throws IOException { return props; } - public Set topic() { - return Set.of(options.valueOf(topicOpt)); + public Optional> topic() { + return options.has(topicOpt) + ? Optional.of(List.of(options.valueOf(topicOpt))) + : Optional.empty(); + } + + public Optional include() { + return options.has(includeOpt) + ? Optional.of(Pattern.compile(options.valueOf(includeOpt))) + : Optional.empty(); } public long numMessages() { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index cf4daa0c63626..60ee8f61ffacd 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -25,10 +25,8 @@ import org.apache.kafka.server.util.CommandLineUtils; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -185,12 +183,8 @@ public ConsoleConsumerOptions(String[] args) throws IOException { } private void checkRequiredArgs() { - List> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg())); - topicOrFilterArgs.removeIf(Optional::isEmpty); // user need to specify value for either --topic or --include options - if (topicOrFilterArgs.size() != 1) { - CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. "); - } + CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); if (partitionArg().isPresent()) { if (!options.has(topicOpt)) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index 270fab2cf805f..d78b65e54a31d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -75,7 +75,7 @@ public void testConfigBootStrapServer() { ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("localhost:9092", config.brokerHostsAndPorts()); - assertTrue(config.topic().contains("test")); + assertTrue(config.topic().get().contains("test")); assertEquals(10, config.numMessages()); } @@ -93,6 +93,47 @@ public void testConfigWithUnrecognizedOption() { assertTrue(err.contains("new-consumer is not a recognized option")); } + @Test + public void testConfigWithInclude() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "test.*", + "--messages", "10" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("localhost:9092", config.brokerHostsAndPorts()); + assertTrue(config.include().get().toString().contains("test.*")); + assertEquals(10, config.numMessages()); + } + + @Test + public void testConfigWithTopicAndInclude() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--include", "test.*", + "--messages", "10" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); + + assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]")); + } + + @Test + public void testConfigWithoutTopicAndInclude() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--messages", "10" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); + + assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]")); + } + @Test public void testClientIdOverride() throws IOException { File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile(); From 5e4389321488aca961da8f17313df88d0bacee76 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Tue, 22 Jul 2025 17:30:09 +0200 Subject: [PATCH 2/5] Fix build error Signed-off-by: Federico Valeri --- .../java/org/apache/kafka/server/util/CommandLineUtilsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java index 5c8d03f3479de..a634b21403ee5 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.server.util; import org.apache.kafka.common.utils.Exit; + import org.junit.jupiter.api.Test; import java.util.List; From e80dfe4eede58a9a9079b928e656a2d3e4be0072 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 20 Aug 2025 16:10:13 +0200 Subject: [PATCH 3/5] Remove unnecessary null check Signed-off-by: Federico Valeri --- .../java/org/apache/kafka/server/util/CommandLineUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java index 40bfbf1bda015..91f1dc4825a8b 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -148,7 +148,7 @@ public static void checkOneOfArgs(OptionParser parser, OptionSet options, Option int presentCount = 0; for (OptionSpec spec : optionSpecs) { - if (spec != null && options.has(spec)) { + if (options.has(spec)) { presentCount++; } } From e0d66b164e43d42528d3bce6f072b8b460a24b47 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Fri, 22 Aug 2025 08:50:44 +0200 Subject: [PATCH 4/5] Remove unnecessary filter Signed-off-by: Federico Valeri --- .../java/org/apache/kafka/server/util/CommandLineUtils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java index 91f1dc4825a8b..c5b973f78e76c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -156,7 +155,6 @@ public static void checkOneOfArgs(OptionParser parser, OptionSet options, Option if (presentCount != 1) { printUsageAndExit(parser, "Exactly one of the following arguments is required: " + Arrays.stream(optionSpecs) - .filter(Objects::nonNull) .map(Object::toString) .collect(Collectors.joining(", "))); } From d0161ef962ba921ea7d9b0b74a9f9ba01cca7261 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Sun, 24 Aug 2025 08:46:29 +0200 Subject: [PATCH 5/5] Add upgrade documentation Signed-off-by: Federico Valeri --- docs/upgrade.html | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/upgrade.html b/docs/upgrade.html index f0f3f5407388d..1dbb7e2d2ee2a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -40,6 +40,10 @@
Notable changes in 4
  • The PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG in ProducerConfig was deprecated and will be removed in Kafka 5.0. Please use the PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG instead.
  • +
  • + The ConsumerPerformance command line tool has a new include option that is alternative to the topic option. + This new option allows to pass a regular expression specifying a list of topics to include for consumption, which is useful to test consumer performance across multiple topics or dynamically matching topic sets. +
  • Upgrading to 4.1.0