Skip to content

Commit ea8aea7

Browse files
committed
Pulsar topic types in Container Properties
Pulsar Java client expects topics to be provided as a Set. In PulsarContainerProperties, we were using an array type. In order to make it consistent with the Pulsar Java client, we are migrating this type in PulsarContainerProperties to Set<String>.
1 parent 45da73d commit ea8aea7

File tree

6 files changed

+16
-13
lines changed

6 files changed

+16
-13
lines changed

spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.pulsar.spring.cloud.stream.binder;
1818

1919
import java.util.Optional;
20+
import java.util.Set;
2021

2122
import org.apache.pulsar.client.api.PulsarClientException;
2223
import org.apache.pulsar.client.api.Schema;
@@ -137,7 +138,7 @@ private PulsarBinderHeaderMapper determineOutboundHeaderMapper(
137138
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
138139
ExtendedConsumerProperties<PulsarConsumerProperties> properties) {
139140
var containerProperties = new PulsarContainerProperties();
140-
containerProperties.setTopics(new String[] { destination.getName() });
141+
containerProperties.setTopics(Set.of(destination.getName()));
141142

142143
var inboundHeaderMapper = determineInboundHeaderMapper(properties);
143144

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Arrays;
2020
import java.util.Collection;
21+
import java.util.HashSet;
2122

2223
import org.springframework.lang.Nullable;
2324
import org.springframework.pulsar.core.PulsarConsumerFactory;
@@ -62,7 +63,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
6263
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
6364

6465
if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
65-
properties.setTopics(endpoint.getTopics().toArray(new String[0]));
66+
properties.setTopics(new HashSet<>(endpoint.getTopics()));
6667
}
6768

6869
if (StringUtils.hasText(endpoint.getTopicPattern())) {

spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222
import java.util.Optional;
23+
import java.util.Set;
2324

2425
import org.apache.pulsar.client.api.Consumer;
2526
import org.apache.pulsar.client.api.DeadLetterPolicy;
@@ -159,7 +160,7 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageListener(
159160
|| StringUtils.hasText(pulsarContainerProperties.getTopicsPattern());
160161
if (!hasTopicInfo) {
161162
topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null)
162-
.ifResolved((topic) -> pulsarContainerProperties.setTopics(new String[] { topic }));
163+
.ifResolved((topic) -> pulsarContainerProperties.setTopics(Set.of(topic)));
163164
}
164165

165166
container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff);

spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.pulsar.listener;
1818

1919
import java.util.ArrayList;
20-
import java.util.Arrays;
2120
import java.util.HashMap;
2221
import java.util.HashSet;
2322
import java.util.List;
@@ -327,9 +326,8 @@ private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentP
327326
}
328327
}
329328
if (!currentProperties.containsKey("topicNames")) {
330-
String[] topics = this.containerProperties.getTopics();
331-
Set<String> listenerDefinedTopics = new HashSet<>(Arrays.stream(topics).toList());
332-
if (!listenerDefinedTopics.isEmpty()) {
329+
Set<String> listenerDefinedTopics = this.containerProperties.getTopics();
330+
if (!this.containerProperties.getTopics().isEmpty()) {
333331
currentProperties.put("topicNames", listenerDefinedTopics);
334332
}
335333
}

spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.Properties;
21+
import java.util.Set;
2122

2223
import org.apache.pulsar.client.api.Schema;
2324
import org.apache.pulsar.client.api.SubscriptionType;
@@ -48,7 +49,7 @@ public class PulsarContainerProperties {
4849

4950
private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;
5051

51-
private String[] topics;
52+
private Set<String> topics;
5253

5354
private String topicsPattern;
5455

@@ -83,7 +84,7 @@ public class PulsarContainerProperties {
8384
private Properties pulsarConsumerProperties = new Properties();
8485

8586
public PulsarContainerProperties(String... topics) {
86-
this.topics = topics.clone();
87+
this.topics = Set.of(topics);
8788
this.topicsPattern = null;
8889
this.schemaResolver = new DefaultSchemaResolver();
8990
this.topicResolver = new DefaultTopicResolver();
@@ -186,11 +187,11 @@ public void setConsumerStartTimeout(Duration consumerStartTimeout) {
186187
this.consumerStartTimeout = consumerStartTimeout;
187188
}
188189

189-
public String[] getTopics() {
190+
public Set<String> getTopics() {
190191
return this.topics;
191192
}
192193

193-
public void setTopics(String[] topics) {
194+
public void setTopics(Set<String> topics) {
194195
this.topics = topics;
195196
}
196197

spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.Set;
3233
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.locks.Condition;
@@ -293,7 +294,7 @@ void deadLetterPolicyDefault() throws Exception {
293294
.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> dlqLatch.countDown());
294295
dlqContainerProperties.setSchema(Schema.INT32);
295296
dlqContainerProperties.setSubscriptionType(SubscriptionType.Shared);
296-
dlqContainerProperties.setTopics(new String[] { "dpmlct-016-dlq-topic" });
297+
dlqContainerProperties.setTopics(Set.of("dpmlct-016-dlq-topic"));
297298
DefaultPulsarMessageListenerContainer<Integer> dlqContainer = new DefaultPulsarMessageListenerContainer<>(
298299
pulsarConsumerFactory, dlqContainerProperties);
299300
dlqContainer.start();
@@ -349,7 +350,7 @@ void deadLetterPolicyCustom() throws Exception {
349350
.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> dlqLatch.countDown());
350351
dlqContainerProperties.setSchema(Schema.INT32);
351352
dlqContainerProperties.setSubscriptionType(SubscriptionType.Shared);
352-
dlqContainerProperties.setTopics(new String[] { "dlq-topic" });
353+
dlqContainerProperties.setTopics(Set.of("dlq-topic"));
353354
DefaultPulsarMessageListenerContainer<Integer> dlqContainer = new DefaultPulsarMessageListenerContainer<>(
354355
pulsarConsumerFactory, dlqContainerProperties);
355356
dlqContainer.start();

0 commit comments

Comments
 (0)