Skip to content

Commit 950a999

Browse files
garyrussellartembilan
authored andcommitted
GH-800: Fix Zombie Fencing
Resolves #800 Fix assignment of `transactional.id` to be consistent across consumers. **cherry-pick to all versions >= 1.3.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java # src/reference/asciidoc/whats-new.adoc
1 parent db84b14 commit 950a999

File tree

6 files changed

+163
-10
lines changed

6 files changed

+163
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
import java.util.Collections;
2020
import java.util.HashMap;
21+
import java.util.Iterator;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.Future;
2527
import java.util.concurrent.LinkedBlockingQueue;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.Consumer;
2831

2932
import org.apache.commons.logging.Log;
3033
import org.apache.commons.logging.LogFactory;
@@ -44,6 +47,7 @@
4447

4548
import org.springframework.beans.factory.DisposableBean;
4649
import org.springframework.context.Lifecycle;
50+
import org.springframework.kafka.support.TransactionSupport;
4751
import org.springframework.util.Assert;
4852

4953
/**
@@ -81,6 +85,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8185

8286
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
8387

88+
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
89+
8490
private volatile CloseSafeProducer<K, V> producer;
8591

8692
private Serializer<K> keySerializer;
@@ -93,6 +99,12 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9399

94100
private volatile boolean running;
95101

102+
private boolean producerPerConsumerPartition = true;
103+
104+
/**
105+
* Construct a factory with the provided configuration.
106+
* @param configs the configuration.
107+
*/
96108
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
97109
this(configs, null, null);
98110
}
@@ -144,6 +156,17 @@ private void enableIdempotentBehaviour() {
144156
}
145157
}
146158

159+
/**
160+
* Set to false to revert to the previous behavior of a simple incrementing
161+
* trasactional.id suffix for each producer instead of maintaining a producer
162+
* for each group/topic/partition.
163+
* @param producerPerConsumerPartition false to revert.
164+
* @since 1.3.7
165+
*/
166+
public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
167+
this.producerPerConsumerPartition = producerPerConsumerPartition;
168+
}
169+
147170
/**
148171
* Return an unmodifiable reference to the configuration map for this factory.
149172
* Useful for cloning to make a similar factory.
@@ -177,6 +200,12 @@ public void destroy() throws Exception { //NOSONAR
177200
}
178201
producer = this.cache.poll();
179202
}
203+
synchronized (this.consumerProducers) {
204+
this.consumerProducers.forEach(
205+
(k, v) -> ((CloseSafeProducer<K, V>) v).delegate
206+
.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
207+
this.consumerProducers.clear();
208+
}
180209
}
181210

182211
@Override
@@ -205,7 +234,12 @@ public boolean isRunning() {
205234
@Override
206235
public Producer<K, V> createProducer() {
207236
if (this.transactionIdPrefix != null) {
208-
return createTransactionalProducer();
237+
if (this.producerPerConsumerPartition) {
238+
return createTransactionalProducerForPartition();
239+
}
240+
else {
241+
return createTransactionalProducer();
242+
}
209243
}
210244
if (this.producer == null) {
211245
synchronized (this) {
@@ -226,6 +260,37 @@ protected Producer<K, V> createKafkaProducer() {
226260
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
227261
}
228262

263+
private Producer<K, V> createTransactionalProducerForPartition() {
264+
String suffix = TransactionSupport.getTransactionIdSuffix();
265+
if (suffix == null) {
266+
return createTransactionalProducer();
267+
}
268+
else {
269+
synchronized (this.consumerProducers) {
270+
if (!this.consumerProducers.containsKey(suffix)) {
271+
Producer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
272+
this.consumerProducers.put(suffix, newProducer);
273+
return newProducer;
274+
}
275+
else {
276+
return this.consumerProducers.get(suffix);
277+
}
278+
}
279+
}
280+
}
281+
282+
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
283+
synchronized (this.consumerProducers) {
284+
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
285+
while (iterator.hasNext()) {
286+
if (iterator.next().getValue().equals(producer)) {
287+
iterator.remove();
288+
break;
289+
}
290+
}
291+
}
292+
}
293+
229294
/**
230295
* Subclasses must return a producer from the {@link #getCache()} or a
231296
* new raw producer wrapped in a {@link CloseSafeProducer}.
@@ -235,18 +300,22 @@ protected Producer<K, V> createKafkaProducer() {
235300
protected Producer<K, V> createTransactionalProducer() {
236301
Producer<K, V> producer = this.cache.poll();
237302
if (producer == null) {
238-
Map<String, Object> configs = new HashMap<>(this.configs);
239-
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
240-
this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
241-
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
242-
producer.initTransactions();
243-
return new CloseSafeProducer<K, V>(producer, this.cache);
303+
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), null);
244304
}
245305
else {
246306
return producer;
247307
}
248308
}
249309

310+
private Producer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
311+
Producer<K, V> producer;
312+
Map<String, Object> configs = new HashMap<>(this.configs);
313+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
314+
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
315+
producer.initTransactions();
316+
return new CloseSafeProducer<K, V>(producer, this.cache, remover);
317+
}
318+
250319
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
251320
return this.cache;
252321
}
@@ -264,16 +333,24 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
264333

265334
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
266335

336+
private final Consumer<CloseSafeProducer<K, V>> removeConsumerProducer;
337+
267338
private volatile boolean txFailed;
268339

269340
CloseSafeProducer(Producer<K, V> delegate) {
270-
this(delegate, null);
341+
this(delegate, null, null);
271342
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
272343
}
273344

274345
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
346+
this(delegate, cache, null);
347+
}
348+
349+
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
350+
Consumer<CloseSafeProducer<K, V>> removeConsumerProducer) {
275351
this.delegate = delegate;
276352
this.cache = cache;
353+
this.removeConsumerProducer = removeConsumerProducer;
277354
}
278355

279356
@Override
@@ -353,6 +430,9 @@ public void close() {
353430
+ "broker restarted during transaction");
354431

355432
this.delegate.close();
433+
if (this.removeConsumerProducer != null) {
434+
this.removeConsumerProducer.accept(this);
435+
}
356436
}
357437
else {
358438
synchronized (this) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.springframework.kafka.support.LogIfLevelEnabled;
6666
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6767
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
68+
import org.springframework.kafka.support.TransactionSupport;
6869
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6970
import org.springframework.scheduling.SchedulingAwareRunnable;
7071
import org.springframework.scheduling.TaskScheduler;
@@ -1012,6 +1013,8 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
10121013
this.logger.trace("Processing " + record);
10131014
}
10141015
try {
1016+
TransactionSupport.setTransactionIdSuffix(
1017+
this.consumerGroupId + "." + record.topic() + "." + record.partition());
10151018
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
10161019

10171020
@Override
@@ -1038,6 +1041,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
10381041
}
10391042
getAfterRollbackProcessor().process(unprocessed, this.consumer);
10401043
}
1044+
finally {
1045+
TransactionSupport.clearTransactionIdSuffix();
1046+
}
10411047
}
10421048
}
10431049

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
/**
20+
* Utilities for supporting transactions.
21+
*
22+
* @author Gary Russell
23+
* @since 1.3.7
24+
*
25+
*/
26+
public final class TransactionSupport {
27+
28+
private static final ThreadLocal<String> transactionIdSuffix = new ThreadLocal<>();
29+
30+
private TransactionSupport() {
31+
super();
32+
}
33+
34+
public static void setTransactionIdSuffix(String suffix) {
35+
transactionIdSuffix.set(suffix);
36+
}
37+
38+
public static String getTransactionIdSuffix() {
39+
return transactionIdSuffix.get();
40+
}
41+
42+
public static void clearTransactionIdSuffix() {
43+
transactionIdSuffix.remove();
44+
}
45+
46+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.CountDownLatch;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicReference;
4243

4344
import org.apache.commons.logging.Log;
4445
import org.apache.commons.logging.LogFactory;
@@ -63,6 +64,7 @@
6364
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6465
import org.springframework.kafka.core.KafkaTemplate;
6566
import org.springframework.kafka.core.ProducerFactory;
67+
import org.springframework.kafka.core.ProducerFactoryUtils;
6668
import org.springframework.kafka.listener.config.ContainerProperties;
6769
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6870
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -160,7 +162,8 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
160162
}
161163
});
162164
if (handleError) {
163-
props.setErrorHandler((e, data) -> { });
165+
props.setErrorHandler((e, data) -> {
166+
});
164167
}
165168
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
166169
container.setBeanName("commit");
@@ -377,6 +380,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
377380
verify(pf).createProducer();
378381
}
379382

