Skip to content

Add headers reporting uncompressed size and doc count #1217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 12.0.6
- Add headers reporting uncompressed size and doc count for bulk requests [#1217](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1217)

## 12.0.5
- [DOC] Fix link to Logstash DLQ docs [#1214](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1214)

Expand Down
22 changes: 17 additions & 5 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch;
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
# made sense. We picked one on the lowish side to not use too much heap.
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB

EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze
UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
Expand Down Expand Up @@ -143,7 +144,11 @@ def bulk(actions)
:payload_size => stream_writer.pos,
:content_length => body_stream.size,
:batch_offset => (index + 1 - batch_actions.size))
bulk_responses << bulk_send(body_stream, batch_actions)
headers = {
EVENT_COUNT_HEADER => batch_actions.size.to_s,
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
}
bulk_responses << bulk_send(body_stream, batch_actions, headers)
body_stream.truncate(0) && body_stream.seek(0)
stream_writer = gzip_writer(body_stream) if compression_level?
batch_actions.clear
Expand All @@ -159,7 +164,14 @@ def bulk(actions)
:payload_size => stream_writer.pos,
:content_length => body_stream.size,
:batch_offset => (actions.size - batch_actions.size))
bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0

if body_stream.size > 0
headers = {
EVENT_COUNT_HEADER => batch_actions.size.to_s,
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
}
bulk_responses << bulk_send(body_stream, batch_actions, headers)
end

body_stream.close unless compression_level?
join_bulk_responses(bulk_responses)
Expand All @@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses)
}
end

def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
def bulk_send(body_stream, batch_actions, headers = {})
params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }

begin
response = @pool.post(@bulk_path, params, body_stream.string)
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '12.0.5'
s.version = '12.0.6'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
4 changes: 3 additions & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
{
"Content-Encoding" => "gzip",
"Content-Type" => "application/json",
'x-elastic-product-origin' => 'logstash-output-elasticsearch'
'x-elastic-product-origin' => 'logstash-output-elasticsearch',
'X-Elastic-Event-Count' => anything,
'X-Elastic-Uncompressed-Request-Length' => anything,
}
}

Expand Down
14 changes: 12 additions & 2 deletions spec/integration/outputs/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,22 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);

it "sets the correct content-type header" do
expected_manticore_opts = {
:headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
:headers => {
"Content-Type" => "application/json",
'x-elastic-product-origin' => 'logstash-output-elasticsearch',
'X-Elastic-Event-Count' => anything,
'X-Elastic-Uncompressed-Request-Length' => anything
},
:body => anything
}
if secure
expected_manticore_opts = {
:headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
:headers => {
"Content-Type" => "application/json",
'x-elastic-product-origin' => 'logstash-output-elasticsearch',
'X-Elastic-Event-Count' => anything,
'X-Elastic-Uncompressed-Request-Length' => anything
},
:body => anything,
:auth => {
:user => user,
Expand Down
77 changes: 77 additions & 0 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,83 @@

end
end
# Verifies that bulk requests carry the X-Elastic-Event-Count and
# X-Elastic-Uncompressed-Request-Length headers, with and without
# gzip compression, and across batch splits at the 20MiB target.
# NOTE: the original description said "the 'user-agent' header", which
# was a copy-paste error — these examples never touch the user agent.
context "the event count and uncompressed size headers" do
  let(:pool) { double("pool") }
  let(:compression_level) { 6 }
  let(:base_options) { super().merge( :client_settings => {:compression_level => compression_level}) }
  let(:actions) { [
    ["index", {:_id=>nil, :_index=>"logstash"}, {"message_1"=> message_1}],
    ["index", {:_id=>nil, :_index=>"logstash"}, {"message_2"=> message_2}],
    ["index", {:_id=>nil, :_index=>"logstash"}, {"message_3"=> message_3}],
  ]}
  let(:message_1) { "hello" }
  let(:message_2_size) { 1_000 }
  let(:message_2) { SecureRandom.alphanumeric(message_2_size / 2 ) * 2 }
  let(:message_3_size) { 1_000 }
  let(:message_3) { "m" * message_3_size }
  let(:messages_size) { message_1.size + message_2.size + message_3.size }
  let(:action_overhead) { 42 + 16 + 2 } # header plus doc key size plus new line overhead per action

  let(:response) do
    response = double("response")
    # Stub a successful HTTP status; the original returned the double
    # itself from :code, which only worked because the code was never
    # compared to an integer.
    allow(response).to receive(:code).and_return(200)
    allow(response).to receive(:body).and_return({"errors" => false}.to_json)
    response
  end

  before(:each) do
    subject.instance_variable_set("@pool", pool)
  end

  it "carries bulk request's uncompressed size" do
    expect(pool).to receive(:post) do |path, params, body|
      headers = params.fetch(:headers, {})
      expect(headers["X-Elastic-Event-Count"]).to eq("3")
      expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
    end.and_return(response)

    subject.send(:bulk, actions)
  end

  context "without compression" do
    let(:compression_level) { 0 }

    it "carries bulk request's uncompressed size" do
      expect(pool).to receive(:post) do |path, params, body|
        headers = params.fetch(:headers, {})
        expect(headers["X-Elastic-Event-Count"]).to eq("3")
        expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
      end.and_return(response)
      subject.send(:bulk, actions)
    end
  end

  context "with compressed messages over 20MB" do
    let(:message_2_size) { 21_000_000 }

    it "carries bulk request's uncompressed size" do
      # only the first, tiny, message is sent first
      expect(pool).to receive(:post) do |path, params, body|
        headers = params.fetch(:headers, {})
        expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_1.size + action_overhead).to_s
        expect(headers["X-Elastic-Event-Count"]).to eq("1")
      end.and_return(response)

      # huge message_2 is sent afterwards alone
      expect(pool).to receive(:post) do |path, params, body|
        headers = params.fetch(:headers, {})
        expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_2.size + action_overhead).to_s
        expect(headers["X-Elastic-Event-Count"]).to eq("1")
      end.and_return(response)

      # finally medium message_3 is sent alone as well
      expect(pool).to receive(:post) do |path, params, body|
        headers = params.fetch(:headers, {})
        expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_3.size + action_overhead).to_s
        expect(headers["X-Elastic-Event-Count"]).to eq("1")
      end.and_return(response)

      subject.send(:bulk, actions)
    end
  end
end
end

describe "sniffing" do
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@
end

before(:each) do
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array), instance_of(Hash)) do |stream, actions, headers|
expect( stream.string ).to include '"foo":"bar1"'
expect( stream.string ).to include '"foo":"bar2"'
end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
Expand Down