Skip to content

Commit b4956d4

Browse files
garyrussellartembilan
authored andcommitted
GH-2501: Fix Class Tangle
See #2501 New tangle between `container <-> properties`. Move the new properties and accessors to the abstract container.
1 parent d913fe1 commit b4956d4

File tree

7 files changed

+91
-55
lines changed

7 files changed

+91
-55
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1749,7 +1749,7 @@ For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the threa
17491749
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop and subsequent start.
17501750

17511751
Starting with version `3.0.1`, you can now change the name of the thread, regardless of which executor is used.
1752-
Set the `ContainerProperties.changeConsumerThreadName` property to `true` and the `ContainerProperties.threadNameSupplier` will be invoked to obtain the thread name.
1752+
Set the `AbstractMessageListenerContainer.changeConsumerThreadName` property to `true` and the `AbstractMessageListenerContainer.threadNameSupplier` will be invoked to obtain the thread name.
17531753
This is a `Function<MessageListenerContainer, String>`, with the default implementation returning `container.getListenerId()`.
17541754

17551755
[[kafka-listener-meta]]

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Arrays;
2121
import java.util.Collection;
22+
import java.util.function.Function;
2223
import java.util.regex.Pattern;
2324

2425
import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@
3839
import org.springframework.kafka.listener.BatchInterceptor;
3940
import org.springframework.kafka.listener.CommonErrorHandler;
4041
import org.springframework.kafka.listener.ContainerProperties;
42+
import org.springframework.kafka.listener.MessageListenerContainer;
4143
import org.springframework.kafka.listener.RecordInterceptor;
4244
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
4345
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -110,6 +112,10 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
110112

111113
private String correlationHeaderName;
112114

115+
private Boolean changeConsumerThreadName;
116+
117+
private Function<MessageListenerContainer, String> threadNameSupplier;
118+
113119
@Override
114120
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
115121
this.applicationContext = applicationContext;
@@ -334,6 +340,29 @@ public void setCorrelationHeaderName(String correlationHeaderName) {
334340
this.correlationHeaderName = correlationHeaderName;
335341
}
336342

