From c61a73cc6182e1d9e85a77fd9b84af2d6d033815 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:54:11 -0600 Subject: [PATCH 01/21] router: add log metrics and context for routing masks Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 19 +++++++ src/flb_router.c | 97 +++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 81528248eb6..f952e53a793 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -29,6 +29,22 @@ struct flb_router_path { struct mk_list _head; }; +struct flb_router { + /* Routing masks */ + size_t route_mask_size; + size_t route_mask_slots; + uint64_t *route_empty_mask; + + /* metrics */ + struct cmt *cmt; + + /* logs routing metrics */ + struct cmt_counter *logs_records_total; + struct cmt_counter *logs_bytes_total; + struct cmt_counter *logs_drop_records_total; + struct cmt_counter *logs_drop_bytes_total; +}; + static inline int flb_router_match_type(int in_event_type, struct flb_output_instance *o_ins) { @@ -66,3 +82,6 @@ int flb_router_match(const char *tag, int tag_len, int flb_router_io_set(struct flb_config *config); void flb_router_exit(struct flb_config *config); #endif + +int flb_router_metrics_create(struct flb_config *config, struct flb_router *router); +struct flb_router *flb_router_create(struct flb_config *config); \ No newline at end of file diff --git a/src/flb_router.c b/src/flb_router.c index dbc2243edc1..7fe1d9b827d 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -269,3 +269,100 @@ void flb_router_exit(struct flb_config *config) } } } + +static int router_metrics_create(struct flb_router *router) +{ + if (!router || !router->cmt) { + return -1; + } + + /* Metrics for Logs + * ---------------- + * The following metrics are used to track the number of records routed from input to output, + * the number of bytes routed, and the number of records and bytes dropped during routing. + * + * The metrics are defined as follows: + * + * - flb_routing_logs_records_total: Log records routed to outputs + * - flb_routing_logs_bytes_total: Total log bytes routed + * - flb_routing_logs_drop_records_total: Log records dropped during routing + * - flb_routing_logs_drop_bytes_total: Log bytes dropped during routing + * + */ + router->logs_records_total = cmt_counter_create( + router->cmt, + "flb", "routing_logs", "records_total", + "Total log records routed from input to output", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_records_total) { + return -1; + } + + router->logs_bytes_total = cmt_counter_create( + router->cmt, + "flb", "routing_logs", "bytes_total", + "Total bytes routed from input to output (logs)", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_bytes_total) { + return -1; + } + + router->logs_drop_records_total = cmt_counter_create( + router->cmt, + "flb", "routing_logs", "drop_records_total", + "Total log records dropped during routing", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_drop_records_total) { + return -1; + } + + router->logs_drop_bytes_total = cmt_counter_create( + router->cmt, + "flb", "routing_logs", "drop_bytes_total", + "Total bytes dropped during routing (logs)", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_drop_bytes_total) { + return -1; + } + + return 0; +} + +struct flb_router *flb_router_create(struct flb_config *config) +{ + struct flb_router *router; + + router = flb_calloc(1, sizeof(struct flb_router)); + if (!router) { + flb_errno(); + return NULL; + } + + /* create metrics instance */ + router->cmt = cmt_create(); + if (!router->cmt) { + flb_free(router); + return NULL; + } + + if (router_metrics_create(router) != 0) { + flb_free(router); + return NULL; + } + return router; +} + +void flb_router_destroy(struct flb_router *router) +{ + if (router->cmt) { + cmt_destroy(router->cmt); + } + + flb_free(router); + router = NULL; +} + From 6dabcbb7bee7ed875e95d0f927a50619f879d15d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:54:58 -0600 Subject: [PATCH 02/21] routes_mask: use new context for router masks Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_routes_mask.h | 31 +++++++------- src/flb_routes_mask.c | 63 ++++++++++++++++------------ 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/include/fluent-bit/flb_routes_mask.h b/include/fluent-bit/flb_routes_mask.h index 3f47f549124..50938e5463a 100644 --- a/include/fluent-bit/flb_routes_mask.h +++ b/include/fluent-bit/flb_routes_mask.h @@ -47,22 +47,25 @@ typedef uint64_t flb_route_mask_element; struct flb_input_instance; struct flb_config; -int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, - const char *tag, - int tag_len, +int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, + const char *tag, + int tag_len, struct flb_input_instance *in); -int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config); +int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); + void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, + struct flb_router *router); -int flb_routes_empty_mask_create(struct flb_config *config); -void flb_routes_empty_mask_destroy(struct flb_config *config); +int flb_routes_empty_mask_create(struct flb_router *router); +void flb_routes_empty_mask_destroy(struct flb_router *router); -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config); +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router); +size_t flb_routes_mask_get_size(struct flb_router *router); + +size_t flb_routes_mask_get_slots(struct flb_router *router); #endif diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index e10ad9a463a..e83bb4ea97e 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -23,6 +23,15 @@ #include #include +size_t flb_routes_mask_get_size(struct flb_router *router) +{ + return router->route_mask_size; +} + +size_t flb_routes_mask_get_slots(struct flb_router *router) +{ + return router->route_mask_slots; +} /* * Set the routes_mask for input chunk with a router_match on tag, return a @@ -34,17 +43,17 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, struct flb_input_instance *in) { int has_routes = 0; + size_t size; struct mk_list *o_head; struct flb_output_instance *o_ins; if (!in) { return 0; } + /* Clear the bit field */ - memset(routes_mask, - 0, - sizeof(flb_route_mask_element) * - in->config->route_mask_size); + size = flb_routes_mask_get_size(in->config->router); + memset(routes_mask, 0, sizeof(flb_route_mask_element) * size); /* Find all matching routes for the given tag */ mk_list_foreach(o_head, &in->config->outputs) { @@ -58,7 +67,7 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, , NULL #endif )) { - flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config); + flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config->router); has_routes = 1; } } @@ -74,12 +83,12 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, * */ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", value); return; @@ -98,12 +107,12 @@ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, * */ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", value); return; @@ -123,12 +132,12 @@ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, * */ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't get bit (%d) past limits of bitfield", value); return 0; @@ -140,37 +149,36 @@ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, } int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config) + struct flb_router *router) { return memcmp(routes_mask, - config->route_empty_mask, - config->route_mask_size) == 0; + router->route_empty_mask, + router->route_mask_size) == 0; } -int flb_routes_empty_mask_create(struct flb_config *config) +int flb_routes_empty_mask_create(struct flb_router *router) { - flb_routes_empty_mask_destroy(config); + flb_routes_empty_mask_destroy(router); - config->route_empty_mask = flb_calloc(config->route_mask_size, + router->route_empty_mask = flb_calloc(router->route_mask_size, sizeof(flb_route_mask_element)); - if (config->route_empty_mask == NULL) { + if (router->route_empty_mask == NULL) { return -1; } return 0; } -void flb_routes_empty_mask_destroy(struct flb_config *config) +void flb_routes_empty_mask_destroy(struct flb_router *router) { - if (config->route_empty_mask != NULL) { - flb_free(config->route_empty_mask); - - config->route_empty_mask = NULL; + if (router->route_empty_mask != NULL) { + flb_free(router->route_empty_mask); + router->route_empty_mask = NULL; } } -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config) +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router) { if (mask_size < 1) { mask_size = 1; @@ -179,8 +187,9 @@ int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config) mask_size = (mask_size / FLB_ROUTES_MASK_ELEMENT_BITS) + (mask_size % FLB_ROUTES_MASK_ELEMENT_BITS); - config->route_mask_size = mask_size; - config->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; + router->route_mask_size = mask_size; + router->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; - return flb_routes_empty_mask_create(config); + return flb_routes_empty_mask_create(router); } + From 46f47a32e617b1bd658dfad3bc3f1de34f0a1147 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:55:10 -0600 Subject: [PATCH 03/21] engine: remove duplicated router code Signed-off-by: Eduardo Silva --- src/flb_engine.c | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index d7fcd7a6223..ff64f21a98a 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -811,15 +811,7 @@ int flb_engine_start(struct flb_config *config) config->notification_channels_initialized = FLB_TRUE; config->notification_event.type = FLB_ENGINE_EV_NOTIFICATION; - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); - - if (ret != 0) { - flb_error("[engine] routing mask dimensioning failed"); - return -1; - } - - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); - + ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config->router); if (ret != 0) { flb_error("[engine] routing mask dimensioning failed"); return -1; From 79b09b65f67ae598a0626525fa5e2ee45f18bd25 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:55:33 -0600 Subject: [PATCH 04/21] config: use new router context Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_config.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1e8f2f0783..7b5f454f4e2 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -291,10 +291,9 @@ struct flb_config { int hot_reload_watchdog_timeout_seconds; - /* Routing */ - size_t route_mask_size; - size_t route_mask_slots; - uint64_t *route_empty_mask; + /* router context */ + struct flb_router *router; + #ifdef FLB_SYSTEM_WINDOWS /* maxstdio (Windows) */ int win_maxstdio; From 874429daf2a94cca0a32281213f3759f04c7b2ec Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:55:52 -0600 Subject: [PATCH 05/21] in_storage_backlog: use new router/routes api Signed-off-by: Eduardo Silva --- plugins/in_storage_backlog/sb.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/in_storage_backlog/sb.c b/plugins/in_storage_backlog/sb.c index de3f0b4694e..306f967e1b2 100644 --- a/plugins/in_storage_backlog/sb.c +++ b/plugins/in_storage_backlog/sb.c @@ -281,12 +281,12 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun int tag_len; const char * tag_buf; int result; + size_t slots; - memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); - memset(context->dummy_routes_mask, - 0, - context->ins->config->route_mask_slots * sizeof(flb_route_mask_element)); + slots = flb_routes_mask_get_slots(context->ins->config->router); + memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); + memset(context->dummy_routes_mask, 0, slots * sizeof(flb_route_mask_element)); dummy_input_chunk.in = context->ins; dummy_input_chunk.chunk = target_chunk; @@ -317,7 +317,7 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun backlog = mk_list_entry(head, struct sb_out_queue, _head); if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, backlog->ins->id, - backlog->ins->config)) { + backlog->ins->config->router)) { result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, chunk_size, backlog); if (result) { @@ -655,6 +655,7 @@ static int cb_sb_init(struct flb_input_instance *in, { int ret; char mem[32]; + size_t size; struct flb_sb *ctx; ctx = flb_calloc(1, sizeof(struct flb_sb)); @@ -664,9 +665,8 @@ static int cb_sb_init(struct flb_input_instance *in, return -1; } - ctx->dummy_routes_mask = flb_calloc(in->config->route_mask_slots, - sizeof(flb_route_mask_element)); - + size = flb_routes_mask_get_slots(config->router); + ctx->dummy_routes_mask = flb_calloc(size, sizeof(flb_route_mask_element)); if (ctx->dummy_routes_mask == NULL) { flb_errno(); flb_free(ctx); From d7e1647dc14861ff38de283c12575f656279af3f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 17:56:12 -0600 Subject: [PATCH 06/21] input_chunk: use new router context Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index b8f431452c4..44f66e19e0d 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -162,7 +162,7 @@ static int flb_input_chunk_release_space( if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config)) { + input_plugin->config->router)) { continue; } @@ -185,14 +185,14 @@ static int flb_input_chunk_release_space( if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { flb_routes_mask_clear_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config); + input_plugin->config->router); FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); output_plugin->fs_chunks_size -= chunk_size; chunk_destroy_flag = flb_routes_mask_is_empty( old_input_chunk->routes_mask, - input_plugin->config); + input_plugin->config->router); chunk_released = FLB_TRUE; } @@ -411,7 +411,7 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, */ if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id, - ic->in->config) == 0) { + ic->in->config->router) == 0) { return FLB_FALSE; } @@ -516,7 +516,7 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -558,7 +558,7 @@ int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -599,8 +599,7 @@ int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_siz } } } - return !flb_routes_mask_is_empty(ic->routes_mask, - i_ins->config); + return !flb_routes_mask_is_empty(ic->routes_mask, i_ins->config->router); } /* Create an input chunk using a Chunk I/O */ @@ -612,6 +611,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, int tag_len; int has_routes; int ret; + size_t size; uint64_t ts; char *buf_data; size_t buf_size; @@ -641,10 +641,9 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return NULL; } - ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, - sizeof(flb_route_mask_element)); + size = flb_routes_mask_get_size(in->config->router); + ic->routes_mask = (flb_route_mask_element *) flb_calloc(size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { flb_errno(); cio_chunk_close(chunk, CIO_TRUE); @@ -879,6 +878,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in int err; int set_down = FLB_FALSE; int has_routes; + size_t size; char name[64]; struct cio_chunk *chunk; struct flb_storage_input *storage; @@ -943,10 +943,9 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in #ifdef FLB_HAVE_METRICS ic->total_records = 0; #endif - ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, - sizeof(flb_route_mask_element)); + size = flb_routes_mask_get_size(in->config->router); + ic->routes_mask = (flb_route_mask_element *) flb_calloc(size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { flb_errno(); cio_chunk_close(chunk, CIO_TRUE); @@ -1008,7 +1007,7 @@ int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -1092,7 +1091,7 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -1250,8 +1249,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks * (based in creation time) to get enough space for the incoming chunk. */ - if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) - && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { + if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) && + flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { /* * If the chunk is not newly created, the chunk might already have logs inside. * We cannot delete (reused) chunks here. @@ -1259,7 +1258,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * the chunk. */ if (new_chunk || - flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) == FLB_TRUE) { + flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } return NULL; @@ -2218,7 +2217,7 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { /* * if there is match on any index of 1's in the binary, it indicates * that the input chunk will flush to this output instance From f551d71ddc974b54fb7f58426c9348dd9a0d651e Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 18:22:10 -0600 Subject: [PATCH 07/21] router: handle exit with the api Signed-off-by: Eduardo Silva --- src/flb_router.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_router.c b/src/flb_router.c index 7fe1d9b827d..7814aafb841 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -350,7 +350,7 @@ struct flb_router *flb_router_create(struct flb_config *config) } if (router_metrics_create(router) != 0) { - flb_free(router); + flb_router_destroy(router); return NULL; } return router; From f226b87f7ed22b95b2ce5cbbd5071610d9245100 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 18:22:34 -0600 Subject: [PATCH 08/21] routes_mask: fix warning message Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index e83bb4ea97e..b0e64d62fa3 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -113,7 +113,7 @@ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, uint64_t bit; if (value < 0 || value >= router->route_mask_slots) { - flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", + flb_warn("[routes_mask] Can't clear bit (%d) past limits of bitfield", value); return; } From 2e482c71decc6a45bd93e0350f61f1053f6b647b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 18:23:27 -0600 Subject: [PATCH 09/21] task: use new router context Signed-off-by: Eduardo Silva --- src/flb_task.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_task.c b/src/flb_task.c index 75263ff70d4..6b3382e6277 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -444,9 +444,9 @@ struct flb_task *flb_task_create(uint64_t ref_id, continue; } - if (flb_routes_mask_get_bit(task_ic->routes_mask, + if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); From 5df52ed463322f479e52a82e5b67ee141eaa68d6 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 23 May 2025 18:29:28 -0600 Subject: [PATCH 10/21] config: pass router context Signed-off-by: Eduardo Silva --- src/flb_config.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_config.c b/src/flb_config.c index ddfdd010a1c..ed512c9336e 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -303,7 +303,7 @@ struct flb_config *flb_config_init() } /* Routing */ - flb_routes_mask_set_size(1, config); + flb_routes_mask_set_size(1, config->router); config->cio = NULL; config->storage_path = NULL; @@ -611,7 +611,7 @@ void flb_config_exit(struct flb_config *config) /* release task map */ flb_config_task_map_resize(config, 0); - flb_routes_empty_mask_destroy(config); + flb_routes_empty_mask_destroy(config->router); flb_free(config); } From f5f94c5bdb46a479dbd3ede55320bd58dedd6bd5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 24 May 2025 11:03:53 -0600 Subject: [PATCH 11/21] config: use new router create/destroy api Signed-off-by: Eduardo Silva --- src/flb_config.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/flb_config.c b/src/flb_config.c index ed512c9336e..1dc8f94608f 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -303,6 +304,13 @@ struct flb_config *flb_config_init() } /* Routing */ + config->router = flb_router_create(config); + if (!config->router) { + flb_error("[config] could not create router"); + flb_cf_destroy(cf); + flb_free(config); + return NULL; + } flb_routes_mask_set_size(1, config->router); config->cio = NULL; @@ -611,8 +619,8 @@ void flb_config_exit(struct flb_config *config) /* release task map */ flb_config_task_map_resize(config, 0); - flb_routes_empty_mask_destroy(config->router); + flb_router_destroy(config->router); flb_free(config); } From 4c001cf8bf7877b9f81517316c9a7d4f04526678 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 24 May 2025 11:04:19 -0600 Subject: [PATCH 12/21] router: export destroy context function Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 3 ++- src/flb_router.c | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index f952e53a793..a4fe678af71 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -84,4 +84,5 @@ void flb_router_exit(struct flb_config *config); #endif int flb_router_metrics_create(struct flb_config *config, struct flb_router *router); -struct flb_router *flb_router_create(struct flb_config *config); \ No newline at end of file +struct flb_router *flb_router_create(struct flb_config *config); +void flb_router_destroy(struct flb_router *router); \ No newline at end of file diff --git a/src/flb_router.c b/src/flb_router.c index 7814aafb841..e610c10b6c4 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -358,6 +358,8 @@ struct flb_router *flb_router_create(struct flb_config *config) void flb_router_destroy(struct flb_router *router) { + flb_routes_empty_mask_destroy(router); + if (router->cmt) { cmt_destroy(router->cmt); } From 38a7aa3d209b25c1696ed072ce1d207ff3ddb582 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 30 Sep 2025 13:30:49 -0600 Subject: [PATCH 13/21] router: use proper metrics prefix 'fluentbit_' Signed-off-by: Eduardo Silva --- src/flb_router.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/flb_router.c b/src/flb_router.c index e610c10b6c4..01bcee7fc18 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -291,7 +291,7 @@ static int router_metrics_create(struct flb_router *router) */ router->logs_records_total = cmt_counter_create( router->cmt, - "flb", "routing_logs", "records_total", + "fluentbit", "routing_logs", "records_total", "Total log records routed from input to output", 2, (char *[]) { "input", "output" }); @@ -301,7 +301,7 @@ static int router_metrics_create(struct flb_router *router) router->logs_bytes_total = cmt_counter_create( router->cmt, - "flb", "routing_logs", "bytes_total", + "fluentbit", "routing_logs", "bytes_total", "Total bytes routed from input to output (logs)", 2, (char *[]) { "input", "output" }); @@ -311,7 +311,7 @@ static int router_metrics_create(struct flb_router *router) router->logs_drop_records_total = cmt_counter_create( router->cmt, - "flb", "routing_logs", "drop_records_total", + "fluentbit", "routing_logs", "drop_records_total", "Total log records dropped during routing", 2, (char *[]) { "input", "output" }); @@ -321,7 +321,7 @@ static int router_metrics_create(struct flb_router *router) router->logs_drop_bytes_total = cmt_counter_create( router->cmt, - "flb", "routing_logs", "drop_bytes_total", + "fluentbit", "routing_logs", "drop_bytes_total", "Total bytes dropped during routing (logs)", 2, (char *[]) { "input", "output" }); From 85ec82abf4136ff82d11dd32f19cf2bd51c0602c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 30 Sep 2025 13:31:14 -0600 Subject: [PATCH 14/21] metrics_exporter: include router metrics context Signed-off-by: Eduardo Silva --- src/flb_metrics_exporter.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/flb_metrics_exporter.c b/src/flb_metrics_exporter.c index 9834ab96cb5..9a3221b4ecd 100644 --- a/src/flb_metrics_exporter.c +++ b/src/flb_metrics_exporter.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -308,6 +309,15 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) } } + if (ctx->router && ctx->router->cmt) { + ret = cmt_cat(cmt, ctx->router->cmt); + if (ret == -1) { + flb_error("[metrics exporter] could not append routing metrics"); + cmt_destroy(cmt); + return NULL; + } + } + /* Pipeline metrics: input, filters, outputs */ mk_list_foreach(head, &ctx->inputs) { i = mk_list_entry(head, struct flb_input_instance, _head); From 200eb001bb31f2395e7e74f2920951e91bfaec77 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 30 Sep 2025 13:32:11 -0600 Subject: [PATCH 15/21] input_chunk: router: manage drop records/bytes metrics Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 44f66e19e0d..6b34bdd03c2 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -214,6 +214,24 @@ static int flb_input_chunk_release_space( dropped_record_count, 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + if (old_input_chunk->event_type == FLB_INPUT_LOGS) { + struct flb_router *router = output_plugin->config->router; + + cmt_counter_add(router->logs_drop_records_total, + cfl_time_now(), + dropped_record_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + + cmt_counter_add(router->logs_drop_bytes_total, + cfl_time_now(), + chunk_size, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + } + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, dropped_record_count, output_plugin->metrics); From cfe64742a8ecab9889672b118160576e7f2e1da5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 30 Sep 2025 13:33:36 -0600 Subject: [PATCH 16/21] engine: based on task return status update routing metrics Signed-off-by: Eduardo Silva --- src/flb_engine.c | 81 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index ff64f21a98a..c6658acc8a5 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -243,7 +243,8 @@ static inline int handle_output_event(uint64_t ts, uint32_t type; uint32_t key; double latency_seconds; - char *name; + char *in_name; + char *out_name; struct flb_task *task; struct flb_task_retry *retry; struct flb_output_instance *ins; @@ -289,7 +290,9 @@ static inline int handle_output_event(uint64_t ts, if (flb_output_is_threaded(ins) == FLB_FALSE) { flb_output_flush_finished(config, out_id); } - name = (char *) flb_output_name(ins); + + in_name = (char *) flb_input_name(task->i_ins); + out_name = (char *) flb_output_name(ins); /* If we are in synchronous mode, flush the next waiting task */ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { @@ -302,16 +305,27 @@ static inline int handle_output_event(uint64_t ts, if (ret == FLB_OK) { /* cmetrics */ cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + /* router metrics */ + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_records_total, ts, + task->event_chunk->total_events, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } /* latency histogram */ if (ins->cmt_latency) { latency_seconds = flb_time_now() - ((struct flb_input_chunk *) task->ic)->create_time; cmt_histogram_observe(ins->cmt_latency, ts, latency_seconds, 2, - (char *[]) {(char *) flb_input_name(task->i_ins), name}); + (char *[]) {(char *) flb_input_name(task->i_ins), out_name}); } /* [OLD API] Update metrics */ @@ -346,7 +360,7 @@ static inline int handle_output_event(uint64_t ts, cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); flb_task_retry_clean(task, ins); flb_task_users_dec(task, FLB_TRUE); @@ -355,11 +369,21 @@ static inline int handle_output_event(uint64_t ts, if (ins->retry_limit == FLB_OUT_RETRY_NONE) { /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -389,13 +413,26 @@ static inline int handle_output_event(uint64_t ts, */ /* cmetrics */ - cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + in_name = (char *) flb_input_name(task->i_ins); + out_name = (char *) flb_output_name(ins); + + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -449,13 +486,13 @@ static inline int handle_output_event(uint64_t ts, flb_output_name(ins), out_id); /* cmetrics */ - cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_retried_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API: update the metrics since a new retry is coming */ #ifdef FLB_HAVE_METRICS @@ -466,13 +503,23 @@ static inline int handle_output_event(uint64_t ts, } else if (ret == FLB_ERROR) { /* cmetrics */ - cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD API */ #ifdef FLB_HAVE_METRICS From 49e644c827e3c516d4bba0c2fe94093c982e0357 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 8 Oct 2025 07:55:25 -0600 Subject: [PATCH 17/21] router: fix header pre-processor location Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index a4fe678af71..b1c38b1568b 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -81,8 +81,9 @@ int flb_router_match(const char *tag, int tag_len, const char *match, void *match_regex); int flb_router_io_set(struct flb_config *config); void flb_router_exit(struct flb_config *config); -#endif int flb_router_metrics_create(struct flb_config *config, struct flb_router *router); struct flb_router *flb_router_create(struct flb_config *config); -void flb_router_destroy(struct flb_router *router); \ No newline at end of file +void flb_router_destroy(struct flb_router *router); + +#endif From e79a77d9a616447b5c84e8f5094bae5ac23cee8b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 8 Oct 2025 08:00:26 -0600 Subject: [PATCH 18/21] routes_mask: fix empty check Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index b0e64d62fa3..0e8f300b169 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -151,9 +151,12 @@ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, struct flb_router *router) { + if (router == NULL || router->route_empty_mask == NULL) { + return 0; + } return memcmp(routes_mask, router->route_empty_mask, - router->route_mask_size) == 0; + router->route_mask_size * sizeof(flb_route_mask_element)) == 0; } int flb_routes_empty_mask_create(struct flb_router *router) From d87b70ac561a80a7def658bc728120ab7a0e6a6c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 8 Oct 2025 08:22:21 -0600 Subject: [PATCH 19/21] routes_mask: add NULL check guard Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index 0e8f300b169..77b49c760fe 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -49,7 +49,9 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, if (!in) { return 0; } - + if (in->config == NULL || in->config->router == NULL) { + return 0; + } /* Clear the bit field */ size = flb_routes_mask_get_size(in->config->router); @@ -161,6 +163,9 @@ int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, int flb_routes_empty_mask_create(struct flb_router *router) { + if (router == NULL) { + return -1; + } flb_routes_empty_mask_destroy(router); router->route_empty_mask = flb_calloc(router->route_mask_size, @@ -175,6 +180,9 @@ int flb_routes_empty_mask_create(struct flb_router *router) void flb_routes_empty_mask_destroy(struct flb_router *router) { + if (router == NULL) { + return; + } if (router->route_empty_mask != NULL) { flb_free(router->route_empty_mask); router->route_empty_mask = NULL; From 26ca51505643c0467cdb4a318d67e57c1baf85d3 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 8 Oct 2025 08:52:35 -0600 Subject: [PATCH 20/21] routes_mask: correct mask size calculation Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index 77b49c760fe..f3e91726d64 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -195,8 +195,7 @@ int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router) mask_size = 1; } - mask_size = (mask_size / FLB_ROUTES_MASK_ELEMENT_BITS) + - (mask_size % FLB_ROUTES_MASK_ELEMENT_BITS); + mask_size = (mask_size + FLB_ROUTES_MASK_ELEMENT_BITS - 1) / FLB_ROUTES_MASK_ELEMENT_BITS; router->route_mask_size = mask_size; router->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; From bfeac401dbb2266d1415c99bdba3d61fc41d8969 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 8 Oct 2025 10:31:56 -0600 Subject: [PATCH 21/21] routes_mask: add extra checks for router context Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index f3e91726d64..be5296f3c24 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -25,11 +25,17 @@ size_t flb_routes_mask_get_size(struct flb_router *router) { + if (router == NULL) { + return 0; + } return router->route_mask_size; } size_t flb_routes_mask_get_slots(struct flb_router *router) { + if (router == NULL) { + return 0; + } return router->route_mask_slots; } @@ -90,6 +96,9 @@ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, int index; uint64_t bit; + if (router == NULL) { + return; + } if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", value); @@ -114,6 +123,9 @@ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, int index; uint64_t bit; + if (router == NULL) { + return; + } if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't clear bit (%d) past limits of bitfield", value); @@ -139,6 +151,9 @@ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, int index; uint64_t bit; + if (router == NULL) { + return 0; + } if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't get bit (%d) past limits of bitfield", value); @@ -166,6 +181,7 @@ int flb_routes_empty_mask_create(struct flb_router *router) if (router == NULL) { return -1; } + flb_routes_empty_mask_destroy(router); router->route_empty_mask = flb_calloc(router->route_mask_size, @@ -191,6 +207,10 @@ void flb_routes_empty_mask_destroy(struct flb_router *router) int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router) { + if (router == NULL) { + return -1; + } + if (mask_size < 1) { mask_size = 1; }