Skip to content

Commit 1074c07

Browse files
committed
Fix topics race in Kafka module tests using an embedded broker
1 parent c1efa8c commit 1074c07

File tree

1 file changed

+31
-29
lines changed

1 file changed

+31
-29
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.kafka.common.TopicPartition;
3939
import org.apache.kafka.common.header.Headers;
4040
import org.apache.kafka.common.header.internals.RecordHeaders;
41-
import org.junit.jupiter.api.BeforeAll;
4241
import org.junit.jupiter.api.Test;
4342

4443
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -75,6 +74,8 @@
7574
import org.springframework.kafka.support.converter.MessagingMessageConverter;
7675
import org.springframework.kafka.support.converter.RecordMessageConverter;
7776
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
77+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
78+
import org.springframework.kafka.test.context.EmbeddedKafka;
7879
import org.springframework.kafka.test.utils.ContainerTestUtils;
7980
import org.springframework.kafka.test.utils.KafkaTestUtils;
8081
import org.springframework.messaging.Message;
@@ -111,6 +112,14 @@
111112
* @since 5.4
112113
*
113114
*/
115+
@EmbeddedKafka(controlledShutdown = true,
116+
partitions = 1,
117+
topics = {MessageDrivenAdapterTests.topic1,
118+
MessageDrivenAdapterTests.topic2,
119+
MessageDrivenAdapterTests.topic3,
120+
MessageDrivenAdapterTests.topic4,
121+
MessageDrivenAdapterTests.topic5,
122+
MessageDrivenAdapterTests.topic6})
114123
class MessageDrivenAdapterTests implements TestApplicationContextAware {
115124

116125
static final String topic1 = "testTopic1";
@@ -125,16 +134,9 @@ class MessageDrivenAdapterTests implements TestApplicationContextAware {
125134

126135
static final String topic6 = "testTopic6";
127136

128-
static String EMBEDDED_BROKERS;
129-
130-
@BeforeAll
131-
static void setup() {
132-
EMBEDDED_BROKERS = System.getProperty("spring.global.embedded.kafka.brokers");
133-
}
134-
135137
@Test
136-
void testInboundRecord() {
137-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test1", true);
138+
void testInboundRecord(EmbeddedKafkaBroker embeddedKafka) {
139+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", true);
138140
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
139141
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
140142
ContainerProperties containerProps = new ContainerProperties(topic1);
@@ -158,7 +160,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
158160
adapter.start();
159161
ContainerTestUtils.waitForAssignment(container, 1);
160162

161-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
163+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
162164
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
163165
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
164166
template.setDefaultTopic(topic1);
@@ -223,8 +225,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
223225
}
224226

225227
@Test
226-
void testInboundRecordRetryRecover() {
227-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test4", true);
228+
void testInboundRecordRetryRecover(EmbeddedKafkaBroker embeddedKafka) {
229+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test4", true);
228230
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
229231
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
230232
ContainerProperties containerProps = new ContainerProperties(topic4);
@@ -255,7 +257,7 @@ protected boolean doSend(Message<?> message, long timeout) {
255257
adapter.start();
256258
ContainerTestUtils.waitForAssignment(container, 1);
257259

258-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
260+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
259261
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
260262
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
261263
template.setDefaultTopic(topic4);
@@ -291,8 +293,8 @@ protected boolean doSend(Message<?> message, long timeout) {
291293
* to the consumer.
292294
*/
293295
@Test
294-
void testInboundRecordRetryRecoverWithoutRecoveryCallback() throws Exception {
295-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test6", true);
296+
void testInboundRecordRetryRecoverWithoutRecoveryCallback(EmbeddedKafkaBroker embeddedKafka) throws Exception {
297+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test6", true);
296298
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
297299
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
298300
ContainerProperties containerProps = new ContainerProperties(topic6);
@@ -329,7 +331,7 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
329331
adapter.start();
330332
ContainerTestUtils.waitForAssignment(container, 1);
331333

332-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
334+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
333335
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
334336
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
335337
template.setDefaultTopic(topic6);
@@ -342,8 +344,8 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
342344
}
343345

344346
@Test
345-
void testInboundRecordNoRetryRecover() {
346-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test5", true);
347+
void testInboundRecordNoRetryRecover(EmbeddedKafkaBroker embeddedKafka) {
348+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test5", true);
347349
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
348350
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
349351
ContainerProperties containerProps = new ContainerProperties(topic5);
@@ -371,7 +373,7 @@ protected boolean doSend(Message<?> message, long timeout) {
371373
adapter.start();
372374
ContainerTestUtils.waitForAssignment(container, 1);
373375

374-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
376+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
375377
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
376378
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
377379
template.setDefaultTopic(topic5);
@@ -399,8 +401,8 @@ protected boolean doSend(Message<?> message, long timeout) {
399401
}
400402

401403
@Test
402-
void testInboundBatch() throws Exception {
403-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test2", true);
404+
void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
405+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", true);
404406
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
405407
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
406408
ContainerProperties containerProps = new ContainerProperties(topic2);
@@ -432,7 +434,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
432434
adapter.start();
433435
ContainerTestUtils.waitForAssignment(container, 1);
434436

435-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
437+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
436438
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
437439
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
438440
template.setDefaultTopic(topic2);
@@ -490,8 +492,8 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
490492
}
491493

492494
@Test
493-
void testInboundJson() {
494-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test3", true);
495+
void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
496+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test3", true);
495497
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
496498
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
497499
ContainerProperties containerProps = new ContainerProperties(topic3);
@@ -506,7 +508,7 @@ void testInboundJson() {
506508
adapter.start();
507509
ContainerTestUtils.waitForAssignment(container, 1);
508510

509-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
511+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
510512
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
511513
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
512514
template.setDefaultTopic(topic3);
@@ -534,8 +536,8 @@ void testInboundJson() {
534536
}
535537

536538
@Test
537-
void testInboundJsonWithPayload() {
538-
Map<String, Object> props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test7", true);
539+
void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
540+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test7", true);
539541
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
540542
DefaultKafkaConsumerFactory<Integer, Foo> cf = new DefaultKafkaConsumerFactory<>(props);
541543
ContainerProperties containerProps = new ContainerProperties(topic6);
@@ -554,7 +556,7 @@ void testInboundJsonWithPayload() {
554556
adapter.start();
555557
ContainerTestUtils.waitForAssignment(container, 1);
556558

557-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
559+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
558560
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
559561
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
560562
template.setDefaultTopic(topic6);

0 commit comments

Comments
 (0)