343+
/**
344+
* Set to true to instruct the container to change the consumer thread name during
345+
* initialization.
346+
* @param changeConsumerThreadName true to change.
347+
* @since 3.0.1
348+
* @see #setThreadNameSupplier(Function)
349+
*/
350+
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
351+
this.changeConsumerThreadName = changeConsumerThreadName;
352+
}
353+
354+
/**
355+
* Set a function used to change the consumer thread name. The default returns the
356+
* container {@code listenerId}.
357+
* @param threadNameSupplier the function.
358+
* @since 3.0.1
359+
* @see #setChangeConsumerThreadName(boolean)
360+
*/
361+
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
362+
Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
363+
this.threadNameSupplier = threadNameSupplier;
364+
}
365+
337366
@SuppressWarnings("deprecation")
338367
@Override
339368
public void afterPropertiesSet() {
@@ -414,7 +443,9 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
414443
properties::setSubBatchPerPartition)
415444
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
416445
.acceptIfNotNull(this.commonErrorHandler, instance::setCommonErrorHandler)
417-
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
446+
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal)
447+
.acceptIfNotNull(this.changeConsumerThreadName, instance::setChangeConsumerThreadName)
448+
.acceptIfNotNull(this.threadNameSupplier, instance::setThreadNameSupplier);
418449
Boolean autoStart = endpoint.getAutoStartup();
419450
if (autoStart != null) {
420451
instance.setAutoStartup(autoStart);

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Function;
2829
import java.util.regex.Pattern;
2930
import java.util.stream.Collectors;
3031

@@ -126,6 +127,11 @@ public abstract class AbstractMessageListenerContainer<K, V>
126127
@Nullable
127128
private String mainListenerId;
128129

130+
private boolean changeConsumerThreadName;
131+
132+
@NonNull
133+
private Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();
134+
129135
/**
130136
* Construct an instance with the provided factory and properties.
131137
* @param consumerFactory the factory.
@@ -423,6 +429,48 @@ public void setTopicCheckTimeout(int topicCheckTimeout) {
423429
this.topicCheckTimeout = topicCheckTimeout;
424430
}
425431

432+
/**
433+
* Return true if the container should change the consumer thread name during
434+
* initialization.
435+
* @return true to change.
436+
* @since 3.0.1
437+
*/
438+
public boolean isChangeConsumerThreadName() {
439+
return this.changeConsumerThreadName;
440+
}
441+
442+
/**
443+
* Set to true to instruct the container to change the consumer thread name during
444+
* initialization.
445+
* @param changeConsumerThreadName true to change.
446+
* @since 3.0.1
447+
* @see #setThreadNameSupplier(Function)
448+
*/
449+
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
450+
this.changeConsumerThreadName = changeConsumerThreadName;
451+
}
452+
453+
/**
454+
* Return the function used to change the consumer thread name.
455+
* @return the function.
456+
* @since 3.0.1
457+
*/
458+
public Function<MessageListenerContainer, String> getThreadNameSupplier() {
459+
return this.threadNameSupplier;
460+
}
461+
462+
/**
463+
* Set a function used to change the consumer thread name. The default returns the
464+
* container {@code listenerId}.
465+
* @param threadNameSupplier the function.
466+
* @since 3.0.1
467+
* @see #setChangeConsumerThreadName(boolean)
468+
*/
469+
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
470+
Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
471+
this.threadNameSupplier = threadNameSupplier;
472+
}
473+
426474
protected RecordInterceptor<K, V> getRecordInterceptor() {
427475
return this.recordInterceptor;
428476
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.function.Function;
2726
import java.util.regex.Pattern;
2827

2928
import org.aopalliance.aop.Advice;
@@ -34,7 +33,6 @@
3433
import org.springframework.core.task.AsyncTaskExecutor;
3534
import org.springframework.kafka.support.TopicPartitionOffset;
3635
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
37-
import org.springframework.lang.NonNull;
3836
import org.springframework.lang.Nullable;
3937
import org.springframework.scheduling.TaskScheduler;
4038
import org.springframework.transaction.PlatformTransactionManager;
@@ -289,11 +287,6 @@ public enum EOSMode {
289287

290288
private KafkaListenerObservationConvention observationConvention;
291289

292-
private boolean changeConsumerThreadName;
293-
294-
@NonNull
295-
private Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();
296-
297290
/**
298291
* Create properties for a container that will subscribe to the specified topics.
299292
* @param topics the topics.
@@ -952,48 +945,6 @@ public void setObservationConvention(KafkaListenerObservationConvention observat
952945
this.observationConvention = observationConvention;
953946
}
954947

955-
/**
956-
* Return true if the container should change the consumer thread name during
957-
* initialization.
958-
* @return true to change.
959-
* @since 3.0.1
960-
*/
961-
public boolean isChangeConsumerThreadName() {
962-
return this.changeConsumerThreadName;
963-
}
964-
965-
/**
966-
* Set to true to instruct the container to change the consumer thread name during
967-
* initialization.
968-
* @param changeConsumerThreadName true to change.
969-
* @since 3.0.1
970-
* @see #setThreadNameSupplier(Function)
971-
*/
972-
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
973-
this.changeConsumerThreadName = changeConsumerThreadName;
974-
}
975-
976-
/**
977-
* Return the function used to change the consumer thread name.
978-
* @return the function.
979-
* @since 3.0.1
980-
*/
981-
public Function<MessageListenerContainer, String> getThreadNameSupplier() {
982-
return this.threadNameSupplier;
983-
}
984-
985-
/**
986-
* Set a function used to change the consumer thread name. The default returns the
987-
* container {@code listenerId}.
988-
* @param threadNameSupplier the function.
989-
* @since 3.0.1
990-
* @see #setChangeConsumerThreadName(boolean)
991-
*/
992-
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
993-
Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
994-
this.threadNameSupplier = threadNameSupplier;
995-
}
996-
997948
@Override
998949
public String toString() {
999950
return "ContainerProperties ["
@@ -1030,7 +981,6 @@ public String toString() {
1030981
+ (this.observationConvention != null
1031982
? "\n observationConvention=" + this.observationConvention
1032983
: "")
1033-
+ "\n changeConsumerThreadName=" + this.changeConsumerThreadName
1034984
+ "\n]";
1035985
}
1036986

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,9 +1397,10 @@ public void run() {
13971397
}
13981398

13991399
protected void initialize() {
1400-
if (this.containerProperties.isChangeConsumerThreadName()) {
1400+
if (KafkaMessageListenerContainer.this.thisOrParentContainer.isChangeConsumerThreadName()) {
14011401
Thread.currentThread().setName(
1402-
this.containerProperties.getThreadNameSupplier().apply(KafkaMessageListenerContainer.this));
1402+
KafkaMessageListenerContainer.this.thisOrParentContainer.getThreadNameSupplier()
1403+
.apply(KafkaMessageListenerContainer.this));
14031404
}
14041405
publishConsumerStartingEvent();
14051406
this.consumerThread = Thread.currentThread();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,7 @@ public void testProjection() throws InterruptedException {
920920
assertThat(this.listener.projectionLatch.await(60, TimeUnit.SECONDS)).isTrue();
921921
assertThat(this.listener.name).isEqualTo("SomeName");
922922
assertThat(this.listener.username).isEqualTo("SomeUsername");
923+
assertThat(this.listener.customThreadName).isEqualTo("foo.projection-0");
923924
}
924925

925926
@SuppressWarnings("unchecked")
@@ -1167,6 +1168,8 @@ public KafkaListenerContainerFactory<?> projectionListenerContainerFactory() {
11671168
typeMapper.addTrustedPackages("*");
11681169
converter.setTypeMapper(typeMapper);
11691170
factory.setMessageConverter(new ProjectingMessageConverter(converter));
1171+
factory.setChangeConsumerThreadName(true);
1172+
factory.setThreadNameSupplier(container -> "foo." + container.getListenerId());
11701173
return factory;
11711174
}
11721175

@@ -1888,6 +1891,8 @@ static class Listener implements ConsumerSeekAware {
18881891

18891892
volatile String batchOverrideStackTrace;
18901893

1894+
volatile String customThreadName;
1895+
18911896
@KafkaListener(id = "manualStart", topics = "manualStart",
18921897
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
18931898
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
@@ -2206,6 +2211,7 @@ public void projectionListener(ProjectionSample sample) {
22062211
this.username = sample.getUsername();
22072212
this.name = sample.getName();
22082213
this.projectionLatch.countDown();
2214+
this.customThreadName = Thread.currentThread().getName();
22092215
}
22102216

22112217
@KafkaListener(id = "customMethodArgumentResolver", topics = "annotated39")

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
137137
ContainerProperties containerProps = new ContainerProperties(topic1);
138138
containerProps.setLogContainerConfig(true);
139139
containerProps.setClientId("client");
140-
containerProps.setChangeConsumerThreadName(true);
141140

142141
final CountDownLatch latch = new CountDownLatch(3);
143142
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
@@ -153,6 +152,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
153152
new ConcurrentMessageListenerContainer<>(cf, containerProps);
154153
container.setConcurrency(2);
155154
container.setBeanName("testAuto");
155+
container.setChangeConsumerThreadName(true);
156156
BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
157157
CountDownLatch stopLatch = new CountDownLatch(4);
158158
container.setApplicationEventPublisher(e -> {

0 commit comments

Comments
 (0)