Skip to content

Commit 2dc3435

Browse files
committed
feat(clients): add examples for producer/consumer clients
1 parent bc08568 commit 2dc3435

File tree

5 files changed

+120
-2
lines changed

5 files changed

+120
-2
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Just add **Kafka Clients for Kotlin** to the dependencies of your projects.
4848

4949
### Kafka Producer
5050

51-
See the full code-snippet : [ProducerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerExample.kt)
51+
See the full code-snippet : [ProducerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerKotlinDSLExample.kt)
5252

5353
```kotlin
5454
val producer: ProducerContainer<String, String> = kafka("localhost:9092") {
@@ -82,7 +82,7 @@ with(producer) {
8282

8383
### Kafka Consumer
8484

85-
See the full code-snippet : [ConsumerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerExample.kt)
85+
See the full code-snippet : [ConsumerExample.kt](https://github.com/streamthoughts/kafka-clients-kotlin/blob/master/examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerKotlinDSLExample.kt)
8686

8787
```kotlin
8888
val consumerWorker: ConsumerWorker<String, String> = kafka("localhost:9092") {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.client.examples
20+
21+
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
22+
import io.streamthoughts.kafka.clients.loadConsumerConfigs
23+
import org.apache.kafka.clients.consumer.KafkaConsumer
24+
import org.apache.kafka.common.serialization.StringDeserializer
25+
import java.time.Duration
26+
import kotlin.system.exitProcess
27+
28+
fun main(args: Array<String>) {
29+
30+
if (args.size != 2) {
31+
println("Missing required command line arguments: configFile topic")
32+
exitProcess(1)
33+
}
34+
35+
val (config, topic) = args
36+
37+
// Load properties from file and customize Consumer config.
38+
val configs: KafkaConsumerConfigs = loadConsumerConfigs(config)
39+
.keyDeserializer(StringDeserializer::class.java.name)
40+
.valueDeserializer(StringDeserializer::class.java.name)
41+
42+
val consumer = KafkaConsumer<String, String>(configs)
43+
44+
consumer.use {
45+
consumer.subscribe(listOf(topic))
46+
while(true) {
47+
consumer
48+
.poll(Duration.ofMillis(500))
49+
.forEach { record ->
50+
println(
51+
"Received record with key ${record.key()} " +
52+
"and value ${record.value()} from topic ${record.topic()} and partition ${record.partition()}"
53+
)
54+
}
55+
}
56+
}
57+
}
File renamed without changes.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.client.examples
20+
21+
import io.streamthoughts.kafka.clients.loadProducerConfigs
22+
import io.streamthoughts.kafka.clients.producer.Acks
23+
import io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
24+
import org.apache.kafka.clients.producer.KafkaProducer
25+
import org.apache.kafka.clients.producer.ProducerRecord
26+
import org.apache.kafka.clients.producer.RecordMetadata
27+
import org.apache.kafka.common.serialization.StringSerializer
28+
import kotlin.system.exitProcess
29+
30+
fun main(args: Array<String>) {
31+
32+
if (args.size != 2) {
33+
println("Missing required command line arguments: configFile topic")
34+
exitProcess(1)
35+
}
36+
37+
val (config, topic) = args
38+
39+
40+
// Load properties from file and customize Producer config.
41+
val configs: KafkaProducerConfigs = loadProducerConfigs(config)
42+
.acks(Acks.Leader)
43+
.keySerializer(StringSerializer::class.java.name)
44+
.valueSerializer(StringSerializer::class.java.name)
45+
46+
val producer = KafkaProducer<String, String>(configs)
47+
48+
val messages = listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka")
49+
50+
producer.use {
51+
messages.forEach {value ->
52+
val record = ProducerRecord<String, String>(topic, value)
53+
producer.send(record) { m: RecordMetadata, e: Exception? ->
54+
when (e) {
55+
null -> println("Record was successfully sent (topic=${m.topic()}, partition=${m.partition()}, offset= ${m.offset()})")
56+
else -> e.printStackTrace()
57+
}
58+
}
59+
}
60+
}
61+
}
File renamed without changes.

0 commit comments

Comments
 (0)