Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
<li>
The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in <code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> instead.
</li>
<li>
The <code>ConsumerPerformance</code> command line tool has a new <code>include</code> option that is alternative to the <code>topic</code> option.
This new option allows to pass a regular expression specifying a list of topics to include for consumption, which is useful to test consumer performance across multiple topics or dynamically matching topic sets.
</li>
</ul>

<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
Expand Down Expand Up @@ -135,6 +137,29 @@ public static void printErrorAndExit(String message) {
Exit.exit(1, message);
}

/**
* Check that exactly one of a set of mutually exclusive arguments is present.
*/
public static void checkOneOfArgs(OptionParser parser, OptionSet options, OptionSpec<?>... optionSpecs) {
if (optionSpecs == null || optionSpecs.length == 0) {
throw new IllegalArgumentException("At least one option must be provided");
}

int presentCount = 0;
for (OptionSpec<?> spec : optionSpecs) {
if (options.has(spec)) {
presentCount++;
}
}

if (presentCount != 1) {
printUsageAndExit(parser, "Exactly one of the following arguments is required: " +
Arrays.stream(optionSpecs)
.map(Object::toString)
.collect(Collectors.joining(", ")));
}
}

public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.util;

import org.apache.kafka.common.utils.Exit;

import org.junit.jupiter.api.Test;

import java.util.List;
Expand All @@ -26,9 +28,12 @@
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class CommandLineUtilsTest {
@Test
Expand Down Expand Up @@ -266,4 +271,105 @@ public void testInitializeBootstrapPropertiesWithBothBootstraps() {
() -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage());
}

private OptionSpec<String> createMockOptionSpec(String name) {
OptionSpec<String> spec = mock(OptionSpec.class);
when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + "]");
return spec;
}

@Test
void testCheckOneOfArgsNoOptions() {
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);

IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () ->
CommandLineUtils.checkOneOfArgs(parser, options)
);

assertEquals("At least one option must be provided", e.getMessage());
}

@Test
void testCheckOneOfArgsOnePresent() {
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");

when(options.has(opt1)).thenReturn(true);
when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(false);

assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);

when(options.has(opt1)).thenReturn(false);
when(options.has(opt2)).thenReturn(true);

assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);

when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(true);

assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);
}

@Test
void testCheckOneOfArgsNonePresent() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});

OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");

when(options.has(opt1)).thenReturn(false);
when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(false);

try {
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
assertEquals("Exactly one of the following arguments is required: " +
"[first-option], [second-option], [third-option]", e.getMessage());
} finally {
Exit.resetExitProcedure();
}
}

@Test
void testCheckOneOfArgsMultiplePresent() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});

OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");

when(options.has(opt1)).thenReturn(true);
when(options.has(opt2)).thenReturn(true);
when(options.has(opt3)).thenReturn(false);

try {
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
assertEquals("Exactly one of the following arguments is required: " +
"[first-option], [second-option], [third-option]", e.getMessage());
} finally {
Exit.resetExitProcedure();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import joptsimple.OptionException;
import joptsimple.OptionSpec;
Expand Down Expand Up @@ -134,8 +136,13 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
SimpleDateFormat dateFormat = options.dateFormat();
consumer.subscribe(options.topic(),
new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));

ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
if (options.topic().isPresent()) {
consumer.subscribe(options.topic().get(), listener);
} else {
consumer.subscribe(options.include().get(), listener);
}

// now start the benchmark
long currentTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -246,6 +253,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> includeOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
Expand All @@ -265,10 +273,14 @@ public ConsumerPerfOptions(String[] args) {
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
topicOpt = parser.accepts("topic", "The topic to consume from.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
groupIdOpt = parser.accepts("group", "The group id to consume on.")
.withRequiredArg()
.describedAs("gid")
Expand Down Expand Up @@ -323,7 +335,8 @@ public ConsumerPerfOptions(String[] args) {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt);
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
}
}

Expand Down Expand Up @@ -353,8 +366,16 @@ public Properties props() throws IOException {
return props;
}

public Set<String> topic() {
return Set.of(options.valueOf(topicOpt));
public Optional<Collection<String>> topic() {
return options.has(topicOpt)
? Optional.of(List.of(options.valueOf(topicOpt)))
: Optional.empty();
}

public Optional<Pattern> include() {
return options.has(includeOpt)
? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
: Optional.empty();
}

public long numMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import org.apache.kafka.server.util.CommandLineUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -185,12 +183,8 @@ public ConsoleConsumerOptions(String[] args) throws IOException {
}

private void checkRequiredArgs() {
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(Optional::isEmpty);
// user need to specify value for either --topic or --include options
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
}
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);

if (partitionArg().isPresent()) {
if (!options.has(topicOpt)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testConfigBootStrapServer() {
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().contains("test"));
assertTrue(config.topic().get().contains("test"));
assertEquals(10, config.numMessages());
}

Expand All @@ -93,6 +93,47 @@ public void testConfigWithUnrecognizedOption() {
assertTrue(err.contains("new-consumer is not a recognized option"));
}

@Test
public void testConfigWithInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "test.*",
"--messages", "10"
};

ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.include().get().toString().contains("test.*"));
assertEquals(10, config.numMessages());
}

@Test
public void testConfigWithTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "test.*",
"--messages", "10"
};

String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));

assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
}

@Test
public void testConfigWithoutTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--messages", "10"
};

String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));

assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
}

@Test
public void testClientIdOverride() throws IOException {
File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
Expand Down
Loading