Skip to content

Commit 720925f

Browse files
Ability to enable/disable Auto Topic Creation in Kafka input plugin. (#172)
The Logstash Kafka input plugin should have the ability to enable/disable the auto creation of topics from the Consumer (logstash). Users who have enabled auto topic creation in Kafka so that topics are automatically created when a event is produced should also be allowed to enable OR disable the same from Logstash's end. The downside of this autocreation function is that topics can also be created by a consumers. This makes it difficult to remove topics that no longer produce events. There wasn't a property for this in the logstash's Kafka input plugin documentation. Documentation ref: https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation
1 parent f1f4eea commit 720925f

File tree

4 files changed

+23
-2
lines changed

4 files changed

+23
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.5.0
2+
- Add "auto_create_topics" option to allow disabling of topic auto creation [#172](https://github.com/logstash-plugins/logstash-integration-kafka/pull/172)
3+
14
## 11.4.2
25
- Add default client_id of logstash to kafka output [#169](https://github.com/logstash-plugins/logstash-integration-kafka/pull/169)
36

docs/input-kafka.asciidoc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
9898
|=======================================================================
9999
|Setting |Input type|Required
100100
| <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<number,number>>|No
101+
| <<plugins-{type}s-{plugin}-auto_create_topics>> |<<boolean,boolean>>|No
101102
| <<plugins-{type}s-{plugin}-auto_offset_reset>> |<<string,string>>|No
102103
| <<plugins-{type}s-{plugin}-bootstrap_servers>> |<<string,string>>|No
103104
| <<plugins-{type}s-{plugin}-check_crcs>> |<<boolean,boolean>>|No
@@ -285,6 +286,17 @@ This will add a field named `kafka` to the logstash event containing the followi
285286
* `offset`: The offset from the partition this message is associated with
286287
* `key`: A ByteBuffer containing the message key
287288

289+
290+
[id="plugins-{type}s-{plugin}-auto_create_topics"]
291+
===== `auto_create_topics` 
292+
293+
  * Value type is <<boolean,boolean>>
294+
* Default value is `true`
295+
296+
Controls whether the topic is automatically created when subscribing to a non-existent topic.
297+
A topic will be auto-created only if this configuration is set to `true` and auto-topic creation is enabled on the broker using `auto.create.topics.enable`;
298+
otherwise auto-topic creation is not permitted. 
299+
288300
[id="plugins-{type}s-{plugin}-enable_auto_commit"]
289301
===== `enable_auto_commit`
290302

@@ -789,7 +801,6 @@ Filtering by a regular expression is done by retrieving the full list of topic n
789801
NOTE: When the broker has some topics configured with ACL rules and they miss the DESCRIBE permission, then the subscription
790802
happens but on the broker side it is logged that the subscription of some topics was denied to the configured user.
791803

792-
793804
[id="plugins-{type}s-{plugin}-value_deserializer_class"]
794805
===== `value_deserializer_class`
795806

lib/logstash/inputs/kafka.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,12 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
246246
# `timestamp`: The timestamp of this message
247247
# While with `extended` it adds also all the key values present in the Kafka header if the key is valid UTF-8 else
248248
# silently skip it.
249+
#
250+
# Controls whether a kafka topic is automatically created when subscribing to a non-existent topic.
251+
# A topic will be auto-created only if this configuration is set to `true` and auto-topic creation is enabled on the broker using `auto.create.topics.enable`; 
252+
# otherwise auto-topic creation is not permitted.
253+
config :auto_create_topics, :validate => :boolean, :default => true
254+
249255
config :decorate_events, :validate => %w(none basic extended false true), :default => "none"
250256

251257
attr_reader :metadata_mode
@@ -410,6 +416,7 @@ def create_consumer(client_id, group_instance_id)
410416

411417
props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms.to_s) unless auto_commit_interval_ms.nil?
412418
props.put(kafka::AUTO_OFFSET_RESET_CONFIG, auto_offset_reset) unless auto_offset_reset.nil?
419+
props.put(kafka::ALLOW_AUTO_CREATE_TOPICS_CONFIG, auto_create_topics) unless auto_create_topics.nil?
413420
props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
414421
props.put(kafka::CHECK_CRCS_CONFIG, check_crcs.to_s) unless check_crcs.nil?
415422
props.put(kafka::CLIENT_DNS_LOOKUP_CONFIG, client_dns_lookup)

logstash-integration-kafka.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-kafka'
3-
s.version = '11.4.2'
3+
s.version = '11.5.0'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Integration with Kafka - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

0 commit comments

Comments
 (0)