Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ struct flb_config {

int hot_reload_watchdog_timeout_seconds;

/* Routing */
size_t route_mask_size;
size_t route_mask_slots;
uint64_t *route_empty_mask;
/* router context */
struct flb_router *router;

#ifdef FLB_SYSTEM_WINDOWS
/* maxstdio (Windows) */
int win_maxstdio;
Expand Down
21 changes: 21 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ struct flb_router_path {
struct mk_list _head;
};

struct flb_router {
/* Routing masks */
size_t route_mask_size;
size_t route_mask_slots;
uint64_t *route_empty_mask;

/* metrics */
struct cmt *cmt;

/* logs routing metrics */
struct cmt_counter *logs_records_total;
struct cmt_counter *logs_bytes_total;
struct cmt_counter *logs_drop_records_total;
struct cmt_counter *logs_drop_bytes_total;
};

static inline int flb_router_match_type(int in_event_type,
struct flb_output_instance *o_ins)
{
Expand Down Expand Up @@ -65,4 +81,9 @@ int flb_router_match(const char *tag, int tag_len,
const char *match, void *match_regex);
int flb_router_io_set(struct flb_config *config);
void flb_router_exit(struct flb_config *config);

int flb_router_metrics_create(struct flb_config *config, struct flb_router *router);
struct flb_router *flb_router_create(struct flb_config *config);
Comment on lines +85 to +86
Copy link

@coderabbitai coderabbitai bot Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

API name/signature mismatch: flb_router_metrics_create vs router_metrics_create

src/flb_router.c calls router_metrics_create(router) (no config arg), while this header declares flb_router_metrics_create(struct flb_config *, struct flb_router *). This likely causes an unresolved/unused symbol scenario.

Run to confirm and decide whether to:

  • remove the public flb_router_metrics_create declaration, keeping a private router_metrics_create, or
  • standardize on a single public symbol and update the call site.

🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n 'flb_router_metrics_create\s*\(' -C2 || true
rg -n 'router_metrics_create\s*\(' -C2 || true
rg -n 'flb_router_create\s*\(' -n -C3 src/flb_router.c || true

Length of output: 1338


Remove or correct flb_router_metrics_create declaration
The header declares int flb_router_metrics_create(struct flb_config *, struct flb_router *) but only a private static int router_metrics_create(struct flb_router *) exists in src/flb_router.c, leading to an unresolved symbol. Either remove the public declaration from include/fluent-bit/flb_router.h or provide a matching implementation.

🤖 Prompt for AI Agents
In include/fluent-bit/flb_router.h around lines 86-87, the header declares
flb_router_metrics_create(struct flb_config *, struct flb_router *) but only a
private static router_metrics_create(struct flb_router *) exists in
src/flb_router.c causing an unresolved symbol; either remove the public
declaration from the header or implement a public flb_router_metrics_create with
the declared signature in src/flb_router.c that adapts/forwards to the existing
router_metrics_create (accepting/using the config param or ignoring it
consistently) and ensure it is non-static and exported.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edsiper, thank you for confirming! I'm glad I could help identify this issue. 🐰

🌟
┏━━━━━━━━━━━━━━━━━━┓
┃ Happy to help! ┃
┗━━━━━━━━━━━━━━━━━━┛
(_/)
(='.'=)
(")
(")

void flb_router_destroy(struct flb_router *router);

#endif
31 changes: 17 additions & 14 deletions include/fluent-bit/flb_routes_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,25 @@ typedef uint64_t flb_route_mask_element;
struct flb_input_instance;
struct flb_config;

int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask,
const char *tag,
int tag_len,
int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask,
const char *tag,
int tag_len,
struct flb_input_instance *in);
int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask,
struct flb_config *config);
int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask,
struct flb_router *router);

int flb_routes_empty_mask_create(struct flb_config *config);
void flb_routes_empty_mask_destroy(struct flb_config *config);
int flb_routes_empty_mask_create(struct flb_router *router);
void flb_routes_empty_mask_destroy(struct flb_router *router);

int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config);
int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router);
size_t flb_routes_mask_get_size(struct flb_router *router);

size_t flb_routes_mask_get_slots(struct flb_router *router);

#endif
16 changes: 8 additions & 8 deletions plugins/in_storage_backlog/sb.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
int tag_len;
const char * tag_buf;
int result;
size_t slots;

memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));

memset(context->dummy_routes_mask,
0,
context->ins->config->route_mask_slots * sizeof(flb_route_mask_element));
slots = flb_routes_mask_get_slots(context->ins->config->router);
memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));
memset(context->dummy_routes_mask, 0, slots * sizeof(flb_route_mask_element));

Comment on lines +287 to 290
Copy link

@coderabbitai coderabbitai bot Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use route_mask element count, not slots (bits), for allocation/memset

You’re using flb_routes_mask_get_slots() (bits) to size arrays of flb_route_mask_element. This over-allocates by a factor of element bit-width (typically 64) and repeats during memset. Use the element count from flb_routes_mask_get_size() instead.

Apply:

-    size_t                  slots;
+    size_t                  elements;
@@
-    slots = flb_routes_mask_get_slots(context->ins->config->router);
+    elements = flb_routes_mask_get_size(context->ins->config->router);
     memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));
-    memset(context->dummy_routes_mask, 0, slots * sizeof(flb_route_mask_element));
+    memset(context->dummy_routes_mask, 0, elements * sizeof(flb_route_mask_element));

And at init:

-    size = flb_routes_mask_get_slots(config->router);
-    ctx->dummy_routes_mask = flb_calloc(size, sizeof(flb_route_mask_element));
+    size = flb_routes_mask_get_size(config->router);
+    ctx->dummy_routes_mask = flb_calloc(size, sizeof(flb_route_mask_element));

This prevents unnecessary memory usage and keeps the mask operations within bounds.

Also applies to: 668-671

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edsiper, understood! I'll disregard this suggestion. Thank you for clarifying.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edsiper, understood! I'll defer to your judgment on this. Thank you for the clarification.

