Skip to content

Commit 10f4f8b

Browse files
committed
refactor(clients): enhance configs classes
Additional changes: - update readme with new features
1 parent 0dddc9a commit 10f4f8b

File tree

14 files changed

+300
-118
lines changed

14 files changed

+300
-118
lines changed

README.md

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,40 @@ Just add **Kafka Clients for Kotlin** to the dependencies of your projects.
4646

4747
## Getting Started
4848

49-
### Kafka Producer
49+
### Writing messages to Kafka
5050

51-
See the full code-snippet : [ProducerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerKotlinDSLExample.kt)
51+
**Example: How to create `KafkaProducer` config ?**
52+
53+
```kotlin
54+
val configs = producerConfigsOf()
55+
.client { bootstrapServers("localhost:9092") }
56+
.acks(Acks.Leader)
57+
.keySerializer(StringSerializer::class.java.name)
58+
.valueSerializer(StringSerializer::class.java.name)
59+
```
60+
61+
**Example with standard `KafkaProducer` (i.e : using java `kafka-clients`)**
62+
63+
```kotlin
64+
val producer = KafkaProducer<String, String>(configs)
65+
66+
val messages = listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka")
67+
producer.use {
68+
messages.forEach {value ->
69+
val record = ProducerRecord<String, String>(topic, value)
70+
producer.send(record) { m: RecordMetadata, e: Exception? ->
71+
when (e) {
72+
null -> println("Record was successfully sent (topic=${m.topic()}, partition=${m.partition()}, offset= ${m.offset()})")
73+
else -> e.printStackTrace()
74+
}
75+
}
76+
}
77+
}
78+
```
79+
80+
N.B: See the full source code: [ProducerClientExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerClientExample.kt)
81+
82+
**Example with Kotlin DSL**
5283

5384
```kotlin
5485
val producer: ProducerContainer<String, String> = kafka("localhost:9092") {
@@ -65,24 +96,59 @@ val producer: ProducerContainer<String, String> = kafka("localhost:9092") {
6596

6697
defaultTopic("demo-topic")
6798

99+
onSendError {_, _, error ->
100+
e.printStackTrace()
101+
}
102+
68103
onSendSuccess{ _, _, metadata ->
69104
println("Record was sent successfully: topic=${metadata.topic()}, partition=${metadata.partition()}, offset=${metadata.offset()} ")
70105
}
71106
}
72107
}
73108

74-
with(producer) {
75-
init()
76-
listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka").forEach {
77-
send(value = it)
78-
}
79-
close()
109+
val messages = listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka")
110+
producer.use {
111+
producer.init() // create internal producer and call initTransaction() if `transactional.id` is set
112+
messages.forEach { producer.send(value = it) }
80113
}
81114
```
82115

83-
### Kafka Consumer
116+
N.B: See the full source code: [ProducerKotlinDSLExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerKotlinDSLExample.kt)
117+
118+
### Consuming messages from a Kafka topic
119+
120+
121+
**Example: How to create `KafkaConsumer` config ?**
122+
123+
```kotlin
124+
val configs = consumerConfigsOf()
125+
.client { bootstrapServers("localhost:9092") }
126+
.groupId("demo-consumer-group")
127+
.keyDeserializer(StringDeserializer::class.java.name)
128+
.valueDeserializer(StringDeserializer::class.java.name)
129+
```
130+
131+
**Example with standard `KafkaConsumer` (i.e : using java `kafka-clients`)**
132+
133+
```kotlin
134+
val consumer = KafkaConsumer<String, String>(configs)
135+
136+
consumer.use {
137+
consumer.subscribe(listOf(topic))
138+
while(true) {
139+
consumer
140+
.poll(Duration.ofMillis(500))
141+
.forEach { record ->
142+
println(
143+
"Received record with key ${record.key()} " +
144+
"and value ${record.value()} from topic ${record.topic()} and partition ${record.partition()}"
145+
)
146+
}
147+
}
148+
}
149+
```
84150

85-
See the full code-snippet : [ConsumerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerKotlinDSLExample.kt)
151+
N.B: See the full source code: [ConsumerClientExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerClientExample.kt)
86152

87153
```kotlin
88154
val consumerWorker: ConsumerWorker<String, String> = kafka("localhost:9092") {
@@ -97,7 +163,7 @@ val consumerWorker: ConsumerWorker<String, String> = kafka("localhost:9092") {
97163
autoOffsetReset(AutoOffsetReset.Earliest)
98164
}
99165

100-
onDeserializationError(DeserializationErrorHandlers.silentlyReplaceWithNull())
166+
onDeserializationError(silentlyReplaceWithNull())
101167

102168
onPartitionsAssigned { _: Consumer<*, *>, partitions ->
103169
println("Partitions assigned: $partitions")
@@ -111,12 +177,23 @@ val consumerWorker: ConsumerWorker<String, String> = kafka("localhost:9092") {
111177
println("consumed record-value: $value")
112178
}
113179

180+
onConsumedError(closeTaskOnConsumedError())
181+
114182
Runtime.getRuntime().addShutdownHook(Thread { run { stop() } })
115183
}
116184
}
117-
consumerWorker.start("topic-test", maxParallelHint = 4)
185+
186+
consumerWorker.use {
187+
consumerWorker.start("demo-topic", maxParallelHint = 4)
188+
runBlocking {
189+
println("All consumers started, waiting one minute before stopping")
190+
delay(Duration.ofMinutes(1).toMillis())
191+
}
192+
}
118193
```
119194

