diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1e8f2f0783..7b5f454f4e2 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -291,10 +291,9 @@ struct flb_config { int hot_reload_watchdog_timeout_seconds; - /* Routing */ - size_t route_mask_size; - size_t route_mask_slots; - uint64_t *route_empty_mask; + /* router context */ + struct flb_router *router; + #ifdef FLB_SYSTEM_WINDOWS /* maxstdio (Windows) */ int win_maxstdio; diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 81528248eb6..b1c38b1568b 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -29,6 +29,22 @@ struct flb_router_path { struct mk_list _head; }; +struct flb_router { + /* Routing masks */ + size_t route_mask_size; + size_t route_mask_slots; + uint64_t *route_empty_mask; + + /* metrics */ + struct cmt *cmt; + + /* logs routing metrics */ + struct cmt_counter *logs_records_total; + struct cmt_counter *logs_bytes_total; + struct cmt_counter *logs_drop_records_total; + struct cmt_counter *logs_drop_bytes_total; +}; + static inline int flb_router_match_type(int in_event_type, struct flb_output_instance *o_ins) { @@ -65,4 +81,9 @@ int flb_router_match(const char *tag, int tag_len, const char *match, void *match_regex); int flb_router_io_set(struct flb_config *config); void flb_router_exit(struct flb_config *config); + +int flb_router_metrics_create(struct flb_config *config, struct flb_router *router); +struct flb_router *flb_router_create(struct flb_config *config); +void flb_router_destroy(struct flb_router *router); + #endif diff --git a/include/fluent-bit/flb_routes_mask.h b/include/fluent-bit/flb_routes_mask.h index 3f47f549124..50938e5463a 100644 --- a/include/fluent-bit/flb_routes_mask.h +++ b/include/fluent-bit/flb_routes_mask.h @@ -47,22 +47,25 @@ typedef uint64_t flb_route_mask_element; struct flb_input_instance; struct flb_config; -int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, - const char *tag, - int tag_len, +int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, + const char *tag, + int tag_len, struct flb_input_instance *in); -int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config); +int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); + void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, + struct flb_router *router); -int flb_routes_empty_mask_create(struct flb_config *config); -void flb_routes_empty_mask_destroy(struct flb_config *config); +int flb_routes_empty_mask_create(struct flb_router *router); +void flb_routes_empty_mask_destroy(struct flb_router *router); -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config); +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router); +size_t flb_routes_mask_get_size(struct flb_router *router); + +size_t flb_routes_mask_get_slots(struct flb_router *router); #endif diff --git a/plugins/in_storage_backlog/sb.c b/plugins/in_storage_backlog/sb.c index de3f0b4694e..306f967e1b2 100644 --- a/plugins/in_storage_backlog/sb.c +++ b/plugins/in_storage_backlog/sb.c @@ -281,12 +281,12 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun int tag_len; const char * tag_buf; int result; + size_t slots; - memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); - memset(context->dummy_routes_mask, - 0, - context->ins->config->route_mask_slots * sizeof(flb_route_mask_element)); + slots = flb_routes_mask_get_slots(context->ins->config->router); + memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); + memset(context->dummy_routes_mask, 0, slots * sizeof(flb_route_mask_element)); dummy_input_chunk.in = context->ins; dummy_input_chunk.chunk = target_chunk; @@ -317,7 +317,7 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun backlog = mk_list_entry(head, struct sb_out_queue, _head); if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, backlog->ins->id, - backlog->ins->config)) { + backlog->ins->config->router)) { result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, chunk_size, backlog); if (result) { @@ -655,6 +655,7 @@ static int cb_sb_init(struct flb_input_instance *in, { int ret; char mem[32]; + size_t size; struct flb_sb *ctx; ctx = flb_calloc(1, sizeof(struct flb_sb)); @@ -664,9 +665,8 @@ static int cb_sb_init(struct flb_input_instance *in, return -1; } - ctx->dummy_routes_mask = flb_calloc(in->config->route_mask_slots, - sizeof(flb_route_mask_element)); - + size = flb_routes_mask_get_slots(config->router); + ctx->dummy_routes_mask = flb_calloc(size, sizeof(flb_route_mask_element)); if (ctx->dummy_routes_mask == NULL) { flb_errno(); flb_free(ctx); diff --git a/src/flb_config.c b/src/flb_config.c index ddfdd010a1c..1dc8f94608f 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -303,7 +304,14 @@ struct flb_config *flb_config_init() } /* Routing */ - flb_routes_mask_set_size(1, config); + config->router = flb_router_create(config); + if (!config->router) { + flb_error("[config] could not create router"); + flb_cf_destroy(cf); + flb_free(config); + return NULL; + } + flb_routes_mask_set_size(1, config->router); config->cio = NULL; config->storage_path = NULL; @@ -611,8 +619,8 @@ void flb_config_exit(struct flb_config *config) /* release task map */ flb_config_task_map_resize(config, 0); - flb_routes_empty_mask_destroy(config); + flb_router_destroy(config->router); flb_free(config); } diff --git a/src/flb_engine.c b/src/flb_engine.c index d7fcd7a6223..c6658acc8a5 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -243,7 +243,8 @@ static inline int handle_output_event(uint64_t ts, uint32_t type; uint32_t key; double latency_seconds; - char *name; + char *in_name; + char *out_name; struct flb_task *task; struct flb_task_retry *retry; struct flb_output_instance *ins; @@ -289,7 +290,9 @@ static inline int handle_output_event(uint64_t ts, if (flb_output_is_threaded(ins) == FLB_FALSE) { flb_output_flush_finished(config, out_id); } - name = (char *) flb_output_name(ins); + + in_name = (char *) flb_input_name(task->i_ins); + out_name = (char *) flb_output_name(ins); /* If we are in synchronous mode, flush the next waiting task */ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { @@ -302,16 +305,27 @@ static inline int handle_output_event(uint64_t ts, if (ret == FLB_OK) { /* cmetrics */ cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + /* router metrics */ + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_records_total, ts, + task->event_chunk->total_events, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } /* latency histogram */ if (ins->cmt_latency) { latency_seconds = flb_time_now() - ((struct flb_input_chunk *) task->ic)->create_time; cmt_histogram_observe(ins->cmt_latency, ts, latency_seconds, 2, - (char *[]) {(char *) flb_input_name(task->i_ins), name}); + (char *[]) {(char *) flb_input_name(task->i_ins), out_name}); } /* [OLD API] Update metrics */ @@ -346,7 +360,7 @@ static inline int handle_output_event(uint64_t ts, cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); flb_task_retry_clean(task, ins); flb_task_users_dec(task, FLB_TRUE); @@ -355,11 +369,21 @@ static inline int handle_output_event(uint64_t ts, if (ins->retry_limit == FLB_OUT_RETRY_NONE) { /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -389,13 +413,26 @@ static inline int handle_output_event(uint64_t ts, */ /* cmetrics */ - cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + in_name = (char *) flb_input_name(task->i_ins); + out_name = (char *) flb_output_name(ins); + + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -449,13 +486,13 @@ static inline int handle_output_event(uint64_t ts, flb_output_name(ins), out_id); /* cmetrics */ - cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_retried_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API: update the metrics since a new retry is coming */ #ifdef FLB_HAVE_METRICS @@ -466,13 +503,23 @@ static inline int handle_output_event(uint64_t ts, } else if (ret == FLB_ERROR) { /* cmetrics */ - cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (task->event_chunk->type == FLB_INPUT_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD API */ #ifdef FLB_HAVE_METRICS @@ -811,15 +858,7 @@ int flb_engine_start(struct flb_config *config) config->notification_channels_initialized = FLB_TRUE; config->notification_event.type = FLB_ENGINE_EV_NOTIFICATION; - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); - - if (ret != 0) { - flb_error("[engine] routing mask dimensioning failed"); - return -1; - } - - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); - + ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config->router); if (ret != 0) { flb_error("[engine] routing mask dimensioning failed"); return -1; diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index b8f431452c4..6b34bdd03c2 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -162,7 +162,7 @@ static int flb_input_chunk_release_space( if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config)) { + input_plugin->config->router)) { continue; } @@ -185,14 +185,14 @@ static int flb_input_chunk_release_space( if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { flb_routes_mask_clear_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config); + input_plugin->config->router); FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); output_plugin->fs_chunks_size -= chunk_size; chunk_destroy_flag = flb_routes_mask_is_empty( old_input_chunk->routes_mask, - input_plugin->config); + input_plugin->config->router); chunk_released = FLB_TRUE; } @@ -214,6 +214,24 @@ static int flb_input_chunk_release_space( dropped_record_count, 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + if (old_input_chunk->event_type == FLB_INPUT_LOGS) { + struct flb_router *router = output_plugin->config->router; + + cmt_counter_add(router->logs_drop_records_total, + cfl_time_now(), + dropped_record_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + + cmt_counter_add(router->logs_drop_bytes_total, + cfl_time_now(), + chunk_size, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + } + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, dropped_record_count, output_plugin->metrics); @@ -411,7 +429,7 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, */ if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id, - ic->in->config) == 0) { + ic->in->config->router) == 0) { return FLB_FALSE; } @@ -516,7 +534,7 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -558,7 +576,7 @@ int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -599,8 +617,7 @@ int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_siz } } } - return !flb_routes_mask_is_empty(ic->routes_mask, - i_ins->config); + return !flb_routes_mask_is_empty(ic->routes_mask, i_ins->config->router); } /* Create an input chunk using a Chunk I/O */ @@ -612,6 +629,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, int tag_len; int has_routes; int ret; + size_t size; uint64_t ts; char *buf_data; size_t buf_size; @@ -641,10 +659,9 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return NULL; } - ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, - sizeof(flb_route_mask_element)); + size = flb_routes_mask_get_size(in->config->router); + ic->routes_mask = (flb_route_mask_element *) flb_calloc(size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { flb_errno(); cio_chunk_close(chunk, CIO_TRUE); @@ -879,6 +896,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in int err; int set_down = FLB_FALSE; int has_routes; + size_t size; char name[64]; struct cio_chunk *chunk; struct flb_storage_input *storage; @@ -943,10 +961,9 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in #ifdef FLB_HAVE_METRICS ic->total_records = 0; #endif - ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, - sizeof(flb_route_mask_element)); + size = flb_routes_mask_get_size(in->config->router); + ic->routes_mask = (flb_route_mask_element *) flb_calloc(size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { flb_errno(); cio_chunk_close(chunk, CIO_TRUE); @@ -1008,7 +1025,7 @@ int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -1092,7 +1109,7 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -1250,8 +1267,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks * (based in creation time) to get enough space for the incoming chunk. */ - if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) - && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { + if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) && + flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { /* * If the chunk is not newly created, the chunk might already have logs inside. * We cannot delete (reused) chunks here. @@ -1259,7 +1276,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * the chunk. */ if (new_chunk || - flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) == FLB_TRUE) { + flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } return NULL; @@ -2218,7 +2235,7 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { /* * if there is match on any index of 1's in the binary, it indicates * that the input chunk will flush to this output instance diff --git a/src/flb_metrics_exporter.c b/src/flb_metrics_exporter.c index 9834ab96cb5..9a3221b4ecd 100644 --- a/src/flb_metrics_exporter.c +++ b/src/flb_metrics_exporter.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -308,6 +309,15 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) } } + if (ctx->router && ctx->router->cmt) { + ret = cmt_cat(cmt, ctx->router->cmt); + if (ret == -1) { + flb_error("[metrics exporter] could not append routing metrics"); + cmt_destroy(cmt); + return NULL; + } + } + /* Pipeline metrics: input, filters, outputs */ mk_list_foreach(head, &ctx->inputs) { i = mk_list_entry(head, struct flb_input_instance, _head); diff --git a/src/flb_router.c b/src/flb_router.c index dbc2243edc1..01bcee7fc18 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -269,3 +269,102 @@ void flb_router_exit(struct flb_config *config) } } } + +static int router_metrics_create(struct flb_router *router) +{ + if (!router || !router->cmt) { + return -1; + } + + /* Metrics for Logs + * ---------------- + * The following metrics are used to track the number of records routed from input to output, + * the number of bytes routed, and the number of records and bytes dropped during routing. + * + * The metrics are defined as follows: + * + * - flb_routing_logs_records_total: Log records routed to outputs + * - flb_routing_logs_bytes_total: Total log bytes routed + * - flb_routing_logs_drop_records_total: Log records dropped during routing + * - flb_routing_logs_drop_bytes_total: Log bytes dropped during routing + * + */ + router->logs_records_total = cmt_counter_create( + router->cmt, + "fluentbit", "routing_logs", "records_total", + "Total log records routed from input to output", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_records_total) { + return -1; + } + + router->logs_bytes_total = cmt_counter_create( + router->cmt, + "fluentbit", "routing_logs", "bytes_total", + "Total bytes routed from input to output (logs)", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_bytes_total) { + return -1; + } + + router->logs_drop_records_total = cmt_counter_create( + router->cmt, + "fluentbit", "routing_logs", "drop_records_total", + "Total log records dropped during routing", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_drop_records_total) { + return -1; + } + + router->logs_drop_bytes_total = cmt_counter_create( + router->cmt, + "fluentbit", "routing_logs", "drop_bytes_total", + "Total bytes dropped during routing (logs)", + 2, + (char *[]) { "input", "output" }); + if (!router->logs_drop_bytes_total) { + return -1; + } + + return 0; +} + +struct flb_router *flb_router_create(struct flb_config *config) +{ + struct flb_router *router; + + router = flb_calloc(1, sizeof(struct flb_router)); + if (!router) { + flb_errno(); + return NULL; + } + + /* create metrics instance */ + router->cmt = cmt_create(); + if (!router->cmt) { + flb_free(router); + return NULL; + } + + if (router_metrics_create(router) != 0) { + flb_router_destroy(router); + return NULL; + } + return router; +} + +void flb_router_destroy(struct flb_router *router) +{ + flb_routes_empty_mask_destroy(router); + + if (router->cmt) { + cmt_destroy(router->cmt); + } + + flb_free(router); + router = NULL; +} + diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index e10ad9a463a..be5296f3c24 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -23,6 +23,21 @@ #include #include +size_t flb_routes_mask_get_size(struct flb_router *router) +{ + if (router == NULL) { + return 0; + } + return router->route_mask_size; +} + +size_t flb_routes_mask_get_slots(struct flb_router *router) +{ + if (router == NULL) { + return 0; + } + return router->route_mask_slots; +} /* * Set the routes_mask for input chunk with a router_match on tag, return a @@ -34,17 +49,19 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, struct flb_input_instance *in) { int has_routes = 0; + size_t size; struct mk_list *o_head; struct flb_output_instance *o_ins; if (!in) { return 0; } + if (in->config == NULL || in->config->router == NULL) { + return 0; + } /* Clear the bit field */ - memset(routes_mask, - 0, - sizeof(flb_route_mask_element) * - in->config->route_mask_size); + size = flb_routes_mask_get_size(in->config->router); + memset(routes_mask, 0, sizeof(flb_route_mask_element) * size); /* Find all matching routes for the given tag */ mk_list_foreach(o_head, &in->config->outputs) { @@ -58,7 +75,7 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, , NULL #endif )) { - flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config); + flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config->router); has_routes = 1; } } @@ -74,12 +91,15 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, * */ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (router == NULL) { + return; + } + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", value); return; @@ -98,13 +118,16 @@ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, * */ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { - flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", + if (router == NULL) { + return; + } + if (value < 0 || value >= router->route_mask_slots) { + flb_warn("[routes_mask] Can't clear bit (%d) past limits of bitfield", value); return; } @@ -123,12 +146,15 @@ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, * */ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (router == NULL) { + return 0; + } + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't get bit (%d) past limits of bitfield", value); return 0; @@ -140,47 +166,60 @@ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, } int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config) + struct flb_router *router) { + if (router == NULL || router->route_empty_mask == NULL) { + return 0; + } return memcmp(routes_mask, - config->route_empty_mask, - config->route_mask_size) == 0; + router->route_empty_mask, + router->route_mask_size * sizeof(flb_route_mask_element)) == 0; } -int flb_routes_empty_mask_create(struct flb_config *config) +int flb_routes_empty_mask_create(struct flb_router *router) { - flb_routes_empty_mask_destroy(config); + if (router == NULL) { + return -1; + } + + flb_routes_empty_mask_destroy(router); - config->route_empty_mask = flb_calloc(config->route_mask_size, + router->route_empty_mask = flb_calloc(router->route_mask_size, sizeof(flb_route_mask_element)); - if (config->route_empty_mask == NULL) { + if (router->route_empty_mask == NULL) { return -1; } return 0; } -void flb_routes_empty_mask_destroy(struct flb_config *config) +void flb_routes_empty_mask_destroy(struct flb_router *router) { - if (config->route_empty_mask != NULL) { - flb_free(config->route_empty_mask); - - config->route_empty_mask = NULL; + if (router == NULL) { + return; + } + if (router->route_empty_mask != NULL) { + flb_free(router->route_empty_mask); + router->route_empty_mask = NULL; } } -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config) +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router) { + if (router == NULL) { + return -1; + } + if (mask_size < 1) { mask_size = 1; } - mask_size = (mask_size / FLB_ROUTES_MASK_ELEMENT_BITS) + - (mask_size % FLB_ROUTES_MASK_ELEMENT_BITS); + mask_size = (mask_size + FLB_ROUTES_MASK_ELEMENT_BITS - 1) / FLB_ROUTES_MASK_ELEMENT_BITS; - config->route_mask_size = mask_size; - config->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; + router->route_mask_size = mask_size; + router->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; - return flb_routes_empty_mask_create(config); + return flb_routes_empty_mask_create(router); } + diff --git a/src/flb_task.c b/src/flb_task.c index 75263ff70d4..6b3382e6277 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -444,9 +444,9 @@ struct flb_task *flb_task_create(uint64_t ref_id, continue; } - if (flb_routes_mask_get_bit(task_ic->routes_mask, + if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno();