Skip to content

Commit bc08568

Browse files
committed
feat(clients): allow to load configs from properties (#4)
This commit refactors all classes for configuring producer/consumer. In addition, it adds helper functions/extensions to load config from Map, Properties and property files. Additional changes: - add new Extensions class - add new functions: loadClientConfigs, loadConsumerConfigs, loadProducerConfigs - fix tests for KafkaConsumerTaskTest Resolves: GH-4
1 parent 9ec26ee commit bc08568

File tree

15 files changed

+337
-152
lines changed

15 files changed

+337
-152
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.clients
20+
21+
import kotlin.collections.HashMap
22+
23+
24+
/**
25+
* The base class for client configuration.
26+
*
27+
* @see io.streamthoughts.kafka.clients.KafkaClientConfigs
28+
* @see io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
29+
* @see io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
30+
*/
31+
open class Configs protected constructor(backed: Map<String, Any?> = emptyMap()) : MutableMap<String, Any?> {
32+
33+
private val mutableMap = HashMap(backed)
34+
35+
override val entries: MutableSet<MutableMap.MutableEntry<String, Any?>>
36+
get() = mutableMap.entries
37+
38+
override val keys: MutableSet<String>
39+
get() = mutableMap.keys
40+
41+
override val size: Int
42+
get() = mutableMap.size
43+
44+
override val values: MutableCollection<Any?>
45+
get() = mutableMap.values
46+
47+
override fun containsKey(key: String): Boolean {
48+
return mutableMap.containsKey(key)
49+
}
50+
51+
override fun containsValue(value: Any?): Boolean {
52+
return mutableMap.containsValue(value)
53+
}
54+
55+
override fun get(key: String): Any? {
56+
return mutableMap[key]
57+
}
58+
59+
override fun isEmpty(): Boolean {
60+
return mutableMap.isEmpty()
61+
}
62+
63+
open fun with(key: String, value: Any?) = apply { this[key] = value }
64+
65+
operator fun set(key: String, value: Any?) {
66+
mutableMap[key] = value
67+
}
68+
69+
override fun equals(other: Any?): Boolean {
70+
if (this === other) return true
71+
if (other !is Configs) return false
72+
73+
if (mutableMap != other.mutableMap) return false
74+
75+
return true
76+
}
77+
78+
override fun hashCode(): Int {
79+
return mutableMap.hashCode()
80+
}
81+
82+
override fun toString(): String {
83+
return "Configs[$mutableMap]"
84+
}
85+
86+
override fun clear() {
87+
mutableMap.clear()
88+
}
89+
90+
override fun put(key: String, value: Any?): Any? = mutableMap.put(key, value)
91+
92+
override fun putAll(from: Map<out String, Any?>) = mutableMap.putAll(from)
93+
94+
override fun remove(key: String): Any? = mutableMap.remove(key)
95+
}

clients/src/main/kotlin/io/streamthoughts/kafka/clients/Configure.kt renamed to clients/src/main/kotlin/io/streamthoughts/kafka/clients/Extensions.kt

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,23 @@
1818
*/
1919
package io.streamthoughts.kafka.clients
2020

21-
interface Configure {
21+
import java.io.FileInputStream
22+
import java.io.InputStream
23+
import java.util.Properties
2224

23-
fun with(key: String, value: Any?)
25+
/**
26+
* Convenient method to transform a [Properties] to a [Map] of string keys.
27+
*/
28+
fun Properties.toStringMap(): Map<String, Any?> = this.map { (k, v) -> Pair(k.toString(), v) }.toMap()
29+
30+
/**
31+
* Convenient method to load config properties from the given [configFile].
32+
*/
33+
fun <T:Configs> T.load(configFile: String): T =
34+
apply { FileInputStream(configFile).use { load(it) } }
2435

25-
/**
26-
* @return the configs properties as [Map].
27-
*/
28-
fun asMap(): Map<String, Any?>
29-
}
36+
/**
37+
* Convenient method to load config properties from the given [inputStream].
38+
*/
39+
fun <T:Configs> T.load(inputStream: InputStream): T =
40+
apply { putAll((Properties().apply { load(inputStream) }).toStringMap()) }

clients/src/main/kotlin/io/streamthoughts/kafka/clients/KafkaClientConfigs.kt

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,89 @@
1818
*/
1919
package io.streamthoughts.kafka.clients
2020

21+
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerConfigs
22+
import io.streamthoughts.kafka.clients.producer.KafkaProducerConfigs
2123
import org.apache.kafka.clients.CommonClientConfigs
24+
import java.io.InputStream
25+
import java.util.Properties
2226

23-
data class KafkaClientConfigs (
24-
var kafka : Kafka,
25-
var clientId: String? = null
26-
) : Configure {
27+
open class KafkaClientConfigs constructor(props: Map<String, Any?> = emptyMap()): Configs(props) {
2728

28-
private val configs = HashMap<String, Any?>()
29+
constructor(kafka : Kafka): this(mapOf(
30+
Pair(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers.joinToString())
31+
))
2932

30-
init {
31-
configs[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers.joinToString()
32-
}
33+
companion object {
3334

34-
override fun with(key: String, value: Any?) {
35-
configs[key] = value
36-
}
35+
/**
36+
* Creates a new [KafkaClientConfigs] with no properties.
37+
*/
38+
fun empty() = KafkaClientConfigs()
3739

38-
fun clientId(clientId: String) = apply { this.clientId = clientId }
40+
/**
41+
* Creates a new [KafkaClientConfigs] with the given [props].
42+
*/
43+
fun of(props: Map<String, Any?>) = KafkaClientConfigs(props)
3944

40-
override fun asMap(): Map<String, Any?> {
41-
val configs = HashMap<String, Any?>(this.configs)
42-
clientId?.let { configs[CommonClientConfigs.CLIENT_ID_CONFIG] = clientId }
43-
return configs
45+
/**
46+
* Creates a new [KafkaClientConfigs] with the given [props].
47+
*/
48+
fun of(props: Properties) = of(props.toStringMap())
4449
}
45-
}
50+
51+
/**
52+
* @see [CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG]
53+
*/
54+
fun bootstrapServers(bootstrapServers: Array<String>) =
55+
apply { this[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers.joinToString() }
56+
57+
/**
58+
* @see [CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG]
59+
*/
60+
fun bootstrapServers(bootstrapServers: String) =
61+
apply { this[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers }
62+
63+
/**
64+
* @see [CommonClientConfigs.CLIENT_ID_CONFIG]
65+
*/
66+
fun clientId(clientId: String) =
67+
apply { this[CommonClientConfigs.CLIENT_ID_CONFIG] = clientId }
68+
69+
override fun with(key: String, value: Any?) = apply { super.with(key, value) }
70+
}
71+
72+
/**
73+
* Convenient method to create and populate a new [KafkaClientConfigs] from a [configFile].
74+
*/
75+
fun loadClientConfigs(configFile: String): KafkaClientConfigs
76+
= KafkaClientConfigs().load(configFile)
77+
78+
/**
79+
* Convenient method to create and populate a new [KafkaClientConfigs] from an [inputStream].
80+
*/
81+
fun loadClientConfigs(inputStream: InputStream): KafkaClientConfigs
82+
= KafkaClientConfigs().load(inputStream)
83+
84+
/**
85+
* Convenient method to create and populate a new [KafkaProducerConfigs] from a [configFile].
86+
*/
87+
fun loadProducerConfigs(configFile: String): KafkaProducerConfigs
88+
= KafkaProducerConfigs().load(configFile)
89+
90+
/**
91+
* Convenient method to create and populate a new [KafkaClientConfigs] from an [inputStream].
92+
*/
93+
fun loadProducerConfigs(inputStream: InputStream): KafkaProducerConfigs
94+
= KafkaProducerConfigs().load(inputStream)
95+
96+
/**
97+
* Convenient method to create and populate a new [KafkaConsumerConfigs] from a [configFile].
98+
*/
99+
fun loadConsumerConfigs(configFile: String): KafkaConsumerConfigs
100+
= KafkaConsumerConfigs().load(configFile)
101+
102+
/**
103+
* Convenient method to create and populate new [KafkaConsumerConfigs] from an [inputStream].
104+
*/
105+
fun loadConsumerConfigs(inputStream: InputStream): KafkaConsumerConfigs
106+
= KafkaConsumerConfigs().load(inputStream)

clients/src/main/kotlin/io/streamthoughts/kafka/clients/KafkaClients.kt

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,37 @@ import io.streamthoughts.kafka.clients.producer.KafkaProducerContainer
2626
import io.streamthoughts.kafka.clients.producer.ProducerContainer
2727
import org.apache.kafka.common.serialization.Deserializer
2828

29-
class KafkaClients(val kafka: Kafka) {
30-
31-
private val client: KafkaClientConfigs = KafkaClientConfigs(kafka)
29+
/**
30+
* [KafkaClients] DSL for building either a new consumer or producer kafka client.
31+
*/
32+
class KafkaClients(private val configs: KafkaClientConfigs) {
3233

33-
fun client(init: KafkaClientConfigs.() -> Unit) {
34-
client.init()
35-
}
34+
/**
35+
* Configures the commons configuration for Kafka Client.
36+
*/
37+
fun client(init: KafkaClientConfigs.() -> Unit) : Unit = configs.init()
3638

39+
/**
40+
* Creates and configures a new [KafkaConsumerWorker] using the given [init] function
41+
* for the given [groupId], [keyDeserializer] and [valueDeserializer]
42+
*
43+
* @return a new [KafkaConsumerWorker] instance.
44+
*/
3745
fun <K, V> consumer(groupId: String,
3846
keyDeserializer: Deserializer<K>,
3947
valueDeserializer: Deserializer<V>,
4048
init: KafkaConsumerWorker.Builder<K, V>.() -> Unit): ConsumerWorker<K, V> {
41-
val configs = KafkaConsumerConfigs(client).groupId(groupId)
49+
val configs = KafkaConsumerConfigs(configs).groupId(groupId)
4250
return KafkaConsumerWorker.Builder(configs, keyDeserializer, valueDeserializer).also(init).build()
4351
}
4452

53+
/**
54+
* Creates and configures a new [ProducerContainer] using the given [init] function.
55+
*
56+
* @return a new [ProducerContainer] instance.
57+
*/
4558
fun<K, V> producer(init: ProducerContainer.Builder<K, V>.() -> Unit): ProducerContainer<K, V> {
46-
val configs = KafkaProducerConfigs(client)
59+
val configs = KafkaProducerConfigs(configs)
4760
return KafkaProducerContainer.Builder<K, V>(configs).also(init).build()
4861
}
4962
}
@@ -52,10 +65,10 @@ fun <R> kafka(bootstrapServer: String, init: KafkaClients.() -> R): R =
5265
kafka(arrayOf(bootstrapServer), init)
5366

5467
fun <R> kafka(bootstrapServers: Array<String>, init: KafkaClients.() -> R): R =
55-
with(KafkaClients(Kafka(bootstrapServers)), init)
68+
kafka(KafkaClientConfigs(Kafka(bootstrapServers)), init)
5669

57-
fun <R> with(kafka: Kafka, init: KafkaClients.() -> R): R =
58-
with(KafkaClients(kafka), init)
70+
fun <R> kafka(kafka: Kafka, init: KafkaClients.() -> R): R =
71+
kafka(KafkaClientConfigs(kafka), init)
5972

60-
fun <R> with(kafkaClient: KafkaClients, init: KafkaClients.() -> R): R =
61-
kafkaClient.init()
73+
fun <R> kafka(configs: KafkaClientConfigs, init: KafkaClients.() -> R): R =
74+
KafkaClients(configs).init()

0 commit comments

Comments
 (0)