Skip to content

Commit 0dddc9a

Browse files
committed
feat(examples): add example using transactional ProducerContainer (#3)
Resolves: GH-3
1 parent 4b0308d commit 0dddc9a

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

clients/src/main/kotlin/io/streamthoughts/kafka/clients/producer/KafkaProducerContainer.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ class KafkaProducerContainer<K, V> private constructor(
7474
private lateinit var producer: Producer<K, V>
7575

7676
override fun send(
77-
pairs: Collection<Pair<K, V>>,
77+
records: Collection<Pair<K, V>>,
7878
topic: String?,
7979
partition: Int?,
8080
timestamp: Instant?,
8181
onSuccess: OnSendSuccessCallback<K, V>?,
8282
onError: OnSendErrorCallback<K, V>?
8383
): Future<List<SendResult<K?, V?>>> {
84-
val futures: List<CompletableFuture<SendResult<K?, V?>>> = pairs.map {
84+
val futures: List<CompletableFuture<SendResult<K?, V?>>> = records.map {
8585
send(it.first, it.second, topic, partition, timestamp, onSuccess, onError) as CompletableFuture
8686
}
8787
return CompletableFuture.allOf(*futures.toTypedArray()).thenApply { futures.map { it.join() }.toList() }

clients/src/main/kotlin/io/streamthoughts/kafka/clients/producer/ProducerContainer.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ interface ProducerContainer<K, V>: Closeable {
162162
}
163163

164164
/**
165-
* Asynchronously send the given key-value [pair] to the given [topic] (or the default one if null is given)
165+
* Asynchronously send the given key-value [record] to the given [topic] (or the default one if null is given)
166166
* and [partition] with the given [timestamp].
167167
*
168168
* Then, optionally invoke the specific given [onSuccess] callback when the record has been acknowledge.
@@ -171,17 +171,17 @@ interface ProducerContainer<K, V>: Closeable {
171171
* @see Producer.send
172172
* @return a [Future] of [SendResult]
173173
*/
174-
fun send(pair: Pair<K, V?>,
174+
fun send(record: Pair<K, V?>,
175175
topic: String? = null,
176176
partition: Int? = null,
177177
timestamp: Instant? = null,
178178
onSuccess: OnSendSuccessCallback<K, V>? = null,
179179
onError: OnSendErrorCallback<K, V>? = null) : Future<SendResult<K?, V?>> {
180-
return send(pair.first, pair.second, topic, partition, timestamp, onSuccess, onError)
180+
return send(record.first, record.second, topic, partition, timestamp, onSuccess, onError)
181181
}
182182

183183
/**
184-
* Asynchronously send all the given key-value [pairs] to the given [topic] (or the default one if null is given)
184+
* Asynchronously send all the given key-value [records] to the given [topic] (or the default one if null is given)
185185
* and [partition] with the given [timestamp].
186186
*
187187
* Then, optionally invoke the specific given [onSuccess] callback when the record has been acknowledge.
@@ -190,7 +190,7 @@ interface ProducerContainer<K, V>: Closeable {
190190
* @see Producer.send
191191
* @return a [Future] of [SendResult]
192192
*/
193-
fun send(pairs: Collection<Pair<K, V>>,
193+
fun send(records: Collection<Pair<K, V>>,
194194
topic: String? = null,
195195
partition: Int? = null,
196196
timestamp: Instant? = null,
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.client.examples
20+
21+
import io.streamthoughts.kafka.clients.loadProducerConfigs
22+
import io.streamthoughts.kafka.clients.producer.Acks
23+
import io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
24+
import io.streamthoughts.kafka.clients.producer.KafkaProducerContainer
25+
import org.apache.kafka.common.serialization.StringSerializer
26+
import kotlin.system.exitProcess
27+
28+
29+
fun main(args: Array<String>) {
30+
31+
if (args.size != 2) {
32+
println("Missing required command line arguments: configFile topic")
33+
exitProcess(1)
34+
}
35+
36+
val (config, topic) = args
37+
38+
39+
// Load properties from file and customize Producer config.
40+
val configs: KafkaProducerConfigs = loadProducerConfigs(config)
41+
.acks(Acks.Leader)
42+
.keySerializer(StringSerializer::class.java.name)
43+
.valueSerializer(StringSerializer::class.java.name)
44+
45+
val producer = KafkaProducerContainer.Builder<Any?, String>(configs)
46+
.defaultTopic(topic)
47+
.configure {
48+
transactionalId("my-tx-id")
49+
enableIdempotence(true)
50+
}
51+
.onSendSuccess { _, _, m ->
52+
println("Record was successfully sent (topic=${m.topic()}, partition=${m.partition()}, offset= ${m.offset()})")
53+
}
54+
.onSendError { _, _, e ->
55+
e.printStackTrace()
56+
}
57+
.build()
58+
59+
val messages = listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka")
60+
61+
producer.use {
62+
producer.init()
63+
producer.runTx {
64+
messages.forEach{producer.send(value = it)}
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)