Skip to content

Commit 4b0308d

Browse files
committed
feat(clients): make ConsumerTask/ProducerContainer Closeaable (#5)
Resolves: GH-5
1 parent 2dc3435 commit 4b0308d

File tree

10 files changed

+25
-23
lines changed

10 files changed

+25
-23
lines changed

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/ConsumerTask.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ package io.streamthoughts.kafka.clients.consumer
2121
import org.apache.kafka.clients.consumer.Consumer
2222
import org.apache.kafka.clients.consumer.OffsetAndMetadata
2323
import org.apache.kafka.common.TopicPartition
24+
import java.io.Closeable
2425
import java.time.Duration
2526

26-
interface ConsumerTask{
27+
interface ConsumerTask: Closeable {
2728

2829
enum class State {
2930
/**
@@ -78,13 +79,13 @@ interface ConsumerTask{
7879
* Shutdowns the [ConsumerTask] and wait for completion.
7980
* @see org.apache.kafka.clients.consumer.Consumer.close
8081
*/
81-
fun shutdown()
82+
override fun close()
8283

8384
/**
8485
* Shutdowns the [ConsumerTask] and wait for completion until the given [timeout].
8586
* @see org.apache.kafka.clients.consumer.Consumer.close
8687
*/
87-
fun shutdown(timeout: Duration)
88+
fun close(timeout: Duration)
8889

8990
/**
9091
* @return the [State] of this [ConsumerTask].

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/ConsumerWorker.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ package io.streamthoughts.kafka.clients.consumer
2121
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandler
2222
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandler
2323
import io.streamthoughts.kafka.clients.consumer.listener.ConsumerBatchRecordsListener
24+
import java.io.Closeable
25+
import java.time.Duration
2426
import java.util.regex.Pattern
2527

2628
/**
2729
* The [ConsumerWorker] manages one or many concurrent [org.apache.kafka.clients.consumer.Consumer] that belong
2830
* to the same {@code group.id}.
2931
*/
30-
interface ConsumerWorker<K, V> {
32+
interface ConsumerWorker<K, V>: Closeable {
3133

3234

3335
interface Builder<K, V> {
@@ -115,7 +117,7 @@ interface ConsumerWorker<K, V> {
115117
/**
116118
* Stops all [org.apache.kafka.clients.consumer.Consumer] managed by this [ConsumerWorker].
117119
*/
118-
fun stop()
120+
override fun close()
119121

120122
/**
121123
* Pauses all [org.apache.kafka.clients.consumer.Consumer] managed by this [ConsumerWorker].

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerTask.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,14 +221,14 @@ class KafkaConsumerTask<K, V>(
221221
}
222222
}
223223

224-
override fun shutdown() {
224+
override fun close() {
225225
logWithConsumerInfo(Level.INFO, "Closing")
226226
isShutdown.set(true)
227227
consumer.wakeup()
228228
shutdownLatch.await()
229229
}
230230

231-
override fun shutdown(timeout: Duration) {
231+
override fun close(timeout: Duration) {
232232
logWithConsumerInfo(Level.INFO, "Closing")
233233
isShutdown.set(true)
234234
consumer.wakeup()

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerWorker.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ class KafkaConsumerWorker<K, V> (
139139
consumerJobs.joinAll()
140140
}
141141

142-
override fun stop() {
142+
override fun close() {
143143
if (isRunning.get()) {
144144
Log.info("KafkaConsumerWorker(group: $groupId): Stopping all io.streamthoughts.kafka.clients.consumer tasks")
145-
consumerTasks.forEach { it.shutdown() }
145+
consumerTasks.forEach { it.close() }
146146
isRunning.set(false)
147147
}
148148
}

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/error/ConsumedErrorHandlers.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ private object CloseTaskOnConsumedError: ConsumedErrorHandler {
4343

4444
override fun handle(consumerTask: ConsumerTask, records: List<ConsumerRecord<*, *>>, thrownException: Exception) {
4545
Log.error("Stopping consumerTask after an exception was thrown while processing records", thrownException)
46-
consumerTask.shutdown(Duration.ZERO)
46+
consumerTask.close(Duration.ZERO)
4747
}
4848
}
4949

clients/src/main/kotlin/io/streamthoughts/kafka/clients/producer/KafkaProducerContainer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ class KafkaProducerContainer<K, V> private constructor(
199199
}
200200

201201
override fun close(timeout: Duration) {
202-
if (isClosed()) return // silently ignore call if producer is already closed.
202+
if (isClosed() || !isInitialized()) return // silently ignore call if producer is already closed.
203203

204204
runOrThrowIfIllegalState {
205205
state = ProducerContainer.State.PENDING_SHUTDOWN

clients/src/main/kotlin/io/streamthoughts/kafka/clients/producer/ProducerContainer.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Metric
2828
import org.apache.kafka.common.MetricName
2929
import org.apache.kafka.common.PartitionInfo
3030
import org.apache.kafka.common.serialization.Serializer
31+
import java.io.Closeable
3132
import java.time.Duration
3233
import java.time.Instant
3334
import java.util.concurrent.Future
@@ -52,7 +53,7 @@ data class UnrecoverableErrorTransactionResult( val exception: Exception): Trans
5253
*/
5354
object CommittedTransactionResult: TransactionResult()
5455

55-
interface ProducerContainer<K, V> {
56+
interface ProducerContainer<K, V>: Closeable {
5657

5758
enum class State {
5859
/**
@@ -261,7 +262,7 @@ interface ProducerContainer<K, V> {
261262
*
262263
* @see [Producer.close].
263264
*/
264-
fun close() {
265+
override fun close() {
265266
close(Duration.ofMillis(Long.MAX_VALUE))
266267
}
267268

clients/src/test/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerTaskTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class KafkaConsumerTaskTest(private val cluster: TestingEmbeddedKafka) {
114114
try {
115115
captureHandler.assertThatEventuallyCapture("Fail to capture processing error before timeout")
116116
} finally {
117-
consumer.shutdown()
117+
consumer.close()
118118
}
119119
}
120120
}

examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerKotlinDSLExample.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,11 @@ fun main(args: Array<String>) {
6565
}
6666
}
6767

68-
with (consumerWorker) {
69-
start("demo-topic", maxParallelHint = 4)
68+
consumerWorker.use {
69+
consumerWorker.start("demo-topic", maxParallelHint = 4)
7070
runBlocking {
7171
println("All consumers started, waiting one minute before stopping")
7272
delay(Duration.ofMinutes(1).toMillis())
73-
stop()
7473
}
7574
}
7675
}

examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ProducerKotlinDSLExample.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package io.streamthoughts.kafka.client.examples
2020

2121
import io.streamthoughts.kafka.clients.kafka
22-
import io.streamthoughts.kafka.clients.loadClientConfigs
2322
import io.streamthoughts.kafka.clients.producer.Acks
2423
import io.streamthoughts.kafka.clients.producer.ProducerContainer
2524
import io.streamthoughts.kafka.clients.producer.callback.closeOnErrorProducerSendCallback
@@ -49,11 +48,11 @@ fun main(args: Array<String>) {
4948
}
5049
}
5150

52-
with(producer) {
53-
init()
54-
listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka").forEach {
55-
send(value = it)
51+
val messages = listOf("I ❤️ Logs", "Making Sense of Stream Processing", "Apache Kafka")
52+
producer.use {
53+
producer.init() // create internal KafkaProducer and eventually call initTransaction if transactional.id is set
54+
messages.forEach {
55+
producer.send(value = it)
5656
}
57-
close()
5857
}
5958
}

0 commit comments

Comments
 (0)