This repository was archived by the owner on Mar 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 226
This repository was archived by the owner on Mar 24, 2021. It is now read-only.
SslConfig With RdKafka Producer/Consumer Raises Exceptions #616
Copy link
Copy link
Closed
Labels
Description
I'm getting an exception when attempting to use an rdkafka producer configured with SSL using release 2.5.0.
KAFKA_HOSTS = "192.168.99.248:9093"
USE_RDKAFKA = True
TOPIC = "test"
ssl_config = SslConfig(cafile='/home/shaskell/cacert',
certfile='/home/shaskell/client-cert-signed',
keyfile='/home/shaskell/client-key',
password='test1234')
client = KafkaClient(hosts=KAFKA_HOSTS, ssl_config=ssl_config)
topic = client.topics[TOPIC]
producer = topic.get_producer(use_rdkafka=USE_RDKAFKA)
The code results in the following traceback
Traceback (most recent call last):
File "./kafka_gen.py", line 104, in <module>
main()
File "./kafka_gen.py", line 75, in main
producer = topic.get_producer(use_rdkafka=USE_RDKAFKA)
File "/home/shaskell/pykafka-2.5.0/pykafka/topic.py", line 95, in get_producer
return Cls(self._cluster, self, **kwargs)
File "/home/shaskell/pykafka-2.5.0/pykafka/rdkafka/producer.py", line 52, in __init__
super(RdKafkaProducer, self).__init__(**callargs)
File "/home/shaskell/pykafka-2.5.0/pykafka/producer.py", line 186, in __init__
self.start()
File "/home/shaskell/pykafka-2.5.0/pykafka/rdkafka/producer.py", line 61, in start
self._rdk_producer.configure(conf=conf)
pykafka.exceptions.RdKafkaException: No such configuration property: "ssl.key.location"
If I modify rdkafka/producer.py to print the conf at line 159 I can see that the config has the proper settings set by helpers.rdk_ssl_config.
{'batch.num.messages': 2000,
'broker.version.fallback': '0.9.0',
'client.id': 'pykafka.rdkafka',
'compression.codec': 'none',
'delivery.report.only.error': 'false',
'message.max.bytes': 1000012,
'message.send.max.retries': 3,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 5000,
'retry.backoff.ms': 100,
'security.protocol': 'ssl',
'socket.timeout.ms': 30000,
'ssl.ca.location': '/home/shaskell/cacert',
'ssl.certificate.location': '/home/shaskell/client-cert-signed',
'ssl.cipher.suites': 'DHE-DSS-AES256-GCM-SHA384',
'ssl.key.location': '/home/shaskell/client-key',
'ssl.key.password': 'test1234'}
If I try a similar config with the following consumer:
KAFKA_HOSTS = "192.168.99.248:9093"
USE_RDKAFKA = True
TOPIC = "test"
ssl_config = SslConfig(cafile='/home/shaskell/cacert',
certfile='/home/shaskell/client-cert-signed',
keyfile='/home/shaskell/client-key',
password='test1234')
client = KafkaClient(hosts=KAFKA_HOSTS, ssl_config=ssl_config)
topic = client.topics[TOPIC]
consumer = topic.get_simple_consumer(use_rdkafka=USE_RDKAFKA)
I get the following exception:
Traceback (most recent call last):
File "./kafka_gen.py", line 106, in <module>
main()
File "./kafka_gen.py", line 75, in main
consumer = topic.get_simple_consumer(use_rdkafka=USE_RDKAFKA)
File "/home/shaskell/pykafka-2.5.0/pykafka/topic.py", line 195, in get_simple_consumer
**kwargs)
File "/home/shaskell/pykafka-2.5.0/pykafka/rdkafka/simple_consumer.py", line 62, in __init__
super(RdKafkaSimpleConsumer, self).__init__(**callargs)
File "/home/shaskell/pykafka-2.5.0/pykafka/simpleconsumer.py", line 216, in __init__
self.start()
File "/home/shaskell/pykafka-2.5.0/pykafka/simpleconsumer.py", line 258, in start
self._fetch_workers = self._setup_fetch_workers()
File "/home/shaskell/pykafka-2.5.0/pykafka/rdkafka/simple_consumer.py", line 74, in _setup_fetch_workers
self._rdk_consumer.configure(conf=conf)
pykafka.exceptions.RdKafkaException: Invalid value for configuration property "security.protocol"
Here's what the config dictionary looks like:
{'broker.version.fallback': '0.9.0',
'client.id': 'pykafka.rdkafka',
'fetch.message.max.bytes': 1048576,
'fetch.min.bytes': 1,
'fetch.wait.max.ms': 100,
'queued.max.messages.kbytes': '102400000',
'queued.min.messages': 100000,
'receive.message.max.bytes': 2097152,
'security.protocol': 'ssl',
'socket.timeout.ms': 30000,
'ssl.ca.location': '/home/shaskell/cacert',
'ssl.certificate.location': '/home/shaskell/client-cert-signed',
'ssl.cipher.suites': 'DHE-DSS-AES256-GCM-SHA384',
'ssl.key.location': '/home/shaskell/client-key',
'ssl.key.password': 'test1234'}
'ssl' is clearly a valid value for security.protocol according to the librdkafka docs.