3131#include <string.h>
3232#include <stdio.h>
3333#include <ctype.h>
34+ #ifndef _WIN32
3435#include <unistd.h>
36+ #else
37+ #include <io.h>
38+ #endif
3539#include <stdlib.h>
36- #include <stdbool.h>
3740#include <errno.h>
41+ #include <inttypes.h>
3842
3943#include "lookup.h"
4044
4145/* Macro to increment records metrics */
4246#ifdef FLB_HAVE_METRICS
4347#define INCREMENT_SKIPPED_METRIC (ctx , ins ) do { \
4448 uint64_t ts = cfl_time_now(); \
45- cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \
49+ static char* labels_array[1]; \
50+ labels_array[0] = (char*)flb_filter_name(ins); \
51+ cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
4652 flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \
4753} while(0)
4854
4955#define INCREMENT_MATCHED_METRIC (ctx , ins ) do { \
5056 uint64_t ts = cfl_time_now(); \
51- cmt_counter_add(ctx->cmt_matched, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \
57+ static char* labels_array[1]; \
58+ labels_array[0] = (char*)flb_filter_name(ins); \
59+ cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \
5260 flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \
5361} while(0)
5462
5563#define INCREMENT_PROCESSED_METRIC (ctx , ins ) do { \
5664 uint64_t ts = cfl_time_now(); \
57- cmt_counter_add(ctx->cmt_processed, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \
65+ static char* labels_array[1]; \
66+ labels_array[0] = (char*)flb_filter_name(ins); \
67+ cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \
5868 flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \
5969} while(0)
6070#else
@@ -71,7 +81,6 @@ struct val_node {
7181
7282/*
7383 * Trims leading/trailing whitespace and optionally normalizes to lower-case.
74- * Allocates output buffer (caller must free if output != input).
7584 */
7685static int normalize_and_trim (const char * input , size_t len , int ignore_case , char * * output , size_t * out_len )
7786{
@@ -99,13 +108,15 @@ static int normalize_and_trim(const char *input, size_t len, int ignore_case, ch
99108 return 0 ;
100109 }
101110 if (ignore_case ) {
102- char * buf = flb_malloc (n + 1 );
111+ char * buf ;
112+ size_t j ;
113+ buf = flb_malloc (n + 1 );
103114 if (!buf ) {
104115 * output = NULL ;
105116 * out_len = 0 ;
106117 return -1 ;
107118 }
108- for (size_t j = 0 ; j < n ; j ++ ) {
119+ for (j = 0 ; j < n ; j ++ ) {
109120 buf [j ] = tolower ((unsigned char )start [j ]);
110121 }
111122 buf [n ] = '\0' ;
@@ -235,6 +246,10 @@ static int load_csv(struct lookup_ctx *ctx)
235246{
236247 FILE * fp ;
237248 int line_num = 1 ;
249+ char * header_line ;
250+ char * line ;
251+ size_t line_length ;
252+
238253 fp = fopen (ctx -> file , "r" );
239254 if (!fp ) {
240255 flb_plg_error (ctx -> ins , "cannot open CSV file '%s': %s" , ctx -> file , strerror (errno ));
@@ -244,28 +259,39 @@ static int load_csv(struct lookup_ctx *ctx)
244259 mk_list_init (& ctx -> val_list );
245260
246261 /* Skip header using dynamic line reading */
247- char * header_line = read_line_dynamic (fp , NULL );
262+ header_line = read_line_dynamic (fp , NULL );
248263 if (!header_line ) {
249264 flb_plg_error (ctx -> ins , "empty CSV file: %s" , ctx -> file );
250265 fclose (fp );
251266 return -1 ;
252267 }
253268 flb_free (header_line ); /* Free the header line as we don't need it */
254269
255- char * line ;
256- size_t line_length ;
257270 while ((line = read_line_dynamic (fp , & line_length )) != NULL ) {
271+ char * p ;
272+ struct dynamic_buffer key_buf , val_buf ;
273+ int in_quotes ;
274+ int field ; /* 0=key, 1=val */
275+ char * key_ptr ;
276+ size_t key_len ;
277+ int key_ptr_allocated ;
278+ char * val_ptr ;
279+ size_t val_len ;
280+ int val_ptr_allocated ;
281+ char * val_heap ;
282+ int ret ;
283+ struct val_node * node ;
284+
258285 if (line_length == 0 ) {
259286 flb_free (line );
260287 line_num ++ ;
261288 continue ;
262289 }
263290
264291 /* Handle quotes in CSV files using dynamic buffers */
265- char * p = line ;
266- struct dynamic_buffer key_buf , val_buf ;
267- int in_quotes = 0 ;
268- int field = 0 ; /* 0=key, 1=val */
292+ p = line ;
293+ in_quotes = 0 ;
294+ field = 0 ;
269295
270296 /* Initialize dynamic buffers */
271297 if (dynbuf_init (& key_buf , 256 ) != 0 ) {
@@ -347,7 +373,7 @@ static int load_csv(struct lookup_ctx *ctx)
347373 if (in_quotes ) {
348374 if (* p == '"' ) {
349375 if (* (p + 1 ) == '"' ) {
350- // Escaped quote
376+ /* Escaped quote */
351377 if (dynbuf_append_char (& val_buf , '"' ) != 0 ) {
352378 flb_plg_error (ctx -> ins , "Failed to append to value buffer for line %d" , line_num );
353379 dynbuf_destroy (& key_buf );
@@ -401,9 +427,9 @@ static int load_csv(struct lookup_ctx *ctx)
401427 }
402428
403429 /* Normalize and trim key */
404- char * key_ptr = NULL ;
405- size_t key_len = 0 ;
406- int key_ptr_allocated = normalize_and_trim (key_buf .data , key_buf .len , ctx -> ignore_case , & key_ptr , & key_len );
430+ key_ptr = NULL ;
431+ key_len = 0 ;
432+ key_ptr_allocated = normalize_and_trim (key_buf .data , key_buf .len , ctx -> ignore_case , & key_ptr , & key_len );
407433 if (key_ptr_allocated < 0 ) {
408434 dynbuf_destroy (& key_buf );
409435 dynbuf_destroy (& val_buf );
@@ -412,9 +438,9 @@ static int load_csv(struct lookup_ctx *ctx)
412438 continue ;
413439 }
414440 /* Normalize and trim value */
415- char * val_ptr = NULL ;
416- size_t val_len = 0 ;
417- int val_ptr_allocated = normalize_and_trim (val_buf .data , val_buf .len , 0 , & val_ptr , & val_len );
441+ val_ptr = NULL ;
442+ val_len = 0 ;
443+ val_ptr_allocated = normalize_and_trim (val_buf .data , val_buf .len , 0 , & val_ptr , & val_len );
418444 if (val_ptr_allocated < 0 ) {
419445 if (key_ptr_allocated ) flb_free (key_ptr );
420446 dynbuf_destroy (& key_buf );
@@ -433,7 +459,7 @@ static int load_csv(struct lookup_ctx *ctx)
433459 continue ;
434460 }
435461 /* Explicitly duplicate value buffer for hash table safety, allocate +1 for null terminator */
436- char * val_heap = flb_malloc (val_len + 1 );
462+ val_heap = flb_malloc (val_len + 1 );
437463 if (!val_heap ) {
438464 if (key_ptr_allocated ) flb_free (key_ptr );
439465 if (val_ptr_allocated ) flb_free (val_ptr );
@@ -445,7 +471,7 @@ static int load_csv(struct lookup_ctx *ctx)
445471 }
446472 memcpy (val_heap , val_ptr , val_len );
447473 val_heap [val_len ] = '\0' ;
448- int ret = flb_hash_table_add (ctx -> ht , key_ptr , key_len , val_heap , val_len );
474+ ret = flb_hash_table_add (ctx -> ht , key_ptr , key_len , val_heap , val_len );
449475 if (ret < 0 ) {
450476 flb_free (val_heap );
451477 flb_plg_warn (ctx -> ins , "Failed to add key '%.*s' (duplicate or error), skipping" , (int )key_len , key_ptr );
@@ -458,7 +484,7 @@ static int load_csv(struct lookup_ctx *ctx)
458484 continue ;
459485 }
460486 /* Track allocated value for later cleanup */
461- struct val_node * node = flb_malloc (sizeof (struct val_node ));
487+ node = flb_malloc (sizeof (struct val_node ));
462488 if (node ) {
463489 node -> val = val_heap ;
464490 mk_list_add (& node -> _head , & ctx -> val_list );
@@ -502,20 +528,23 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
502528
503529#ifdef FLB_HAVE_METRICS
504530 /* Initialize CMT metrics */
505- ctx -> cmt_processed = cmt_counter_create (ins -> cmt ,
506- "fluentbit" , "filter" , "lookup_processed_records_total" ,
507- "Total number of processed records" ,
508- 1 , (char * []) {"name" });
531+ {
532+ static char * labels_name [] = {"name" };
533+ ctx -> cmt_processed = cmt_counter_create (ins -> cmt ,
534+ "fluentbit" , "filter" , "lookup_processed_records_total" ,
535+ "Total number of processed records" ,
536+ 1 , labels_name );
509537
510- ctx -> cmt_matched = cmt_counter_create (ins -> cmt ,
511- "fluentbit" , "filter" , "lookup_matched_records_total" ,
512- "Total number of matched records" ,
513- 1 , ( char * []) { "name" } );
538+ ctx -> cmt_matched = cmt_counter_create (ins -> cmt ,
539+ "fluentbit" , "filter" , "lookup_matched_records_total" ,
540+ "Total number of matched records" ,
541+ 1 , labels_name );
514542
515- ctx -> cmt_skipped = cmt_counter_create (ins -> cmt ,
516- "fluentbit" , "filter" , "lookup_skipped_records_total" ,
517- "Total number of skipped records due to errors" ,
518- 1 , (char * []) {"name" });
543+ ctx -> cmt_skipped = cmt_counter_create (ins -> cmt ,
544+ "fluentbit" , "filter" , "lookup_skipped_records_total" ,
545+ "Total number of skipped records due to errors" ,
546+ 1 , labels_name );
547+ }
519548
520549 /* Add to old metrics system */
521550 flb_metrics_add (FLB_LOOKUP_METRIC_PROCESSED , "processed_records_total" , ins -> metrics );
@@ -543,7 +572,11 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
543572 }
544573
545574 /* Check file existence and readability */
575+ #ifdef _WIN32
576+ if (_access (ctx -> file , 04 ) != 0 ) { /* 04 = R_OK on Windows */
577+ #else
546578 if (access (ctx -> file , R_OK ) != 0 ) {
579+ #endif
547580 flb_plg_error (ins , "CSV file '%s' does not exist or is not readable: %s" , ctx -> file , strerror (errno ));
548581 goto error ;
549582 }
@@ -570,7 +603,7 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
570603 if (ret < 0 ) {
571604 goto error ;
572605 }
573- flb_plg_info (ins , "Loaded %d entries from CSV file '%s'" , (int )ctx -> ht -> total_count , ctx -> file );
606+ flb_plg_info (ins , "Loaded %zu entries from CSV file '%s'" , (size_t )ctx -> ht -> total_count , ctx -> file );
574607 flb_plg_info (ins , "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s" ,
575608 ctx -> lookup_key , ctx -> result_key , ctx -> ignore_case ? "true" : "false" );
576609
@@ -643,7 +676,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
643676 char * lookup_val_str = NULL ;
644677 size_t lookup_val_len = 0 ;
645678 int lookup_val_allocated = 0 ;
646- bool any_modified = false ; /* Track if any records were modified */
679+ int any_modified = 0 ; /* Track if any records were modified */
647680
648681 /* Ensure context is valid */
649682 if (!ctx ) {
@@ -668,12 +701,18 @@ static int cb_lookup_filter(const void *data, size_t bytes,
668701
669702 /* Process each log event in the input batch */
670703 while ((ret = flb_log_event_decoder_next (& log_decoder , & log_event )) == FLB_EVENT_DECODER_SUCCESS ) {
704+ char * dynamic_val_buf ; /* Track dynamic buffer for numeric conversions */
705+ int required_size ;
706+ int printed ;
707+ int ht_get_ret ;
708+ struct flb_ra_value * rval ;
709+
671710 rec_num ++ ;
672711 INCREMENT_PROCESSED_METRIC (ctx , ins );
673712 lookup_val_str = NULL ;
674713 lookup_val_len = 0 ;
675714 lookup_val_allocated = 0 ;
676- char * dynamic_val_buf = NULL ; /* Track dynamic buffer for numeric conversions */
715+ dynamic_val_buf = NULL ;
677716
678717 /* Helper macro to clean up dynamic buffer and allocated lookup strings */
679718 #define CLEANUP_DYNAMIC_BUFFERS () do { \
@@ -695,7 +734,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
695734 }
696735
697736 /* Use record accessor to get the lookup value */
698- struct flb_ra_value * rval = flb_ra_get_value_object (ctx -> ra_lookup_key , * log_event .body );
737+ rval = flb_ra_get_value_object (ctx -> ra_lookup_key , * log_event .body );
699738 if (!rval ) {
700739 /* Key not found, emit original record */
701740 emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
@@ -714,7 +753,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
714753 }
715754 else {
716755 /* Non-string value: convert to string using two-pass dynamic allocation */
717- int required_size = 0 ;
756+ required_size = 0 ;
718757
719758 /* First pass: determine required buffer size */
720759 switch (rval -> type ) {
@@ -756,7 +795,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
756795 /* Allocate buffer with required size plus null terminator */
757796 dynamic_val_buf = flb_malloc (required_size + 1 );
758797 if (!dynamic_val_buf ) {
759- flb_plg_warn (ins , "Record %d: malloc failed for dynamic value buffer (size %d ), skipping" , rec_num , required_size + 1 );
798+ flb_plg_warn (ins , "Record %d: malloc failed for dynamic value buffer (size %zu ), skipping" , rec_num , ( size_t )( required_size + 1 ) );
760799 INCREMENT_SKIPPED_METRIC (ctx , ins );
761800 CLEANUP_DYNAMIC_BUFFERS ();
762801 flb_ra_key_value_destroy (rval );
@@ -765,7 +804,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
765804 }
766805
767806 /* Second pass: write to allocated buffer */
768- int printed = 0 ;
807+ printed = 0 ;
769808 switch (rval -> type ) {
770809 case FLB_RA_BOOL :
771810 printed = snprintf (dynamic_val_buf , required_size + 1 , "%s" , rval -> o .via .boolean ? "true" : "false" );
@@ -826,7 +865,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
826865 * Attempt to find the lookup value in the hash table.
827866 * If not found, emit the original record unchanged.
828867 */
829- int ht_get_ret = flb_hash_table_get (ctx -> ht , lookup_val_str , lookup_val_len , & found_val , & found_len );
868+ ht_get_ret = flb_hash_table_get (ctx -> ht , lookup_val_str , lookup_val_len , & found_val , & found_len );
830869
831870 if (ht_get_ret < 0 || !found_val || found_len == 0 ) {
832871 /* Not found, emit original record */
@@ -838,7 +877,7 @@ static int cb_lookup_filter(const void *data, size_t bytes,
838877
839878 /* Match found - increment counter */
840879 INCREMENT_MATCHED_METRIC (ctx , ins );
841- any_modified = true ; /* Mark that we have modified records */
880+ any_modified = 1 ; /* Mark that we have modified records */
842881
843882 flb_plg_trace (ins , "Record %d: Found match for '%.*s' -> '%.*s'" ,
844883 rec_num , (int )lookup_val_len , lookup_val_str , (int )found_len , (char * )found_val );
0 commit comments