45
45
/* Macro to increment records metrics */
46
46
#ifdef FLB_HAVE_METRICS
47
47
#define INCREMENT_SKIPPED_METRIC (ctx , ins ) do { \
48
- uint64_t ts = cfl_time_now(); \
49
- static char* labels_array[1]; \
50
- labels_array[0] = (char*)flb_filter_name(ins); \
51
- cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
48
+ if (ctx->cmt_skipped) { \
49
+ uint64_t ts = cfl_time_now(); \
50
+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
51
+ cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \
52
+ } \
52
53
flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \
53
54
} while(0)
54
55
55
56
#define INCREMENT_MATCHED_METRIC (ctx , ins ) do { \
56
- uint64_t ts = cfl_time_now(); \
57
- static char* labels_array[1]; \
58
- labels_array[0] = (char*)flb_filter_name(ins); \
59
- cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \
57
+ if (ctx->cmt_matched) { \
58
+ uint64_t ts = cfl_time_now(); \
59
+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
60
+ cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \
61
+ } \
60
62
flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \
61
63
} while(0)
62
64
63
65
#define INCREMENT_PROCESSED_METRIC (ctx , ins ) do { \
64
- uint64_t ts = cfl_time_now(); \
65
- static char* labels_array[1]; \
66
- labels_array[0] = (char*)flb_filter_name(ins); \
67
- cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \
66
+ if (ctx->cmt_processed) { \
67
+ uint64_t ts = cfl_time_now(); \
68
+ char* labels_array[1] = {(char*)flb_filter_name(ins)}; \
69
+ cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \
70
+ } \
68
71
flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \
69
72
} while(0)
70
73
#else
@@ -84,20 +87,26 @@ struct val_node {
84
87
*/
85
88
static int normalize_and_trim (const char * input , size_t len , int ignore_case , char * * output , size_t * out_len )
86
89
{
90
+ const char * start ;
91
+ const char * end ;
92
+ size_t n ;
93
+ char * buf ;
94
+ size_t j ;
95
+
87
96
if (!input || len == 0 ) {
88
97
* output = NULL ;
89
98
* out_len = 0 ;
90
99
return 0 ;
91
100
}
92
101
/* Trim leading whitespace */
93
- const char * start = input ;
94
- size_t n = len ;
102
+ start = input ;
103
+ n = len ;
95
104
while (n > 0 && isspace ((unsigned char )* start )) {
96
105
start ++ ;
97
106
n -- ;
98
107
}
99
108
/* Trim trailing whitespace */
100
- const char * end = start + n ;
109
+ end = start + n ;
101
110
while (n > 0 && isspace ((unsigned char )* (end - 1 ))) {
102
111
end -- ;
103
112
n -- ;
@@ -108,8 +117,6 @@ static int normalize_and_trim(const char *input, size_t len, int ignore_case, ch
108
117
return 0 ;
109
118
}
110
119
if (ignore_case ) {
111
- char * buf ;
112
- size_t j ;
113
120
buf = flb_malloc (n + 1 );
114
121
if (!buf ) {
115
122
* output = NULL ;
@@ -153,10 +160,13 @@ static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity)
153
160
/* Append a character to dynamic buffer, growing if necessary */
154
161
static int dynbuf_append_char (struct dynamic_buffer * buf , char c )
155
162
{
163
+ size_t new_capacity ;
164
+ char * new_data ;
165
+
156
166
/* Ensure we have space for the character plus null terminator */
157
167
if (buf -> len + 1 >= buf -> capacity ) {
158
- size_t new_capacity = buf -> capacity * 2 ;
159
- char * new_data = flb_realloc (buf -> data , new_capacity );
168
+ new_capacity = buf -> capacity * 2 ;
169
+ new_data = flb_realloc (buf -> data , new_capacity );
160
170
if (!new_data ) {
161
171
return -1 ;
162
172
}
@@ -182,20 +192,27 @@ static void dynbuf_destroy(struct dynamic_buffer *buf)
182
192
/* Read a line of arbitrary length from file using dynamic allocation */
183
193
static char * read_line_dynamic (FILE * fp , size_t * line_length )
184
194
{
185
- size_t capacity = 256 ; /* Initial capacity */
186
- size_t len = 0 ;
187
- char * line = flb_malloc ( capacity ) ;
195
+ size_t capacity ;
196
+ size_t len ;
197
+ char * line ;
188
198
int c ;
199
+ size_t new_capacity ;
200
+ char * new_line ;
189
201
202
+ /* Initialize variables after declaration */
203
+ capacity = 256 ; /* Initial capacity */
204
+ len = 0 ;
205
+
206
+ line = flb_malloc (capacity );
190
207
if (!line ) {
191
208
return NULL ;
192
209
}
193
210
194
211
while ((c = fgetc (fp )) != EOF ) {
195
212
/* Check if we need to grow the buffer */
196
213
if (len + 1 >= capacity ) {
197
- size_t new_capacity = capacity * 2 ;
198
- char * new_line = flb_realloc (line , new_capacity );
214
+ new_capacity = capacity * 2 ;
215
+ new_line = flb_realloc (line , new_capacity );
199
216
if (!new_line ) {
200
217
flb_free (line );
201
218
return NULL ;
@@ -221,7 +238,7 @@ static char *read_line_dynamic(FILE *fp, size_t *line_length)
221
238
222
239
/* Null terminate the string */
223
240
if (len >= capacity ) {
224
- char * new_line = flb_realloc (line , len + 1 );
241
+ new_line = flb_realloc (line , len + 1 );
225
242
if (!new_line ) {
226
243
flb_free (line );
227
244
return NULL ;
@@ -246,6 +263,7 @@ static int load_csv(struct lookup_ctx *ctx)
246
263
{
247
264
FILE * fp ;
248
265
int line_num = 1 ;
266
+ int loaded_entries = 0 ; /* Track loaded entries count */
249
267
char * header_line ;
250
268
char * line ;
251
269
size_t line_length ;
@@ -362,6 +380,16 @@ static int load_csv(struct lookup_ctx *ctx)
362
380
p ++ ;
363
381
}
364
382
383
+ /* Check for unmatched quote after key parsing */
384
+ if (in_quotes ) {
385
+ flb_plg_error (ctx -> ins , "Unmatched opening quote in key at line %d, skipping malformed line" , line_num );
386
+ dynbuf_destroy (& key_buf );
387
+ dynbuf_destroy (& val_buf );
388
+ flb_free (line );
389
+ line_num ++ ;
390
+ goto next_line ;
391
+ }
392
+
365
393
/* Parse value from second column (handle quotes) */
366
394
in_quotes = 0 ;
367
395
while (* p && (field == 1 )) {
@@ -471,9 +499,30 @@ static int load_csv(struct lookup_ctx *ctx)
471
499
}
472
500
memcpy (val_heap , val_ptr , val_len );
473
501
val_heap [val_len ] = '\0' ;
502
+
503
+ /* Allocate and initialize val_node first to track allocated value for cleanup */
504
+ node = flb_malloc (sizeof (struct val_node ));
505
+ if (!node ) {
506
+ flb_free (val_heap );
507
+ flb_plg_warn (ctx -> ins , "Failed to allocate val_node for value cleanup, skipping" );
508
+ if (key_ptr_allocated ) flb_free (key_ptr );
509
+ if (val_ptr_allocated ) flb_free (val_ptr );
510
+ dynbuf_destroy (& key_buf );
511
+ dynbuf_destroy (& val_buf );
512
+ flb_free (line );
513
+ line_num ++ ;
514
+ continue ;
515
+ }
516
+ node -> val = val_heap ;
517
+ mk_list_add (& node -> _head , & ctx -> val_list );
518
+
519
+ /* Now add to hash table - if this fails, val_heap is still tracked in val_list */
474
520
ret = flb_hash_table_add (ctx -> ht , key_ptr , key_len , val_heap , val_len );
475
521
if (ret < 0 ) {
522
+ /* Remove from val_list and free the node since hash table add failed */
523
+ mk_list_del (& node -> _head );
476
524
flb_free (val_heap );
525
+ flb_free (node );
477
526
flb_plg_warn (ctx -> ins , "Failed to add key '%.*s' (duplicate or error), skipping" , (int )key_len , key_ptr );
478
527
if (key_ptr_allocated ) flb_free (key_ptr );
479
528
if (val_ptr_allocated ) flb_free (val_ptr );
@@ -483,15 +532,8 @@ static int load_csv(struct lookup_ctx *ctx)
483
532
line_num ++ ;
484
533
continue ;
485
534
}
486
- /* Track allocated value for later cleanup */
487
- node = flb_malloc (sizeof (struct val_node ));
488
- if (node ) {
489
- node -> val = val_heap ;
490
- mk_list_add (& node -> _head , & ctx -> val_list );
491
- } else {
492
- /* If malloc fails, value will leak, but plugin will still function */
493
- flb_plg_warn (ctx -> ins , "Failed to allocate val_node for value cleanup, value will leak" );
494
- }
535
+ /* Successfully loaded entry */
536
+ loaded_entries ++ ;
495
537
/* Do not free val_heap; hash table owns it now */
496
538
if (key_ptr_allocated ) flb_free (key_ptr );
497
539
if (val_ptr_allocated ) flb_free (val_ptr );
@@ -506,7 +548,7 @@ static int load_csv(struct lookup_ctx *ctx)
506
548
continue ;
507
549
}
508
550
fclose (fp );
509
- return 0 ;
551
+ return loaded_entries ; /* Return count of successfully loaded entries */
510
552
}
511
553
512
554
static int cb_lookup_init (struct flb_filter_instance * ins ,
@@ -534,16 +576,25 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
534
576
"fluentbit" , "filter" , "lookup_processed_records_total" ,
535
577
"Total number of processed records" ,
536
578
1 , labels_name );
579
+ if (!ctx -> cmt_processed ) {
580
+ flb_plg_warn (ins , "failed to create processed_records_total counter" );
581
+ }
537
582
538
583
ctx -> cmt_matched = cmt_counter_create (ins -> cmt ,
539
584
"fluentbit" , "filter" , "lookup_matched_records_total" ,
540
585
"Total number of matched records" ,
541
586
1 , labels_name );
587
+ if (!ctx -> cmt_matched ) {
588
+ flb_plg_warn (ins , "failed to create matched_records_total counter" );
589
+ }
542
590
543
591
ctx -> cmt_skipped = cmt_counter_create (ins -> cmt ,
544
592
"fluentbit" , "filter" , "lookup_skipped_records_total" ,
545
593
"Total number of skipped records due to errors" ,
546
594
1 , labels_name );
595
+ if (!ctx -> cmt_skipped ) {
596
+ flb_plg_warn (ins , "failed to create skipped_records_total counter" );
597
+ }
547
598
}
548
599
549
600
/* Add to old metrics system */
@@ -571,6 +622,9 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
571
622
goto error ;
572
623
}
573
624
625
+ /* Precompute result_key length for hot path optimization */
626
+ ctx -> result_key_len = strlen (ctx -> result_key );
627
+
574
628
/* Check file existence and readability */
575
629
#ifdef _WIN32
576
630
if (_access (ctx -> file , 04 ) != 0 ) { /* 04 = R_OK on Windows */
@@ -603,7 +657,7 @@ static int cb_lookup_init(struct flb_filter_instance *ins,
603
657
if (ret < 0 ) {
604
658
goto error ;
605
659
}
606
- flb_plg_info (ins , "Loaded %zu entries from CSV file '%s'" , ( size_t ) ctx -> ht -> total_count , ctx -> file );
660
+ flb_plg_info (ins , "Loaded %d entries from CSV file '%s'" , ret , ctx -> file );
607
661
flb_plg_info (ins , "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s" ,
608
662
ctx -> lookup_key , ctx -> result_key , ctx -> ignore_case ? "true" : "false" );
609
663
@@ -758,26 +812,35 @@ static int cb_lookup_filter(const void *data, size_t bytes,
758
812
/* First pass: determine required buffer size */
759
813
switch (rval -> type ) {
760
814
case FLB_RA_BOOL :
815
+ /* Check if this boolean was converted from a MAP type */
816
+ if (rval -> o .type == MSGPACK_OBJECT_MAP ) {
817
+ flb_plg_debug (ins , "Record %d: MAP type from record accessor, skipping conversion" , rec_num );
818
+ CLEANUP_DYNAMIC_BUFFERS ();
819
+ flb_ra_key_value_destroy (rval );
820
+ emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
821
+ continue ;
822
+ }
761
823
required_size = snprintf (NULL , 0 , "%s" , rval -> o .via .boolean ? "true" : "false" );
762
824
break ;
763
825
case FLB_RA_INT :
764
826
required_size = snprintf (NULL , 0 , "%" PRId64 , rval -> o .via .i64 );
765
827
break ;
766
828
case FLB_RA_FLOAT :
767
- required_size = snprintf (NULL , 0 , "%f " , rval -> o .via .f64 );
829
+ required_size = snprintf (NULL , 0 , "%.15g " , rval -> o .via .f64 );
768
830
break ;
769
831
case FLB_RA_NULL :
770
832
required_size = snprintf (NULL , 0 , "null" );
771
833
break ;
772
- case 5 :
773
- case 6 :
774
- flb_plg_debug (ins , "Record %d: complex type (ARRAY/MAP) from record accessor, skipping conversion" , rec_num );
775
- CLEANUP_DYNAMIC_BUFFERS ();
776
- flb_ra_key_value_destroy (rval );
777
- emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
778
- continue ;
779
834
default :
780
- flb_plg_debug (ins , "Record %d: unsupported type %d, skipping conversion" , rec_num , rval -> type );
835
+ /* Check for ARRAY type that might not be properly handled by RA */
836
+ if (rval -> o .type == MSGPACK_OBJECT_ARRAY ) {
837
+ flb_plg_debug (ins , "Record %d: ARRAY type from record accessor, skipping conversion" , rec_num );
838
+ CLEANUP_DYNAMIC_BUFFERS ();
839
+ flb_ra_key_value_destroy (rval );
840
+ emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
841
+ continue ;
842
+ }
843
+ flb_plg_debug (ins , "Record %d: unsupported type %d (msgpack type %d), skipping conversion" , rec_num , rval -> type , rval -> o .type );
781
844
CLEANUP_DYNAMIC_BUFFERS ();
782
845
flb_ra_key_value_destroy (rval );
783
846
emit_original_record (& log_encoder , & log_event , ins , ctx , rec_num );
@@ -807,13 +870,14 @@ static int cb_lookup_filter(const void *data, size_t bytes,
807
870
printed = 0 ;
808
871
switch (rval -> type ) {
809
872
case FLB_RA_BOOL :
873
+ /* Note: MAP types are converted to boolean, but we already handled them in first pass */
810
874
printed = snprintf (dynamic_val_buf , required_size + 1 , "%s" , rval -> o .via .boolean ? "true" : "false" );
811
875
break ;
812
876
case FLB_RA_INT :
813
877
printed = snprintf (dynamic_val_buf , required_size + 1 , "%" PRId64 , rval -> o .via .i64 );
814
878
break ;
815
879
case FLB_RA_FLOAT :
816
- printed = snprintf (dynamic_val_buf , required_size + 1 , "%f " , rval -> o .via .f64 );
880
+ printed = snprintf (dynamic_val_buf , required_size + 1 , "%.15g " , rval -> o .via .f64 );
817
881
break ;
818
882
case FLB_RA_NULL :
819
883
printed = snprintf (dynamic_val_buf , required_size + 1 , "null" );
@@ -921,8 +985,8 @@ static int cb_lookup_filter(const void *data, size_t bytes,
921
985
for (i = 0 ; i < log_event .body -> via .map .size ; i ++ ) {
922
986
msgpack_object_kv * kv = & log_event .body -> via .map .ptr [i ];
923
987
if (kv -> key .type == MSGPACK_OBJECT_STR &&
924
- kv -> key .via .str .size == strlen ( ctx -> result_key ) &&
925
- strncmp (kv -> key .via .str .ptr , ctx -> result_key , kv -> key . via . str . size ) == 0 ) {
988
+ kv -> key .via .str .size == ctx -> result_key_len &&
989
+ memcmp (kv -> key .via .str .ptr , ctx -> result_key , ctx -> result_key_len ) == 0 ) {
926
990
continue ;
927
991
}
928
992
ret = flb_log_event_encoder_append_body_values (& log_encoder ,
0 commit comments