Skip to content

Commit 5e59d0e

Browse files
[fix][client] Make auto partitions update work for old brokers without PIP-344 (#24822)
1 parent fc65db6 commit 5e59d0e

File tree

4 files changed

+61
-4
lines changed

4 files changed

+61
-4
lines changed

pip/pip-344.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ message FeatureFlags {
122122

123123
# Backward & Forward Compatibility
124124

125-
- Old version client and New version Broker: The client will call the old API.
125+
Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client will call the old API.
126126

127-
- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false.
127+
New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false in the following cases:
128+
- The topic is a DLQ topic
129+
- The topic is non-persistent
130+
- The topic is in geo-replication that the local cluster is new version and the remote cluster is old version

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep
260260
field.set(clientCnxFuture.get(), false);
261261
}
262262
try {
263-
clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
263+
clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false, false).join();
264264
Assert.fail("Expected an error that the broker version is too old.");
265265
} catch (Exception ex) {
266266
Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker"));

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,7 @@ private void getPartitionedTopicMetadata(TopicName topicName,
12911291

12921292
@Override
12931293
public CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) {
1294-
return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> {
1294+
return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, true).thenApply(metadata -> {
12951295
if (metadata.partitions > 0) {
12961296
TopicName topicName = TopicName.get(topic);
12971297
List<String> partitions = new ArrayList<>(metadata.partitions);

tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,19 @@
1919
package org.apache.pulsar.tests.integration.backwardscompatibility;
2020

2121

22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
2224
import java.util.function.Supplier;
25+
import lombok.Cleanup;
26+
import org.apache.pulsar.client.admin.PulsarAdmin;
27+
import org.apache.pulsar.client.api.Message;
28+
import org.apache.pulsar.client.api.MessageIdAdv;
29+
import org.apache.pulsar.client.api.MessageRouter;
30+
import org.apache.pulsar.client.api.MessageRoutingMode;
31+
import org.apache.pulsar.client.api.PulsarClient;
32+
import org.apache.pulsar.client.api.TopicMetadata;
2333
import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
34+
import org.testng.Assert;
2435
import org.testng.annotations.Test;
2536

2637
public class ClientTest25 extends PulsarStandaloneTestSuite25 {
@@ -34,4 +45,47 @@ public void testResetCursorCompatibility(Supplier<String> serviceUrl, Supplier<S
3445
clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName);
3546
}
3647

48+
@Test(timeOut = 20000)
49+
public void testAutoPartitionsUpdate() throws Exception {
50+
@Cleanup final var pulsarClient = PulsarClient.builder()
51+
.serviceUrl(getContainer().getPlainTextServiceUrl())
52+
.build();
53+
final var topic = "test-auto-part-update";
54+
final var topic2 = "dummy-topic";
55+
@Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl(getContainer().getHttpServiceUrl()).build();
56+
// Use 2 as the initial partition number because old version broker cannot update partitions on a topic that
57+
// has only 1 partition.
58+
admin.topics().createPartitionedTopic(topic, 2);
59+
admin.topics().createPartitionedTopic(topic2, 2);
60+
@Cleanup final var producer = pulsarClient.newProducer().autoUpdatePartitions(true)
61+
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
62+
.messageRoutingMode(MessageRoutingMode.CustomPartition)
63+
.messageRouter(new MessageRouter() {
64+
@Override
65+
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
66+
return metadata.numPartitions() - 1;
67+
}
68+
})
69+
.topic(topic)
70+
.create();
71+
@Cleanup final var consumer = pulsarClient.newConsumer().autoUpdatePartitions(true)
72+
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
73+
.topic(topic).subscriptionName("sub")
74+
.subscribe();
75+
@Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer().autoUpdatePartitions(true)
76+
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
77+
.topics(List.of(topic, topic2)).subscriptionName("sub-2").subscribe();
78+
79+
admin.topics().updatePartitionedTopic(topic, 3);
80+
Thread.sleep(1500);
81+
final var msgId = (MessageIdAdv) producer.send("msg".getBytes());
82+
Assert.assertEquals(msgId.getPartitionIndex(), 2);
83+
84+
final var msg = consumer.receive(3, TimeUnit.SECONDS);
85+
Assert.assertNotNull(msg);
86+
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
87+
final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS);
88+
Assert.assertNotNull(msg2);
89+
Assert.assertEquals(msg2.getMessageId(), msg.getMessageId());
90+
}
3791
}

0 commit comments

Comments
 (0)