From b0f7776ca2f101c84a6f35f293f2842a7a47b759 Mon Sep 17 00:00:00 2001 From: usharma Date: Thu, 28 Aug 2025 22:33:03 +0530 Subject: [PATCH 01/10] out_s3: add zstd support This patch adds zstd based compression support to out_s3 plugin. This references the new aws compression support for zstd using the flb_zstd compression mechanism. It also adds a new content header for zstd compression Signed-off-by: Ujjwal Sharma Signed-off-by: usharma --- plugins/out_s3/s3.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index d9d25f187b1..f6d1a42ba0a 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -91,6 +92,13 @@ static struct flb_aws_header content_encoding_header = { .val_len = 4, }; +static struct flb_aws_header zstd_content_encoding_header = { + .key = "Content-Encoding", + .key_len = 16, + .val = "zstd", + .val_len = 4, +}; + static struct flb_aws_header content_type_header = { .key = "Content-Type", .key_len = 12, @@ -162,7 +170,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -195,6 +203,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { s3_headers[n] = content_encoding_header; n++; + } else if(ctx->compression == FLB_AWS_COMPRESS_ZSTD){ + s3_headers[n] = zstd_content_encoding_header; + n++; } if (ctx->canned_acl != NULL) { s3_headers[n] = canned_acl_header; @@ -1175,7 +1186,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, goto multipart; } else { - if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) { + if ((ctx->use_put_object == 
FLB_FALSE && (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD))) { flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, " "the chunk was too small, using PutObject to upload", preCompress_size, body_size); } @@ -3998,10 +4009,11 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, - "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. " + "Compression type for S3 objects. 'gzip', 'arrow', 'parquet' and 'zstd' are the supported values. " "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. " "Defaults to no compression. " "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." + " If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'." }, { FLB_CONFIG_MAP_STR, "content_type", NULL, From 2ed01ff35deed08104713c3331256e085c66d44d Mon Sep 17 00:00:00 2001 From: usharma Date: Thu, 28 Aug 2025 22:36:55 +0530 Subject: [PATCH 02/10] aws: add zstd compression type zstd compression type added for the aws compression Signed-off-by: Ujjwal Sharma Signed-off-by: usharma --- include/fluent-bit/aws/flb_aws_compress.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h index d9e929c6669..6525e96d867 100644 --- a/include/fluent-bit/aws/flb_aws_compress.h +++ b/include/fluent-bit/aws/flb_aws_compress.h @@ -25,6 +25,7 @@ #define FLB_AWS_COMPRESS_GZIP 1 #define FLB_AWS_COMPRESS_ARROW 2 #define FLB_AWS_COMPRESS_PARQUET 3 +#define FLB_AWS_COMPRESS_ZSTD 4 /* * Get compression type from compression keyword. 
The return value is used to identify From de74f79acf10a58420e94b9e6ae5c3d5f56b322c Mon Sep 17 00:00:00 2001 From: usharma Date: Thu, 28 Aug 2025 22:38:48 +0530 Subject: [PATCH 03/10] aws: add zstd compression support This patch adds zstd compression to existing compression options for aws. It references the compression logic in flb_zstd. Signed-off-by: Ujjwal Sharma Signed-off-by: usharma --- src/aws/flb_aws_compress.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c index 253020e392a..45fc1510255 100644 --- a/src/aws/flb_aws_compress.c +++ b/src/aws/flb_aws_compress.c @@ -23,6 +23,7 @@ #include #include +#include #include @@ -48,6 +49,11 @@ static const struct compression_option compression_options[] = { "gzip", &flb_gzip_compress }, + { + FLB_AWS_COMPRESS_ZSTD, + "zstd", + &flb_zstd_compress + }, #ifdef FLB_HAVE_ARROW { FLB_AWS_COMPRESS_ARROW, From 1eb854d7321f90a84eb2ac28ecb9fff29c116590 Mon Sep 17 00:00:00 2001 From: usharma Date: Thu, 28 Aug 2025 22:41:58 +0530 Subject: [PATCH 04/10] tests: internal: aws_compress: zstd test cases added zstd basic test cases added for compression detection and decoding validation Signed-off-by: Ujjwal Sharma Signed-off-by: usharma --- tests/internal/aws_compress.c | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c index 180a0985b92..e91e23f8be0 100644 --- a/tests/internal/aws_compress.c +++ b/tests/internal/aws_compress.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "flb_tests_internal.h" @@ -53,6 +54,22 @@ void test_compression_gzip() flb_aws_compress_test_cases(cases); } +void test_compression_zstd() +{ + struct flb_aws_test_case cases[] = + { + { + "zstd", + "hello hello hello hello hello hello", + "KLUv/SAjZQAAMGhlbGxvIAEAuUsR", + 0 + }, + { 0 } + }; + + flb_aws_compress_test_cases(cases); +} + void test_b64_truncated_gzip() { struct 
flb_aws_test_case cases[] = @@ -70,6 +87,22 @@ struct flb_aws_test_case cases[] = 41); } +void test_b64_truncated_zstd() +{ +struct flb_aws_test_case cases[] = + { + { + "zstd", + "hello hello hello hello hello hello", + "hello hello hello hello hello hello", + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__zstd_decode(cases,41); +} + void test_b64_truncated_gzip_truncation() { struct flb_aws_test_case cases[] = @@ -202,7 +235,9 @@ struct flb_aws_test_case cases[] = TEST_LIST = { { "test_compression_gzip", test_compression_gzip }, + { "test_compression_zstd", test_compression_zstd }, { "test_b64_truncated_gzip", test_b64_truncated_gzip }, + { "test_b64_truncated_zstd", test_b64_truncated_zstd }, { "test_b64_truncated_gzip_truncation", test_b64_truncated_gzip_truncation }, { "test_b64_truncated_gzip_truncation_buffer_too_small", test_b64_truncated_gzip_truncation_buffer_too_small }, @@ -231,6 +266,14 @@ static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( cases, max_out_len, &flb_gzip_uncompress); } +static void flb_aws_compress_truncate_b64_test_cases__zstd_decode( + struct flb_aws_test_case *cases, + size_t max_out_len) +{ + flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE, + cases, max_out_len, &flb_zstd_uncompress); +} + /* General test case loop flb_aws_compress */ static void flb_aws_compress_general_test_cases(int test_type, struct flb_aws_test_case *cases, From 82f61ac58e6e1635adaf8ce5117133bb629524fd Mon Sep 17 00:00:00 2001 From: usharma Date: Fri, 29 Aug 2025 11:33:54 +0530 Subject: [PATCH 05/10] tests: internal: aws_compress: zstd test cases added Signed-off-by: usharma --- tests/internal/aws_compress.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c index e91e23f8be0..7bb9d24b052 100644 --- a/tests/internal/aws_compress.c +++ b/tests/internal/aws_compress.c @@ -36,6 +36,9 @@ static void 
flb_aws_compress_test_cases(struct flb_aws_test_case *cases); static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( struct flb_aws_test_case *cases, size_t max_out_len); +static void flb_aws_compress_truncate_b64_test_cases__zstd_decode( + struct flb_aws_test_case *cases, + size_t max_out_len); /** ------ Test Cases ------ **/ void test_compression_gzip() From 1ced9cfadcd5dd81cac01a415e8213a0ba45dcb1 Mon Sep 17 00:00:00 2001 From: usharma Date: Fri, 29 Aug 2025 11:35:17 +0530 Subject: [PATCH 06/10] out_s3: Dynamic encoding header selection based on compression method Implemented a struct switch to select encoding header based on compression method. This can be extended in the future. Currently gzip was hardcoded. Signed-off-by: usharma --- plugins/out_s3/s3.c | 52 +++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index f6d1a42ba0a..59c9578eb7c 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -85,19 +85,31 @@ static void remove_from_queue(struct upload_queue *entry); static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context); -static struct flb_aws_header content_encoding_header = { - .key = "Content-Encoding", - .key_len = 16, - .val = "gzip", - .val_len = 4, -}; - -static struct flb_aws_header zstd_content_encoding_header = { - .key = "Content-Encoding", - .key_len = 16, - .val = "zstd", - .val_len = 4, -}; +static struct flb_aws_header *get_content_encoding_header(int compression_type) +{ + static struct flb_aws_header gzip_header = { + .key = "Content-Encoding", + .key_len = 16, + .val = "gzip", + .val_len = 4, + }; + + static struct flb_aws_header zstd_header = { + .key = "Content-Encoding", + .key_len = 16, + .val = "zstd", + .val_len = 4, + }; + + switch (compression_type) { + case FLB_AWS_COMPRESS_GZIP: + return &gzip_header; + case FLB_AWS_COMPRESS_ZSTD: + return &zstd_header; + default: + return NULL; + } +} 
static struct flb_aws_header content_type_header = { .key = "Content-Type", @@ -166,6 +178,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, int n = 0; int headers_len = 0; struct flb_aws_header *s3_headers = NULL; + struct flb_aws_header *encoding_header = NULL; if (ctx->content_type != NULL) { headers_len++; @@ -200,11 +213,14 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { - s3_headers[n] = content_encoding_header; - n++; - } else if(ctx->compression == FLB_AWS_COMPRESS_ZSTD){ - s3_headers[n] = zstd_content_encoding_header; + if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) { + encoding_header = get_content_encoding_header(ctx->compression); + + if(encoding_header == NULL){ + flb_errno(); + return -1; + } + s3_headers[n] = *encoding_header; n++; } if (ctx->canned_acl != NULL) { From 9024dddd9eb916a8be29424c21695dc04fe9b0a3 Mon Sep 17 00:00:00 2001 From: usharma Date: Fri, 29 Aug 2025 11:57:37 +0530 Subject: [PATCH 07/10] out_s3: Free s3 headers in case of encoding header not present We need to free the s3 headers when no content-encoding header is found for the configured compression, so that the error path does not leak them. Signed-off-by: usharma --- plugins/out_s3/s3.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 59c9578eb7c..322e81cff5b 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -216,8 +216,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) { encoding_header = get_content_encoding_header(ctx->compression); - if(encoding_header == NULL){ + if (encoding_header == NULL) { flb_errno(); + flb_free(s3_headers); return -1; } s3_headers[n] = *encoding_header; From 846158107ee54b3a4730d326342f9a056fc4c436 Mon Sep 17 00:00:00 2001 From: usharma Date: Fri, 29 Aug 2025 14:33:25 +0530 
Subject: [PATCH 08/10] out_s3: remove unused zstd header file Signed-off-by: usharma --- plugins/out_s3/s3.c | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 322e81cff5b..3eedb9334c0 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include From dbd27e50529b65ab77eacd4a90a9ae7e987009a4 Mon Sep 17 00:00:00 2001 From: usharma Date: Wed, 3 Sep 2025 13:31:34 +0530 Subject: [PATCH 09/10] aws: fix zstd compression pointer Added type casting explicitly to match the compress_options struct Signed-off-by: usharma --- src/aws/flb_aws_compress.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c index 45fc1510255..040c3e1e28e 100644 --- a/src/aws/flb_aws_compress.c +++ b/src/aws/flb_aws_compress.c @@ -52,7 +52,7 @@ static const struct compression_option compression_options[] = { { FLB_AWS_COMPRESS_ZSTD, "zstd", - &flb_zstd_compress + (int *)&flb_zstd_compress }, #ifdef FLB_HAVE_ARROW { From 6418a32270e23e857c63c10013dbe5e35b714a17 Mon Sep 17 00:00:00 2001 From: usharma Date: Wed, 3 Sep 2025 16:55:06 +0530 Subject: [PATCH 10/10] tests: fix typecasting for zstd_compress Signed-off-by: usharma --- tests/internal/aws_compress.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c index 7bb9d24b052..2a94754374d 100644 --- a/tests/internal/aws_compress.c +++ b/tests/internal/aws_compress.c @@ -274,7 +274,7 @@ static void flb_aws_compress_truncate_b64_test_cases__zstd_decode( size_t max_out_len) { flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE, - cases, max_out_len, &flb_zstd_uncompress); + cases, max_out_len, (int *)&flb_zstd_uncompress); } /* General test case loop flb_aws_compress */