Skip to content

Commit bb77b94

Browse files
authored
BAEL-7164: Avro Magic Byte (#18631)
* BAEL-7164: sample app * BAEL-7164: error handling deser + dlq * BAEL-7164: use service connection * BAEL-7164: extract test helper * BAEL-7164: use getter * BAEL-7164: code reivew
1 parent b8b94ed commit bb77b94

File tree

6 files changed

+237
-0
lines changed

6 files changed

+237
-0
lines changed

spring-kafka-4/pom.xml

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@
3232
<artifactId>spring-boot-starter-actuator</artifactId>
3333
</dependency>
3434

35+
<dependency>
36+
<groupId>org.apache.avro</groupId>
37+
<artifactId>avro</artifactId>
38+
<version>${apache.avro.version}</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.confluent</groupId>
42+
<artifactId>kafka-avro-serializer</artifactId>
43+
<version>${kafka-avro-serializer.version}</version>
44+
</dependency>
3545
<!-- test dependencies -->
3646
<dependency>
3747
<groupId>org.springframework.boot</groupId>
@@ -62,6 +72,48 @@
6272

6373
<build>
6474
<plugins>
75+
<plugin>
76+
<groupId>org.apache.avro</groupId>
77+
<artifactId>avro-maven-plugin</artifactId>
78+
<version>${apache.avro.version}</version>
79+
<configuration>
80+
<stringType>String</stringType>
81+
</configuration>
82+
<executions>
83+
<execution>
84+
<phase>generate-sources</phase>
85+
<goals>
86+
<goal>schema</goal>
87+
</goals>
88+
<configuration>
89+
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
90+
<includes>
91+
<include>*.avsc</include>
92+
</includes>
93+
<outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
94+
</configuration>
95+
</execution>
96+
</executions>
97+
</plugin>
98+
<plugin>
99+
<groupId>org.codehaus.mojo</groupId>
100+
<artifactId>build-helper-maven-plugin</artifactId>
101+
<version>${build-helper-maven-plugin.version}</version>
102+
<executions>
103+
<execution>
104+
<id>add-source</id>
105+
<phase>generate-sources</phase>
106+
<goals>
107+
<goal>add-source</goal>
108+
</goals>
109+
<configuration>
110+
<sources>
111+
<source>${project.build.directory}/generated-sources/avro</source>
112+
</sources>
113+
</configuration>
114+
</execution>
115+
</executions>
116+
</plugin>
65117
<plugin>
66118
<groupId>org.springframework.boot</groupId>
67119
<artifactId>spring-boot-maven-plugin</artifactId>
@@ -75,6 +127,16 @@
75127
<properties>
76128
<java.version>21</java.version>
77129
<spring-boot.version>3.4.4</spring-boot.version>
130+
<apache.avro.version>1.12.0</apache.avro.version>
131+
<kafka-avro-serializer.version>7.9.1</kafka-avro-serializer.version>
132+
<build-helper-maven-plugin.version>3.2.0</build-helper-maven-plugin.version>
78133
</properties>
79134

135+
<repositories>
136+
<repository>
137+
<id>confluent</id>
138+
<url>https://packages.confluent.io/maven/</url>
139+
</repository>
140+
</repositories>
141+
80142
</project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.baeldung.avro.deserialization.exception;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.boot.autoconfigure.SpringBootApplication;
9+
import org.springframework.boot.builder.SpringApplicationBuilder;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.kafka.annotation.KafkaListener;
12+
13+
import com.baeldung.avro.deserialization.exception.avro.Article;
14+
15+
@SpringBootApplication
16+
class AvroMagicByteApp {
17+
18+
private static final Logger LOG = LoggerFactory.getLogger(AvroMagicByteApp.class);
19+
20+
private final List<String> blog = new ArrayList<>();
21+
22+
public static void main(String[] args) {
23+
new SpringApplicationBuilder().sources(AvroMagicByteApp.class)
24+
.profiles("avro-magic-byte")
25+
.run(args);
26+
}
27+
28+
@KafkaListener(topics = "baeldung.article.published")
29+
public void listen(Article article) {
30+
LOG.info("a new article was published: {}", article);
31+
blog.add(article.getTitle());
32+
}
33+
34+
public List<String> getBlog() {
35+
return blog;
36+
}
37+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.baeldung.avro.deserialization.exception;
2+
3+
import java.util.Map;
4+
5+
import org.apache.kafka.clients.producer.ProducerConfig;
6+
import org.apache.kafka.common.serialization.ByteArraySerializer;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.core.ProducerFactory;
11+
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
12+
import org.springframework.kafka.listener.DefaultErrorHandler;
13+
14+
@Configuration
15+
class DlqConfig {
16+
17+
@Bean
18+
DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
19+
return new DefaultErrorHandler(dlqPublishingRecoverer);
20+
}
21+
22+
@Bean
23+
DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesKafkaTemplate) {
24+
return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
25+
}
26+
27+
@Bean("bytesKafkaTemplate")
28+
KafkaTemplate<?, ?> bytesTemplate(ProducerFactory<?, ?> kafkaProducerFactory) {
29+
return new KafkaTemplate<>(kafkaProducerFactory, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
30+
}
31+
32+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
2+
spring:
3+
kafka:
4+
# bootstrap-servers <-- it'll be injected in test via Testcontainers and @ServiceConnection
5+
consumer:
6+
group-id: test-group
7+
auto-offset-reset: earliest
8+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
9+
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
10+
properties:
11+
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
12+
schema.registry.url: mock://test
13+
specific.avro.reader: true
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"type": "record",
3+
"name": "Article",
4+
"namespace": "com.baeldung.avro.deserialization.exception.avro",
5+
"fields": [
6+
{ "name": "title", "type": "string" },
7+
{ "name": "author", "type": "string" },
8+
{ "name": "tags", "type": { "type": "array", "items": "string" } }
9+
]
10+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.baeldung.avro.deserialization.exception;
2+
3+
import static java.time.Duration.ofSeconds;
4+
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
5+
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
6+
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
7+
import static org.assertj.core.api.Assertions.assertThat;
8+
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
9+
10+
import java.time.Duration;
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
import org.apache.kafka.clients.consumer.ConsumerRecord;
15+
import org.apache.kafka.common.serialization.StringSerializer;
16+
import org.junit.jupiter.api.Test;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.boot.test.context.SpringBootTest;
19+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
20+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
21+
import org.springframework.kafka.core.KafkaTemplate;
22+
import org.springframework.kafka.test.utils.KafkaTestUtils;
23+
import org.springframework.test.context.ActiveProfiles;
24+
import org.testcontainers.junit.jupiter.Container;
25+
import org.testcontainers.kafka.KafkaContainer;
26+
import org.testcontainers.utility.DockerImageName;
27+
28+
import com.baeldung.avro.deserialization.exception.avro.Article;
29+
30+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
31+
32+
@SpringBootTest
33+
@ActiveProfiles("avro-magic-byte")
34+
class AvroMagicByteLiveTest {
35+
36+
@Container
37+
@ServiceConnection
38+
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));
39+
40+
@Autowired
41+
private AvroMagicByteApp listener;
42+
43+
@Test
44+
void whenSendingCorrectArticle_thenItsAddedToTheBlog() throws Exception {
45+
avroKafkaTemplate().send("baeldung.article.published", aTestArticle("Avro Magic Byte"))
46+
.get();
47+
48+
await().untilAsserted(() -> assertThat(listener.getBlog()).containsExactly("Avro Magic Byte"));
49+
}
50+
51+
@Test
52+
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
53+
stringKafkaTemplate().send("baeldung.article.published", "not a valid avro message!")
54+
.get();
55+
56+
var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));
57+
58+
assertThat(dlqRecord.value()).isEqualTo("not a valid avro message!");
59+
}
60+
61+
private static KafkaTemplate<Object, Object> avroKafkaTemplate() {
62+
return new KafkaTemplate<>(kafkaProducerFactory());
63+
}
64+
65+
private static KafkaTemplate<Object, Object> stringKafkaTemplate() {
66+
return new KafkaTemplate<>(kafkaProducerFactory(), Map.of(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
67+
}
68+
69+
private static DefaultKafkaProducerFactory<Object, Object> kafkaProducerFactory() {
70+
return new DefaultKafkaProducerFactory<>(
71+
Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
72+
VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(), "schema.registry.url", "mock://test"));
73+
}
74+
75+
private static ConsumerRecord<?, ?> listenForOneMessage(String topic, Duration timeout) {
76+
return KafkaTestUtils.getOneRecord(kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout);
77+
}
78+
79+
private static Article aTestArticle(String title) {
80+
return new Article(title, "John Doe", List.of("avro", "kafka", "spring"));
81+
}
82+
83+
}

0 commit comments

Comments
 (0)