4545/* Macro to increment records metrics */
4646#ifdef FLB_HAVE_METRICS
4747#define INCREMENT_SKIPPED_METRIC (ctx , ins ) do { \
48- uint64_t ts = cfl_time_now(); \
49- static char* labels_array[1]; \
50- labels_array[0] = (char*)flb_filter_name(ins); \
51- cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
48+ if (ctx->cmt_skipped) { \
49+ uint64_t ts = cfl_time_now(); \
50+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
51+ cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
52+ } \
5253 flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \
5354} while(0)
5455
5556#define INCREMENT_MATCHED_METRIC (ctx , ins ) do { \
56- uint64_t ts = cfl_time_now(); \
57- static char* labels_array[1]; \
58- labels_array[0] = (char*)flb_filter_name(ins); \
59- cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \
57+ if (ctx->cmt_matched) { \
58+ uint64_t ts = cfl_time_now(); \
59+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
60+ cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \
61+ } \
6062 flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \
6163} while(0)
6264
6365#define INCREMENT_PROCESSED_METRIC (ctx , ins ) do { \
64- uint64_t ts = cfl_time_now(); \
65- static char* labels_array[1]; \
66- labels_array[0] = (char*)flb_filter_name(ins); \
67- cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \
66+ if (ctx->cmt_processed) { \
67+ uint64_t ts = cfl_time_now(); \
68+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
69+ cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \
70+ } \
6871 flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \
6972} while(0)
7073#else
@@ -84,20 +87,26 @@ struct val_node {
8487 */
8588static int normalize_and_trim (const char * input , size_t len , int ignore_case , char * * output , size_t * out_len )
8689{
90+ const char * start ;
91+ const char * end ;
92+ size_t n ;
93+ char * buf ;
94+ size_t j ;
95+
8796 if (!input || len == 0 ) {
8897 * output = NULL ;
8998 * out_len = 0 ;
9099 return 0 ;
91100 }
92101 /* Trim leading whitespace */
93- const char * start = input ;
94- size_t n = len ;
102+ start = input ;
103+ n = len ;
95104 while (n > 0 && isspace ((unsigned char )* start )) {
96105 start ++ ;
97106 n -- ;
98107 }
99108 /* Trim trailing whitespace */
100- const char * end = start + n ;
109+ end = start + n ;
101110 while (n > 0 && isspace ((unsigned char )* (end - 1 ))) {
102111 end -- ;
103112 n -- ;
@@ -108,8 +117,6 @@ static int normalize_and_trim(const char *input, size_t len, int ignore_case, ch
108117 return 0 ;
109118 }
110119 if (ignore_case ) {
111- char * buf ;
112- size_t j ;
113120 buf = flb_malloc (n + 1 );
114121 if (!buf ) {
115122 * output = NULL ;
@@ -153,10 +160,13 @@ static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity)
153160/* Append a character to dynamic buffer, growing if necessary */
154161static int dynbuf_append_char (struct dynamic_buffer * buf , char c )
155162{
163+ size_t new_capacity ;
164+ char * new_data ;
165+
156166 /* Ensure we have space for the character plus null terminator */
157167 if (buf -> len + 1 >= buf -> capacity ) {
158- size_t new_capacity = buf -> capacity * 2 ;
159- char * new_data = flb_realloc (buf -> data , new_capacity );
168+ new_capacity = buf -> capacity * 2 ;
169+ new_data = flb_realloc (buf -> data , new_capacity );
160170 if (!new_data ) {
161171 return -1 ;
162172 }
@@ -182,20 +192,27 @@ static void dynbuf_destroy(struct dynamic_buffer *buf)
182192/* Read a line of arbitrary length from file using dynamic allocation */
183193static char * read_line_dynamic (FILE * fp , size_t * line_length )
184194{
185- size_t capacity = 256 ; /* Initial capacity */
186- size_t len = 0 ;
187- char * line = flb_malloc ( capacity ) ;
195+ size_t capacity ;
196+ size_t len ;
197+ char * line ;
188198 int c ;
199+ size_t new_capacity ;
200+ char * new_line ;
189201
202+ /* Initialize variables after declaration */
203+ capacity = 256 ; /* Initial capacity */
204+ len = 0 ;
205+
206+ line = flb_malloc (capacity );
190207 if (!line ) {
191208 return NULL ;
192209 }
193210
194211 while ((c = fgetc (fp )) != EOF ) {
195212 /* Check if we need to grow the buffer */
196213 if (len + 1 >= capacity ) {
197- size_t new_capacity = capacity * 2 ;
198- char * new_line = flb_realloc (line , new_capacity );
214+ new_capacity = capacity * 2 ;
215+ new_line = flb_realloc (line , new_capacity );
199216 if (!new_line ) {
200217 flb_free (line );
201218 return NULL ;
@@ -221,7 +238,7 @@ static char *read_line_dynamic(FILE *fp, size_t *line_length)
221238
222239 /* Null terminate the string */
223240 if (len >= capacity ) {
224- char * new_line = flb_realloc (line , len + 1 );
241+ new_line = flb_realloc (line , len + 1 );
225242 if (!new_line ) {
226243 flb_free (line );
227244 return NULL ;
@@ -246,6 +263,7 @@ static int load_csv(struct lookup_ctx *ctx)
246263{
247264 FILE * fp ;
248265 int line_num = 1 ;
266+ int loaded_entries = 0 ; /* Track loaded entries count */
249267 char * header_line ;
250268 char * line ;
251269 size_t line_length ;
@@ -362,6 +380,16 @@ static int load_csv(struct lookup_ctx *ctx)
362380 p ++ ;
363381 }
364382
383+ /* Check for unmatched quote after key parsing */
384+ if (in_quotes ) {
385+ flb_plg_error (ctx -> ins , "Unmatched opening quote in key at line %d, skipping malformed line" , line_num );
386+ dynbuf_destroy (& key_buf );
387+ dynbuf_destroy (& val_buf );
388+ flb_free (line );
389+ line_num ++ ;
390+ goto next_line ;
391+ }
392+
365393 /* Parse value from second column (handle quotes) */
366394 in_quotes = 0 ;
367395 while (* p && (field == 1 )) {
@@ -471,9 +499,30 @@ static int load_csv(struct lookup_ctx *ctx)
471499 }
472500 memcpy (val_heap , val_ptr , val_len );
473501 val_heap [val_len ] = '\0' ;
502+
503+ /* Allocate and initialize val_node first to track allocated value for cleanup */
504+ node = flb_malloc (sizeof (struct val_node ));
505+ if (!node ) {
506+ flb_free (val_heap );
507+ flb_plg_warn (ctx -> ins , "Failed to allocate val_node for value cleanup, skipping" );
508+ if (key_ptr_allocated ) flb_free (key_ptr );
509+ if (val_ptr_allocated ) flb_free (val_ptr );
510+ dynbuf_destroy (& key_buf );
511+ dynbuf_destroy (& val_buf );
512+ flb_free (line );
513+ line_num ++ ;
514+ continue ;
515+ }
516+ node -> val = val_heap ;
517+ mk_list_add (& node -> _head , & ctx -> val_list );
518+
519+ /* Now add to hash table - if this fails, val_heap is still tracked in val_list */
474520 ret = flb_hash_table_add (ctx -> ht , key_ptr , key_len , val_heap , val_len );
475521 if (ret < 0 ) {
522+ /* Remove from val_list and free the node since hash table add failed */
523+ mk_list_del (& node -> _head );
476524 flb_free (val_heap );
525+ flb_free (node );
477526 flb_plg_warn (ctx -> ins , "Failed to add key '%.*s' (duplicate or error), skipping" , (int )key_len , key_ptr );
478527 if (key_ptr_allocated ) flb_free (key_ptr );
479528 if (val_ptr_allocated ) flb_free (val_ptr );
@@ -483,15 +532,8 @@ static int load_csv(struct lookup_ctx *ctx)
483532 line_num ++ ;
484533 continue ;
485534 }
486- /* Track allocated value for later cleanup */
487- node = flb_malloc (sizeof (struct val_node ));
488- if (node ) {
489- node -> val = val_heap ;
490- mk_list_add (& node -> _head , & ctx -> val_list );
491- } else {
492- /* If malloc fails, value will leak, but plugin will still function */
493- flb_plg_warn (ctx -> ins , "Failed to allocate val_node for value cleanup, value will leak" );
494- }
535+ /* Successfully loaded entry */
536+ loaded_entries ++ ;
495537 /* Do not free val_heap; hash table owns it now */
496538 if (key_ptr_allocated ) flb_free (key_ptr );
497539 if (val_ptr_allocated ) flb_free (val_ptr );
@@ -506,7 +548,7 @@ static int load_csv(struct lookup_ctx *ctx)
506548 continue ;
507549 }
508550 fclose (fp );
509- return 0 ;
551+ return loaded_entries ; /* Return count of successfully loaded entries */
510552}
511553
512554static int cb_lookup_init (struct flb_filter_instance * ins ,
@@ -534,16 +576,25 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
534576 "fluentbit" , "filter" , "lookup_processed_records_total" ,
535577 "Total number of processed records" ,
536578 1 , labels_name );
579+ if (!ctx -> cmt_processed ) {
580+ flb_plg_warn (ins , "failed to create processed_records_total counter" );
581+ }
537582
538583 ctx -> cmt_matched = cmt_counter_create (ins -> cmt ,
539584 "fluentbit" , "filter" , "lookup_matched_records_total" ,
540585 "Total number of matched records" ,
541586 1 , labels_name );
587+ if (!ctx -> cmt_matched ) {
588+ flb_plg_warn (ins , "failed to create matched_records_total counter" );
589+ }
542590
543591 ctx -> cmt_skipped = cmt_counter_create (ins -> cmt ,
544592 "fluentbit" , "filter" , "lookup_skipped_records_total" ,
545593 "Total number of skipped records due to errors" ,
546594 1 , labels_name );
595+ if (!ctx -> cmt_skipped ) {
596+ flb_plg_warn (ins , "failed to create skipped_records_total counter" );
597+ }
547598 }
548599
549600 /* Add to old metrics system */
@@ -571,6 +622,9 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
571622 goto error ;
572623 }
573624
625+ /* Precompute result_key length for hot path optimization */
626+ ctx -> result_key_len = strlen (ctx -> result_key );
627+
574628 /* Check file existence and readability */
575629#ifdef _WIN32
576630 if (_access (ctx -> file , 04 ) != 0 ) { /* 04 = R_OK on Windows */
@@ -603,7 +657,7 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
603657 if (ret < 0 ) {
604658 goto error ;
605659 }
606- flb_plg_info (ins , "Loaded %zu entries from CSV file '%s'" , ( size_t ) ctx -> ht -> total_count , ctx -> file );
660+ flb_plg_info (ins , "Loaded %d entries from CSV file '%s'" , ret , ctx -> file );
607661 flb_plg_info (ins , "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s" ,
608662 ctx -> lookup_key , ctx -> result_key , ctx -> ignore_case ? "true" : "false" );
609663
@@ -758,26 +812,35 @@ static int cb_lookup_filter(const void *data, size_t bytes,
758812 /* First pass: determine required buffer size */
759813 switch (rval -> type ) {
760814 case FLB_RA_BOOL :
815+ /* Check if this boolean was converted from a MAP type */
816+ if (rval -> o .type == MSGPACK_OBJECT_MAP ) {
817+ flb_plg_debug (ins , "Record %d: MAP type from record accessor, skipping conversion" , rec_num );
818+ CLEANUP_DYNAMIC_BUFFERS ();
819+ flb_ra_key_value_destroy (rval );
820+ emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
821+ continue ;
822+ }
761823 required_size = snprintf (NULL , 0 , "%s" , rval -> o .via .boolean ? "true" : "false" );
762824 break ;
763825 case FLB_RA_INT :
764826 required_size = snprintf (NULL , 0 , "%" PRId64 , rval -> o .via .i64 );
765827 break ;
766828 case FLB_RA_FLOAT :
767- required_size = snprintf (NULL , 0 , "%f " , rval -> o .via .f64 );
829+ required_size = snprintf (NULL , 0 , "%.15g " , rval -> o .via .f64 );
768830 break ;
769831 case FLB_RA_NULL :
770832 required_size = snprintf (NULL , 0 , "null" );
771833 break ;
772- case 5 :
773- case 6 :
774- flb_plg_debug (ins , "Record %d: complex type (ARRAY/MAP) from record accessor, skipping conversion" , rec_num );
775- CLEANUP_DYNAMIC_BUFFERS ();
776- flb_ra_key_value_destroy (rval );
777- emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
778- continue ;
779834 default :
780- flb_plg_debug (ins , "Record %d: unsupported type %d, skipping conversion" , rec_num , rval -> type );
835+ /* Check for ARRAY type that might not be properly handled by RA */
836+ if (rval -> o .type == MSGPACK_OBJECT_ARRAY ) {
837+ flb_plg_debug (ins , "Record %d: ARRAY type from record accessor, skipping conversion" , rec_num );
838+ CLEANUP_DYNAMIC_BUFFERS ();
839+ flb_ra_key_value_destroy (rval );
840+ emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
841+ continue ;
842+ }
843+ flb_plg_debug (ins , "Record %d: unsupported type %d (msgpack type %d), skipping conversion" , rec_num , rval -> type , rval -> o .type );
781844 CLEANUP_DYNAMIC_BUFFERS ();
782845 flb_ra_key_value_destroy (rval );
783846 emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
@@ -807,13 +870,14 @@ static int cb_lookup_filter(const void *data, size_t bytes,
807870 printed = 0 ;
808871 switch (rval -> type ) {
809872 case FLB_RA_BOOL :
873+ /* Note: MAP types are converted to boolean, but we already handled them in first pass */
810874 printed = snprintf (dynamic_val_buf , required_size + 1 , "%s" , rval -> o .via .boolean ? "true" : "false" );
811875 break ;
812876 case FLB_RA_INT :
813877 printed = snprintf (dynamic_val_buf , required_size + 1 , "%" PRId64 , rval -> o .via .i64 );
814878 break ;
815879 case FLB_RA_FLOAT :
816- printed = snprintf (dynamic_val_buf , required_size + 1 , "%f " , rval -> o .via .f64 );
880+ printed = snprintf (dynamic_val_buf , required_size + 1 , "%.15g " , rval -> o .via .f64 );
817881 break ;
818882 case FLB_RA_NULL :
819883 printed = snprintf (dynamic_val_buf , required_size + 1 , "null" );
@@ -921,8 +985,8 @@ static int cb_lookup_filter(const void *data, size_t bytes,
921985 for (i = 0 ; i < log_event .body -> via .map .size ; i ++ ) {
922986 msgpack_object_kv * kv = & log_event .body -> via .map .ptr [i ];
923987 if (kv -> key .type == MSGPACK_OBJECT_STR &&
924- kv -> key .via .str .size == strlen ( ctx -> result_key ) &&
925- strncmp (kv -> key .via .str .ptr , ctx -> result_key , kv -> key . via . str . size ) == 0 ) {
988+ kv -> key .via .str .size == ctx -> result_key_len &&
989+ memcmp (kv -> key .via .str .ptr , ctx -> result_key , ctx -> result_key_len ) == 0 ) {
926990 continue ;
927991 }
928992 ret = flb_log_event_encoder_append_body_values (& log_encoder ,
0 commit comments