195+
N.B: See the full source code: [ConsumerKotlinDSLExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerKotlinDSLExample.kt)
196+
120197
## How to build project ?
121198

122199
Kafka Clients for Kotlin uses [maven-wrapper](https://github.com/takari/maven-wrapper).

clients/src/main/kotlin/io/streamthoughts/kafka/clients/Configs.kt

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,68 +28,68 @@ import kotlin.collections.HashMap
2828
* @see io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
2929
* @see io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
3030
*/
31-
open class Configs protected constructor(backed: Map<String, Any?> = emptyMap()) : MutableMap<String, Any?> {
31+
open class Configs protected constructor(props: Map<String, Any?> = emptyMap()) : MutableMap<String, Any?> {
3232

33-
private val mutableMap = HashMap(backed)
33+
private val props = HashMap(props)
3434

3535
override val entries: MutableSet<MutableMap.MutableEntry<String, Any?>>
36-
get() = mutableMap.entries
36+
get() = props.entries
3737

3838
override val keys: MutableSet<String>
39-
get() = mutableMap.keys
39+
get() = props.keys
4040

4141
override val size: Int
42-
get() = mutableMap.size
42+
get() = props.size
4343

4444
override val values: MutableCollection<Any?>
45-
get() = mutableMap.values
45+
get() = props.values
4646

4747
override fun containsKey(key: String): Boolean {
48-
return mutableMap.containsKey(key)
48+
return props.containsKey(key)
4949
}
5050

5151
override fun containsValue(value: Any?): Boolean {
52-
return mutableMap.containsValue(value)
52+
return props.containsValue(value)
5353
}
5454

5555
override fun get(key: String): Any? {
56-
return mutableMap[key]
56+
return props[key]
5757
}
5858

5959
override fun isEmpty(): Boolean {
60-
return mutableMap.isEmpty()
60+
return props.isEmpty()
6161
}
6262

6363
open fun with(key: String, value: Any?) = apply { this[key] = value }
6464

6565
operator fun set(key: String, value: Any?) {
66-
mutableMap[key] = value
66+
props[key] = value
6767
}
6868

6969
override fun equals(other: Any?): Boolean {
7070
if (this === other) return true
7171
if (other !is Configs) return false
7272

73-
if (mutableMap != other.mutableMap) return false
73+
if (props != other.props) return false
7474

7575
return true
7676
}
7777

7878
override fun hashCode(): Int {
79-
return mutableMap.hashCode()
79+
return props.hashCode()
8080
}
8181

8282
override fun toString(): String {
83-
return "Configs[$mutableMap]"
83+
return "Configs[$props]"
8484
}
8585

8686
override fun clear() {
87-
mutableMap.clear()
87+
props.clear()
8888
}
8989

90-
override fun put(key: String, value: Any?): Any? = mutableMap.put(key, value)
90+
override fun put(key: String, value: Any?): Any? = props.put(key, value)
9191

92-
override fun putAll(from: Map<out String, Any?>) = mutableMap.putAll(from)
92+
override fun putAll(from: Map<out String, Any?>) = props.putAll(from)
9393

94-
override fun remove(key: String): Any? = mutableMap.remove(key)
94+
override fun remove(key: String): Any? = props.remove(key)
9595
}

clients/src/main/kotlin/io/streamthoughts/kafka/clients/KafkaClientConfigs.kt

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,24 @@
1818
*/
1919
package io.streamthoughts.kafka.clients
2020

21-
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
22-
import io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
2321
import org.apache.kafka.clients.CommonClientConfigs
2422
import java.io.InputStream
2523
import java.util.Properties
24+
import kotlin.collections.HashMap
25+
import kotlin.collections.Map
26+
import kotlin.collections.emptyMap
27+
import kotlin.collections.joinToString
28+
import kotlin.collections.mutableMapOf
29+
2630

2731
open class KafkaClientConfigs constructor(props: Map<String, Any?> = emptyMap()): Configs(props) {
2832

29-
constructor(kafka : Kafka): this(mapOf(
30-
Pair(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers.joinToString())
31-
))
33+
constructor(kafka : Kafka): this(bootstrapServersConfig(kafka))
3234

3335
companion object {
34-
35-
/**
36-
* Creates a new [KafkaClientConfigs] with no properties.
37-
*/
38-
fun empty() = KafkaClientConfigs()
39-
40-
/**
41-
* Creates a new [KafkaClientConfigs] with the given [props].
42-
*/
43-
fun of(props: Map<String, Any?>) = KafkaClientConfigs(props)
44-
45-
/**
46-
* Creates a new [KafkaClientConfigs] with the given [props].
47-
*/
48-
fun of(props: Properties) = of(props.toStringMap())
36+
private fun bootstrapServersConfig(kafka: Kafka) = mutableMapOf<String, Any?>(
37+
Pair(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers.joinToString())
38+
)
4939
}
5040

5141
/**
@@ -72,35 +62,24 @@ open class KafkaClientConfigs constructor(props: Map<String, Any?> = emptyMap())
7262
/**
7363
* Convenient method to create and populate a new [KafkaClientConfigs] from a [configFile].
7464
*/
75-
fun loadClientConfigs(configFile: String): KafkaClientConfigs
76-
= KafkaClientConfigs().load(configFile)
65+
fun loadClientConfigs(configFile: String): KafkaClientConfigs = KafkaClientConfigs().load(configFile)
7766

7867
/**
7968
* Convenient method to create and populate a new [KafkaClientConfigs] from an [inputStream].
8069
*/
81-
fun loadClientConfigs(inputStream: InputStream): KafkaClientConfigs
82-
= KafkaClientConfigs().load(inputStream)
70+
fun loadClientConfigs(inputStream: InputStream): KafkaClientConfigs = KafkaClientConfigs().load(inputStream)
8371

8472
/**
85-
* Convenient method to create and populate a new [KafkaProducerConfigs] from a [configFile].
86-
*/
87-
fun loadProducerConfigs(configFile: String): KafkaProducerConfigs
88-
= KafkaProducerConfigs().load(configFile)
89-
90-
/**
91-
* Convenient method to create and populate a new [KafkaClientConfigs] from an [inputStream].
73+
* Creates a new [KafkaClientConfigs] with no properties.
9274
*/
93-
fun loadProducerConfigs(inputStream: InputStream): KafkaProducerConfigs
94-
= KafkaProducerConfigs().load(inputStream)
75+
fun emptyClientConfigs(): KafkaClientConfigs = KafkaClientConfigs()
9576

9677
/**
97-
* Convenient method to create and populate a new [KafkaConsumerConfigs] from a [configFile].
78+
* Creates a new [KafkaClientConfigs] with the given [props].
9879
*/
99-
fun loadConsumerConfigs(configFile: String): KafkaConsumerConfigs
100-
= KafkaConsumerConfigs().load(configFile)
80+
fun clientConfigsOf(props: Map<String, Any?>): KafkaClientConfigs = KafkaClientConfigs(HashMap(props))
10181

10282
/**
103-
* Convenient method to create and populate new [KafkaConsumerConfigs] from an [inputStream].
83+
* Creates a new [KafkaClientConfigs] with the given [props].
10484
*/
105-
fun loadConsumerConfigs(inputStream: InputStream): KafkaConsumerConfigs
106-
= KafkaConsumerConfigs().load(inputStream)
85+
fun clientConfigsOf(props: Properties): KafkaClientConfigs = clientConfigsOf(props.toStringMap())

clients/src/main/kotlin/io/streamthoughts/kafka/clients/KafkaClients.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package io.streamthoughts.kafka.clients
2020

2121
import io.streamthoughts.kafka.clients.consumer.ConsumerWorker
22-
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
2322
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerWorker
24-
import io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
23+
import io.streamthoughts.kafka.clients.consumer.consumerConfigsOf
2524
import io.streamthoughts.kafka.clients.producer.KafkaProducerContainer
2625
import io.streamthoughts.kafka.clients.producer.ProducerContainer
26+
import io.streamthoughts.kafka.clients.producer.producerConfigsOf
2727
import org.apache.kafka.common.serialization.Deserializer
2828

2929
/**
@@ -46,7 +46,7 @@ class KafkaClients(private val configs: KafkaClientConfigs) {
4646
keyDeserializer: Deserializer<K>,
4747
valueDeserializer: Deserializer<V>,
4848
init: KafkaConsumerWorker.Builder<K, V>.() -> Unit): ConsumerWorker<K, V> {
49-
val configs = KafkaConsumerConfigs(configs).groupId(groupId)
49+
val configs = consumerConfigsOf(configs).groupId(groupId)
5050
return KafkaConsumerWorker.Builder(configs, keyDeserializer, valueDeserializer).also(init).build()
5151
}
5252

@@ -56,7 +56,7 @@ class KafkaClients(private val configs: KafkaClientConfigs) {
5656
* @return a new [ProducerContainer] instance.
5757
*/
5858
fun<K, V> producer(init: ProducerContainer.Builder<K, V>.() -> Unit): ProducerContainer<K, V> {
59-
val configs = KafkaProducerConfigs(configs)
59+
val configs = producerConfigsOf(configs)
6060
return KafkaProducerContainer.Builder<K, V>(configs).also(init).build()
6161
}
6262
}

0 commit comments

Comments
 (0)