From 7a355b01c672bbc101ca206cc68d587ee1c4258f Mon Sep 17 00:00:00 2001 From: arman1371 Date: Fri, 28 Aug 2020 23:54:10 +0430 Subject: [PATCH 1/2] Added conf to choose if appending thread number to client_id or not --- lib/logstash/inputs/kafka.rb | 4 +++- spec/integration/inputs/kafka_spec.rb | 12 ++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 095f66b1..1a2c6d42 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -80,6 +80,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # is to be able to track the source of requests beyond just ip/port by allowing # a logical application name to be included. config :client_id, :validate => :string, :default => "logstash" + # If true, appends the consumer thread number after the client_id string. + config :append_thread_num_to_client_id, :validate => :boolean, :default => true # Close idle connections after the number of milliseconds specified by this config. config :connections_max_idle_ms, :validate => :number, :default => 540_000 # (9m) Kafka default # Ideally you should have as many threads as the number of partitions for a perfect @@ -240,7 +242,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } + @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}"+(@append_thread_num_to_client_id ? "-#{i}" : "")) } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index d7ce7363..f3ecbe0d 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -12,6 +12,7 @@ let(:group_id_4) {rand(36**8).to_s(36)} let(:group_id_5) {rand(36**8).to_s(36)} let(:group_id_6) {rand(36**8).to_s(36)} + let(:group_id_7) {rand(36**8).to_s(36)} let(:plain_config) do { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest' } @@ -19,6 +20,9 @@ let(:multi_consumer_config) do plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) end + let(:not_append_thread_num_config) do + plain_config.merge({"group_id" => group_id_7, "client_id" => "spec", "consumer_threads" => 3, "append_thread_num_to_client_id" => false}) + end let(:snappy_config) do { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest' } @@ -67,6 +71,14 @@ end end end + it "should consumer all messages with same client_id" do + consume_messages(not_append_thread_num_config, timeout: timeout_seconds, event_count: num_events) do |queue, kafka_input| + expect(queue.length).to eq(num_events) + kafka_input.kafka_consumers.each_with_index do |consumer, i| + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec") + end + end + end end context "#kafka-topics-pattern" do From ce127130097154667da90849d8a20e70f2f94109 Mon Sep 17 00:00:00 2001 From: Arman Yazdani Date: Sat, 17 Jul 2021 22:28:25 +0430 Subject: [PATCH 2/2] Revert "Added conf to choose if appending thread number to client_id or not" This reverts commit 7a355b01c672bbc101ca206cc68d587ee1c4258f. --- lib/logstash/inputs/kafka.rb | 4 +--- spec/integration/inputs/kafka_spec.rb | 12 ------------ 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 1a2c6d42..095f66b1 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -80,8 +80,6 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # is to be able to track the source of requests beyond just ip/port by allowing # a logical application name to be included. config :client_id, :validate => :string, :default => "logstash" - # If true, appends the consumer thread number after the client_id string. - config :append_thread_num_to_client_id, :validate => :boolean, :default => true # Close idle connections after the number of milliseconds specified by this config. config :connections_max_idle_ms, :validate => :number, :default => 540_000 # (9m) Kafka default # Ideally you should have as many threads as the number of partitions for a perfect @@ -242,7 +240,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}"+(@append_thread_num_to_client_id ? "-#{i}" : "")) } + @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index f3ecbe0d..d7ce7363 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -12,7 +12,6 @@ let(:group_id_4) {rand(36**8).to_s(36)} let(:group_id_5) {rand(36**8).to_s(36)} let(:group_id_6) {rand(36**8).to_s(36)} - let(:group_id_7) {rand(36**8).to_s(36)} let(:plain_config) do { 'topics' => ['logstash_integration_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest' } @@ -20,9 +19,6 @@ let(:multi_consumer_config) do plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) end - let(:not_append_thread_num_config) do - plain_config.merge({"group_id" => group_id_7, "client_id" => "spec", "consumer_threads" => 3, "append_thread_num_to_client_id" => false}) - end let(:snappy_config) do { 'topics' => ['logstash_integration_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest' } @@ -71,14 +67,6 @@ end end end - it "should consumer all messages with same client_id" do - consume_messages(not_append_thread_num_config, timeout: timeout_seconds, event_count: num_events) do |queue, kafka_input| - expect(queue.length).to eq(num_events) - kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec") - end - end - end end context "#kafka-topics-pattern" do