dummy_input_chunk.in = context->ins;
dummy_input_chunk.chunk = target_chunk;
Expand Down Expand Up @@ -317,7 +317,7 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
backlog = mk_list_entry(head, struct sb_out_queue, _head);
if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask,
backlog->ins->id,
backlog->ins->config)) {
backlog->ins->config->router)) {
result = sb_append_chunk_to_segregated_backlog(target_chunk, stream,
chunk_size, backlog);
if (result) {
Expand Down Expand Up @@ -655,6 +655,7 @@ static int cb_sb_init(struct flb_input_instance *in,
{
int ret;
char mem[32];
size_t size;
struct flb_sb *ctx;

ctx = flb_calloc(1, sizeof(struct flb_sb));
Expand All @@ -664,9 +665,8 @@ static int cb_sb_init(struct flb_input_instance *in,
return -1;
}

ctx->dummy_routes_mask = flb_calloc(in->config->route_mask_slots,
sizeof(flb_route_mask_element));

size = flb_routes_mask_get_slots(config->router);
ctx->dummy_routes_mask = flb_calloc(size, sizeof(flb_route_mask_element));
if (ctx->dummy_routes_mask == NULL) {
flb_errno();
flb_free(ctx);
Expand Down
12 changes: 10 additions & 2 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/flb_bucket_queue.h>
#include <fluent-bit/flb_router.h>

const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";

Expand Down Expand Up @@ -303,7 +304,14 @@ struct flb_config *flb_config_init()
}

/* Routing */
flb_routes_mask_set_size(1, config);
config->router = flb_router_create(config);
if (!config->router) {
flb_error("[config] could not create router");
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
flb_routes_mask_set_size(1, config->router);

Comment on lines 306 to 315
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle flb_routes_mask_set_size() failure to avoid partial-init leaks.

If mask allocation fails, we proceed with a half-initialized router. Bail out and clean up.

-    flb_routes_mask_set_size(1, config->router);
+    ret = flb_routes_mask_set_size(1, config->router);
+    if (ret != 0) {
+        flb_error("[config] routing mask init failed");
+        flb_cf_destroy(cf);
+        flb_router_destroy(config->router);
+        flb_free(config);
+        return NULL;
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* Routing */
flb_routes_mask_set_size(1, config);
config->router = flb_router_create(config);
if (!config->router) {
flb_error("[config] could not create router");
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
flb_routes_mask_set_size(1, config->router);
/* Routing */
config->router = flb_router_create(config);
if (!config->router) {
flb_error("[config] could not create router");
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
ret = flb_routes_mask_set_size(1, config->router);
if (ret != 0) {
flb_error("[config] routing mask init failed");
flb_cf_destroy(cf);
flb_router_destroy(config->router);
flb_free(config);
return NULL;
}
🤖 Prompt for AI Agents
In src/flb_config.c around lines 306 to 315, flb_routes_mask_set_size(...)
return value is not checked so a mask allocation failure would leave a
half-initialized router and leak resources; update the code to check the
function's return (non-zero = failure), log an error, call flb_cf_destroy(cf),
destroy/free the router (e.g. flb_router_destroy(config->router) or the
appropriate cleanup function), free config, and return NULL to fully abort
initialization on failure.

config->cio = NULL;
config->storage_path = NULL;
Expand Down Expand Up @@ -611,8 +619,8 @@ void flb_config_exit(struct flb_config *config)

/* release task map */
flb_config_task_map_resize(config, 0);
flb_routes_empty_mask_destroy(config);

flb_router_destroy(config->router);
flb_free(config);
}

Expand Down
91 changes: 65 additions & 26 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ static inline int handle_output_event(uint64_t ts,
uint32_t type;
uint32_t key;
double latency_seconds;
char *name;
char *in_name;
char *out_name;
struct flb_task *task;
struct flb_task_retry *retry;
struct flb_output_instance *ins;
Expand Down Expand Up @@ -289,7 +290,9 @@ static inline int handle_output_event(uint64_t ts,
if (flb_output_is_threaded(ins) == FLB_FALSE) {
flb_output_flush_finished(config, out_id);
}
name = (char *) flb_output_name(ins);

in_name = (char *) flb_input_name(task->i_ins);
out_name = (char *) flb_output_name(ins);

/* If we are in synchronous mode, flush the next waiting task */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
Expand All @@ -302,16 +305,27 @@ static inline int handle_output_event(uint64_t ts,
if (ret == FLB_OK) {
/* cmetrics */
cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events,
1, (char *[]) {name});
1, (char *[]) {out_name});

cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size,
1, (char *[]) {name});
1, (char *[]) {out_name});

/* router metrics */
if (task->event_chunk->type == FLB_INPUT_LOGS) {
cmt_counter_add(config->router->logs_records_total, ts,
task->event_chunk->total_events,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

/* latency histogram */
if (ins->cmt_latency) {
latency_seconds = flb_time_now() - ((struct flb_input_chunk *) task->ic)->create_time;
cmt_histogram_observe(ins->cmt_latency, ts, latency_seconds, 2,
(char *[]) {(char *) flb_input_name(task->i_ins), name});
(char *[]) {(char *) flb_input_name(task->i_ins), out_name});
}

/* [OLD API] Update metrics */
Expand Down Expand Up @@ -346,7 +360,7 @@ static inline int handle_output_event(uint64_t ts,

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

flb_task_retry_clean(task, ins);
flb_task_users_dec(task, FLB_TRUE);
Expand All @@ -355,11 +369,21 @@ static inline int handle_output_event(uint64_t ts,
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (task->event_chunk->type == FLB_INPUT_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -389,13 +413,26 @@ static inline int handle_output_event(uint64_t ts,
*/

/* cmetrics */
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (task->event_chunk->type == FLB_INPUT_LOGS) {
in_name = (char *) flb_input_name(task->i_ins);
out_name = (char *) flb_output_name(ins);

cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -449,13 +486,13 @@ static inline int handle_output_event(uint64_t ts,
flb_output_name(ins), out_id);

/* cmetrics */
cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_retried_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API: update the metrics since a new retry is coming */
#ifdef FLB_HAVE_METRICS
Expand All @@ -466,13 +503,23 @@ static inline int handle_output_event(uint64_t ts,
}
else if (ret == FLB_ERROR) {
/* cmetrics */
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (task->event_chunk->type == FLB_INPUT_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -811,15 +858,7 @@ int flb_engine_start(struct flb_config *config)
config->notification_channels_initialized = FLB_TRUE;
config->notification_event.type = FLB_ENGINE_EV_NOTIFICATION;

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config);

if (ret != 0) {
flb_error("[engine] routing mask dimensioning failed");
return -1;
}

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config);

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config->router);
if (ret != 0) {
flb_error("[engine] routing mask dimensioning failed");
return -1;
Expand Down
Loading
Loading