Skip to content

Commit d913fe1

Browse files
garyrussellartembilan
authored andcommitted
GH-2501: Add Option to Set Consumer Thread Name
Resolves #2501
1 parent 08951f5 commit d913fe1

File tree

5 files changed

+73
-12
lines changed

5 files changed

+73
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,19 +1734,24 @@ IMPORTANT: This is available in record listeners and batch listeners that receiv
17341734
It is **not** available in a batch listener that receives a `ConsumerRecords<?, ?>` argument.
17351735
Use the `KafkaUtils` mechanism in that case.
17361736

1737+
[[container-thread-naming]]
17371738
===== Container Thread Naming
17381739

1739-
Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property `enable.auto.commit` is `false`.
1740-
You can provide custom executors by setting the `consumerExecutor` and `listenerExecutor` properties of the container's `ContainerProperties`.
1740+
A `TaskExecutor` is used to invoke the consumer and the listener.
1741+
You can provide a custom executor by setting the `consumerExecutor` property of the container's `ContainerProperties`.
17411742
When using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used.
1742-
When using the `ConcurrentMessageListenerContainer`, a thread from each is used for each consumer (`concurrency`).
1743+
When using the `ConcurrentMessageListenerContainer`, a thread from the executor is used for each consumer (`concurrency`).
17431744

1744-
If you do not provide a consumer executor, a `SimpleAsyncTaskExecutor` is used.
1745-
This executor creates threads with names similar to `<beanName>-C-1` (consumer thread).
1745+
If you do not provide a consumer executor, a `SimpleAsyncTaskExecutor` is used for each container.
1746+
This executor creates threads with names similar to `<beanName>-C-<n>`.
17461747
For the `ConcurrentMessageListenerContainer`, the `<beanName>` part of the thread name becomes `<beanName>-m`, where `m` represents the consumer instance.
17471748
`n` increments each time the container is started.
17481749
So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop and subsequent start.
17491750

1751+
Starting with version `3.0.1`, you can now change the name of the thread, regardless of which executor is used.
1752+
Set the `ContainerProperties.changeConsumerThreadName` property to `true` and the `ContainerProperties.threadNameSupplier` will be invoked to obtain the thread name.
1753+
This is a `Function<MessageListenerContainer, String>`, with the default implementation returning `container.getListenerId()`.
1754+
17501755
[[kafka-listener-meta]]
17511756
===== `@KafkaListener` as a Meta Annotation
17521757

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ these methods now require an `ObjectProvider<RetryTopicComponentFactory>` parame
6464
Events related to consumer authentication and authorization failures are now published by the container.
6565
See <<events>> for more information.
6666

67+
You can now customize the thread names used by consumer threads.
68+
See <<container-thread-naming>> for more information.
69+
6770
[[x30-template-changes]]
6871
==== `KafkaTemplate` Changes
6972

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.function.Function;
2627
import java.util.regex.Pattern;
2728

2829
import org.aopalliance.aop.Advice;
@@ -33,6 +34,7 @@
3334
import org.springframework.core.task.AsyncTaskExecutor;
3435
import org.springframework.kafka.support.TopicPartitionOffset;
3536
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
37+
import org.springframework.lang.NonNull;
3638
import org.springframework.lang.Nullable;
3739
import org.springframework.scheduling.TaskScheduler;
3840
import org.springframework.transaction.PlatformTransactionManager;
@@ -287,6 +289,11 @@ public enum EOSMode {
287289

288290
private KafkaListenerObservationConvention observationConvention;
289291

292+
private boolean changeConsumerThreadName;
293+
294+
@NonNull
295+
private Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();
296+
290297
/**
291298
* Create properties for a container that will subscribe to the specified topics.
292299
* @param topics the topics.
@@ -945,6 +952,48 @@ public void setObservationConvention(KafkaListenerObservationConvention observat
945952
this.observationConvention = observationConvention;
946953
}
947954

955+
/**
956+
* Return true if the container should change the consumer thread name during
957+
* initialization.
958+
* @return true to change.
959+
* @since 3.0.1
960+
*/
961+
public boolean isChangeConsumerThreadName() {
962+
return this.changeConsumerThreadName;
963+
}
964+
965+
/**
966+
* Set to true to instruct the container to change the consumer thread name during
967+
* initialization.
968+
* @param changeConsumerThreadName true to change.
969+
* @since 3.0.1
970+
* @see #setThreadNameSupplier(Function)
971+
*/
972+
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
973+
this.changeConsumerThreadName = changeConsumerThreadName;
974+
}
975+
976+
/**
977+
* Return the function used to change the consumer thread name.
978+
* @return the function.
979+
* @since 3.0.1
980+
*/
981+
public Function<MessageListenerContainer, String> getThreadNameSupplier() {
982+
return this.threadNameSupplier;
983+
}
984+
985+
/**
986+
* Set a function used to change the consumer thread name. The default returns the
987+
* container {@code listenerId}.
988+
* @param threadNameSupplier the function.
989+
* @since 3.0.1
990+
* @see #setChangeConsumerThreadName(boolean)
991+
*/
992+
public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
993+
Assert.notNull(threadNameSupplier, "'threadNameSupplier' cannot be null");
994+
this.threadNameSupplier = threadNameSupplier;
995+
}
996+
948997
@Override
949998
public String toString() {
950999
return "ContainerProperties ["
@@ -981,6 +1030,7 @@ public String toString() {
9811030
+ (this.observationConvention != null
9821031
? "\n observationConvention=" + this.observationConvention
9831032
: "")
1033+
+ "\n changeConsumerThreadName=" + this.changeConsumerThreadName
9841034
+ "\n]";
9851035
}
9861036

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,6 +1397,10 @@ public void run() {
13971397
}
13981398

13991399
protected void initialize() {
1400+
if (this.containerProperties.isChangeConsumerThreadName()) {
1401+
Thread.currentThread().setName(
1402+
this.containerProperties.getThreadNameSupplier().apply(KafkaMessageListenerContainer.this));
1403+
}
14001404
publishConsumerStartingEvent();
14011405
this.consumerThread = Thread.currentThread();
14021406
setupSeeks();

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
137137
ContainerProperties containerProps = new ContainerProperties(topic1);
138138
containerProps.setLogContainerConfig(true);
139139
containerProps.setClientId("client");
140+
containerProps.setChangeConsumerThreadName(true);
140141

141142
final CountDownLatch latch = new CountDownLatch(3);
142143
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
@@ -178,17 +179,15 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
178179
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
179180
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
180181
template.setDefaultTopic(topic1);
181-
template.sendDefault(0, "foo");
182-
template.sendDefault(2, "bar");
183-
template.sendDefault(0, "baz");
184-
template.sendDefault(2, "qux");
182+
template.sendDefault(0, 0, "foo");
183+
template.sendDefault(1, 2, "bar");
184+
template.sendDefault(0, 0, "baz");
185+
template.sendDefault(1, 2, "qux");
185186
template.flush();
186187
assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
187188
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
188189
assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
189-
for (String threadName : listenerThreadNames) {
190-
assertThat(threadName).contains("-C-");
191-
}
190+
assertThat(listenerThreadNames).contains("testAuto-0", "testAuto-1");
192191
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
193192
"containers", List.class);
194193
assertThat(containers).hasSize(2);

0 commit comments

Comments
 (0)