diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index cd72a8a..2f18410 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -141,6 +141,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more details
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
+| <<plugins-{type}s-{plugin}-on_shutdown>> |<<string,string>>|No
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
@@ -513,6 +514,15 @@ The maximum number of records returned in a single call to poll().
 
 The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
 
+[id="plugins-{type}s-{plugin}-on_shutdown"]
+===== `on_shutdown`
+
+* Value type is <<string,string>>
+* Accepted values are:
+- `release`: releases assigned partitions to make them immediately available to other consumers in the group
+- `abandon`: avoids explicitly releasing assigned partitions, allowing sticky-type <<plugins-{type}s-{plugin}-partition_assignment_strategy,partition assignment strategies>> to retain partition assignments across process or pipeline restarts
+* The default value is `abandon`
+
 [id="plugins-{type}s-{plugin}-partition_assignment_strategy"]
 ===== `partition_assignment_strategy`
 
diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index b8b1dbd..3aa49cd 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -270,6 +270,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   # otherwise auto-topic creation is not permitted.
   config :auto_create_topics, :validate => :boolean, :default => true
 
+  # Controls consumer behaviour on shutdown:
+  # - abandon: close the consumer without unsubscribing
+  # - release: unsubscribe before closing the consumer
+  config :on_shutdown, :validate => %w(release abandon), :default => 'abandon'
+
   config :decorate_events, :validate => %w(none basic extended false true), :default => "none"
 
   attr_reader :metadata_mode
@@ -355,6 +360,7 @@ def thread_runner(logstash_queue, consumer, name)
         end
       end
     ensure
+      consumer.unsubscribe if @on_shutdown == 'release'
       consumer.close
     end
   end
diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 7c44f47..ad2351e 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -187,6 +187,42 @@ def send_message(record)
     end
   end
 
+  context 'on_shutdown' do
+    let(:config) { plain_config.merge('group_id' => rand(36**8).to_s(36), 'consumer_threads' => 2) }
+    subject(:registered_kafka_input) { LogStash::Inputs::Kafka.new(config).tap(&:register) }
+
+    before(:each) do
+      # set up a spy for `KafkaConsumer#unsubscribe` on each consumer that is created by the plugin
+      allow(registered_kafka_input).to receive(:create_consumer).and_wrap_original do |m, *args|
+        m.call(*args).tap do |kafka_consumer|
+          kafka_consumer.class.__persistent__ = true # allow mocking on the KafkaConsumer java proxy
+          allow(kafka_consumer).to receive(:unsubscribe).and_call_original
+        end
+      end
+    end
+
+    context 'release' do
+      let(:config) { super().merge('on_shutdown' => 'release') }
+
+      it 'unsubscribes' do
+        run_input(registered_kafka_input, timeout: timeout_seconds, event_count: num_events)
+
+        expect(registered_kafka_input.kafka_consumers).to all(have_received(:unsubscribe))
+      end
+    end
+
+    context 'abandon' do
+      define_negated_matcher :not_have_received, :have_received
+      let(:config) { super().merge('on_shutdown' => 'abandon') }
+
+      it 'does not unsubscribe' do
+        run_input(registered_kafka_input, timeout: timeout_seconds, event_count: num_events)
+
+        expect(registered_kafka_input.kafka_consumers).to all(not_have_received(:unsubscribe))
+      end
+    end
+  end
+
   context "static membership 'group.instance.id' setting" do
     let(:base_config) do
       {
@@ -285,9 +321,13 @@ def wait_kafka_input_is_ready(topic, queue)
   expect(message).to_not eq(nil)
 end
 
-def consume_messages(config, queue: Queue.new, timeout:, event_count:)
+def consume_messages(config, queue: Queue.new, timeout:, event_count:, &block)
   kafka_input = LogStash::Inputs::Kafka.new(config)
   kafka_input.register
+  run_input(kafka_input, queue: queue, timeout: timeout, event_count: event_count, &block)
+end
+
+def run_input(kafka_input, queue: Queue.new, timeout:, event_count:)
   t = Thread.new { kafka_input.run(queue) }
   begin
     t.run
@@ -295,8 +335,8 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
     block_given? ? yield(queue, kafka_input) : queue
   ensure
     kafka_input.do_stop
-    t.kill
     t.join(30)
+    t.kill
   end
 end
 
diff --git a/spec/unit/inputs/kafka_spec.rb b/spec/unit/inputs/kafka_spec.rb
index 04ed19c..3fcbd11 100644
--- a/spec/unit/inputs/kafka_spec.rb
+++ b/spec/unit/inputs/kafka_spec.rb
@@ -93,6 +93,15 @@
       expect(subject.client_dns_lookup).to eq('use_all_dns_ips')
     end
   end
+
+  context '#on_shutdown' do
+    let(:registered_kafka_input) { subject.tap(&:register) }
+    it 'defaults to `abandon`' do
+      # BACKWARD-COMPATIBILITY: defaulting to a value other than `abandon` has
+      # side-effects for sticky partition strategies and static partition assignments
+      expect(registered_kafka_input.on_shutdown).to eq('abandon')
+    end
+  end
 end
 
 describe '#running' do
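A minimal usage sketch of the new option follows; the broker address, topic, and consumer group below are illustrative placeholders (not taken from this change), and `cooperative_sticky` is shown only as an example of the sticky-type assignment strategies the `abandon` default is meant to complement:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"                  # illustrative broker address
        topics => ["example-topic"]                            # illustrative topic
        group_id => "example-group"                            # illustrative consumer group
        partition_assignment_strategy => "cooperative_sticky"  # a sticky-type strategy
        on_shutdown => "abandon"                               # default: retain assignments across restarts
      }
    }

Switching to `on_shutdown => "release"` makes each consumer unsubscribe before closing, so its assigned partitions become immediately available to other consumers in the group.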