Skip to content

Commit 2ea3b4e

Browse files
committed
fix(tests): close temporary consumer
1 parent f54663c commit 2ea3b4e

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

tests/src/main/kotlin/io/streamthoughts/kafka/tests/TestingEmbeddedKafka.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -217,16 +217,16 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
217217
valueDeserializer: Deserializer<V>? = null,
218218
consumerConfig: Map<String, Any?> = emptyMap()): List<ConsumerRecord<K, V>> {
219219

220-
val consumer = consumerClient(consumerConfig, keyDeserializer, valueDeserializer)
221-
consumer.subscribe(listOf(topic))
222-
val records : MutableList<ConsumerRecord<K, V>> = mutableListOf()
223-
224-
val begin = System.currentTimeMillis()
225-
while ( (System.currentTimeMillis() - begin) < timeout.toMillis() && records.size < expectedNumRecords) {
226-
consumer.poll(Duration.ofMillis(100)).forEach { records.add(it) }
227-
}
228-
return records
220+
consumerClient(consumerConfig, keyDeserializer, valueDeserializer).use { client ->
221+
client.subscribe(listOf(topic))
222+
val records: MutableList<ConsumerRecord<K, V>> = mutableListOf()
229223

224+
val begin = System.currentTimeMillis()
225+
while ((System.currentTimeMillis() - begin) < timeout.toMillis() && records.size < expectedNumRecords) {
226+
client.poll(Duration.ofMillis(100)).forEach { records.add(it) }
227+
}
228+
return records
229+
}
230230
}
231231

232232
/**

0 commit comments

Comments
 (0)