383+
@SuppressWarnings("unchecked")
380384
@Test
381385
public void testRollbackRecord() throws Exception {
382386
logger.info("Start testRollbackRecord");
@@ -397,6 +401,7 @@ public void testRollbackRecord() throws Exception {
397401
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
398402
final AtomicBoolean failed = new AtomicBoolean();
399403
final CountDownLatch latch = new CountDownLatch(3);
404+
final AtomicReference<String> transactionalId = new AtomicReference<>();
400405
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
401406
latch.countDown();
402407
if (failed.compareAndSet(false, true)) {
@@ -408,6 +413,9 @@ public void testRollbackRecord() throws Exception {
408413
if (message.topic().equals(topic1)) {
409414
template.send(topic2, "bar");
410415
template.flush();
416+
transactionalId.set(KafkaTestUtils.getPropertyValue(
417+
ProducerFactoryUtils.getTransactionalResourceHolder(pf).getProducer(),
418+
"delegate.transactionManager.transactionalId", String.class));
411419
}
412420
});
413421

@@ -444,8 +452,11 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
444452
ConsumerRecords<Integer, String> records = consumer.poll(0);
445453
assertThat(records.count()).isEqualTo(0);
446454
assertThat(consumer.position(new TopicPartition(topic1, 0))).isEqualTo(1);
455+
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
447456
logger.info("Stop testRollbackRecord");
448457
pf.destroy();
458+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
459+
consumer.close();
449460
}
450461

451462
@SuppressWarnings("serial")

src/reference/asciidoc/kafka.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,12 @@ Spring for Apache Kafka adds support in several ways.
256256
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
257257
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
258258
When the user `close()` s a producer, it is returned to the cache for reuse instead of actually being closed.
259-
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer.
259+
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener.
260+
In that case, the `transactional.id` is `<transactionIdPrefix>.<group.id>.<topic>.<partition>`; this is to properly support fencing zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
261+
This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
262+
If you wish to revert to the previous behavior, set the `producerPerConsumerPartition` property on the `DefaultKafkaProducerFactory` to `false`.
263+
264+
NOTE: While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics/partitions.
260265

261266
====== KafkaTransactionManager
262267

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ _Version 2.1.3_ introduced the `ChainedKafkaTransactionManager` see <<chained-tr
5959
Starting with _version 2.1.6_, a new `AfterRollbackProcessor` strategy is provided - see <<after-rollback>> for more information.
6060

6161

62+
==== Transactional Id
63+
64+
When a transaction is started by the listener container, the `transactional.id` is now the `transactionIdPrefix` appended with `<group.id>.<topic>.<partition>`.
65+
This is to allow proper fencing of zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
66+
6267
==== Migration Guide from 2.0
6368

6469
https://github.com/spring-projects/spring-kafka/wiki/Spring-for-Apache-Kafka-2.0-to-2.1-Migration-Guide[2.0 to 2.1 Migration].

0 commit comments

Comments
 (0)