Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_poll_records>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-on_shutdown>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number>>|No
Expand Down Expand Up @@ -513,6 +514,15 @@ The maximum number of records returned in a single call to poll().
The period of time in milliseconds after which we force a refresh of metadata even if
we haven't seen any partition leadership changes to proactively discover any new brokers or partitions

[id="plugins-{type}s-{plugin}-on_shutdown"]
===== `on_shutdown`

* Value type is <<string,string>>
* Accepted values are:
- `release`: releases assigned partitions, making them immediately available to other consumers in the group.
- `abandon`: avoids explicitly releasing assigned partitions, allowing sticky-type <<plugins-{type}s-{plugin}-partition_assignment_strategy>> to retain partition assignments across process or pipeline restarts.
* The default value is `abandon`

[id="plugins-{type}s-{plugin}-partition_assignment_strategy"]
===== `partition_assignment_strategy`

Expand Down
6 changes: 6 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# otherwise auto-topic creation is not permitted.
config :auto_create_topics, :validate => :boolean, :default => true

# Controls consumer behaviour on shutdown:
# - abandon: close the consumer without unsubscribing (default). Leaves the
#   partition assignment in place so sticky/static assignment strategies can
#   re-acquire the same partitions after a process or pipeline restart.
# - release: unsubscribe before closing the consumer, releasing assigned
#   partitions so other consumers in the group can take them immediately.
config :on_shutdown, :validate => %w(release abandon), :default => 'abandon'

config :decorate_events, :validate => %w(none basic extended false true), :default => "none"

attr_reader :metadata_mode
Expand Down Expand Up @@ -355,6 +360,7 @@ def thread_runner(logstash_queue, consumer, name)
end
end
ensure
consumer.unsubscribe if @on_shutdown == 'release'
consumer.close
end
end
Expand Down
44 changes: 42 additions & 2 deletions spec/integration/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,42 @@ def send_message(record)
end
end

# Verifies that the `on_shutdown` option controls whether the plugin calls
# KafkaConsumer#unsubscribe before closing its consumers.
context 'on_shutdown' do
  # Random group_id isolates this run from previous test state; two threads
  # exercise the multi-consumer path (`kafka_consumers` holds more than one).
  let(:config) { plain_config.merge('group_id' => rand(36**8).to_s(36), 'consumer_threads' => 2) }
  subject(:registered_kafka_input) { LogStash::Inputs::Kafka.new(config).tap(&:register) }

  before(:each) do
    # setup a spy for `KafkaConsumer#unsubscribe` on each consumer that is created by the plugin
    allow(registered_kafka_input).to receive(:create_consumer).and_wrap_original do |m, *args|
      m.call(*args).tap do |kafka_consumer|
        kafka_consumer.class.__persistent__ = true # allow mocking on KafkaConsumer java proxy
        allow(kafka_consumer).to receive(:unsubscribe).and_call_original
      end
    end
  end

  context 'release' do
    let(:config) { super().merge('on_shutdown' => 'release') }

    # `release` must unsubscribe every consumer during shutdown.
    it 'unsubscribes' do
      run_input(registered_kafka_input, timeout: timeout_seconds, event_count: num_events)

      expect(registered_kafka_input.kafka_consumers).to all(have_received(:unsubscribe))
    end
  end

  context 'abandon' do
    # RSpec's `all` matcher needs a positive-form matcher, so negate
    # have_received into a named matcher instead of using `not_to`.
    define_negated_matcher :not_have_received, :have_received
    let(:config) { super().merge('on_shutdown' => 'abandon') }

    # `abandon` (the default) must close consumers without unsubscribing.
    it 'does not unsubscribe' do
      run_input(registered_kafka_input, timeout: timeout_seconds, event_count: num_events)

      expect(registered_kafka_input.kafka_consumers).to all(not_have_received(:unsubscribe))
    end
  end
end

context "static membership 'group.instance.id' setting" do
let(:base_config) do
{
Expand Down Expand Up @@ -285,18 +321,22 @@ def wait_kafka_input_is_ready(topic, queue)
expect(message).to_not eq(nil)
end

# Builds and registers a Kafka input for `config`, then runs it until
# `event_count` events arrive (or `timeout` elapses). Forwards any given
# block to run_input, which yields (queue, input) to it.
def consume_messages(config, queue: Queue.new, timeout:, event_count:, &block)
  input = LogStash::Inputs::Kafka.new(config).tap(&:register)
  run_input(input, queue: queue, timeout: timeout, event_count: event_count, &block)
end

# Runs an already-registered input on a background thread, waits for
# `event_count` events to reach `queue` (skipped when timeout == false),
# then shuts the input down. Yields (queue, kafka_input) when a block is
# given; otherwise returns the queue.
def run_input(kafka_input, queue: Queue.new, timeout:, event_count:)
  t = Thread.new { kafka_input.run(queue) }
  begin
    t.run
    wait(timeout).for { queue.length }.to eq(event_count) unless timeout.eql?(false)
    block_given? ? yield(queue, kafka_input) : queue
  ensure
    # Graceful shutdown first: do_stop signals the input to exit its run
    # loop (letting the on_shutdown behaviour execute), then we give the
    # thread a bounded window to finish before force-killing it. Killing
    # before the join would preempt the input's own cleanup.
    kafka_input.do_stop
    t.join(30)
    t.kill
  end
end

Expand Down
9 changes: 9 additions & 0 deletions spec/unit/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@
expect(subject.client_dns_lookup).to eq('use_all_dns_ips')
end
end

# Pins the default of the `on_shutdown` option for a registered input.
context '#on_shutdown' do
  it 'defaults to `abandon`' do
    # BACKWARD-COMPATIBILITY: defaulting to a value other than `abandon` has
    # side-effects for sticky partition strategies and static partition assignments
    registered_input = subject.tap(&:register)
    expect(registered_input.on_shutdown).to eq('abandon')
  end
end
end

describe '#running' do
Expand Down