From 76d0d4fd804ae30a197f0d7959e199cd36e63363 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 16:51:52 +0900 Subject: [PATCH 01/19] config: Add parameter for multiline limitation Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- include/fluent-bit/flb_config.h | 4 ++++ src/flb_config.c | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index b6090eea537..35365884a1b 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -145,6 +145,7 @@ struct flb_config { /* Multiline core parser definitions */ struct mk_list multiline_parsers; + size_t multiline_buffer_limit; /* limit for multiline concatenated data */ /* Outputs instances */ struct mk_list outputs; /* list of output plugins */ @@ -408,6 +409,9 @@ enum conf_type { /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" +/* Multiline */ +#define FLB_CONF_STR_MULTILINE_BUFFER_LIMIT "multiline_buffer_limit" + /* Scheduler */ #define FLB_CONF_STR_SCHED_CAP "scheduler.cap" #define FLB_CONF_STR_SCHED_BASE "scheduler.base" diff --git a/src/flb_config.c b/src/flb_config.c index 407464b4942..567848f7633 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -169,6 +169,10 @@ struct flb_service_config service_configs[] = { FLB_CONF_TYPE_INT, offsetof(struct flb_config, coro_stack_size)}, + {FLB_CONF_STR_MULTILINE_BUFFER_LIMIT, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, multiline_buffer_limit)}, + /* Scheduler */ {FLB_CONF_STR_SCHED_CAP, FLB_CONF_TYPE_INT, @@ -361,6 +365,7 @@ struct flb_config *flb_config_init() * on we use flb_config_exit to cleanup the config, which requires * the config->multiline_parsers list to be initialized. */ mk_list_init(&config->multiline_parsers); + config->multiline_buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; /* Task map */ ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE); From 503cdbdf3a86e4be5e8920b13888e4423a74b9e6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 17:27:54 +0900 Subject: [PATCH 02/19] ml: group: stream: Prepare to handle limitations of multiline Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- include/fluent-bit/multiline/flb_ml.h | 13 ++++++++ include/fluent-bit/multiline/flb_ml_group.h | 8 +++++ src/multiline/flb_ml_group.c | 34 +++++++++++++++++++++ src/multiline/flb_ml_stream.c | 3 ++ 4 files changed, 58 insertions(+) diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index baa87bdb8e0..94cf2e8fd0b 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -52,6 +52,13 @@ /* Default multiline buffer size: 4Kb */ #define FLB_ML_BUF_SIZE 1024*4 +/* Default limit for concatenated multiline messages: 2MB */ +#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2) + +/* Return codes */ +#define FLB_MULTILINE_OK 0 +#define FLB_MULTILINE_TRUNCATED 1 + /* Maximum number of groups per stream */ #define FLB_ML_MAX_GROUPS 6 @@ -107,6 +114,9 @@ struct flb_ml_stream_group { msgpack_packer mp_pck; /* temporary msgpack packer */ struct flb_time mp_time; /* multiline time parsed from first line */ + /* parent stream reference */ + struct flb_ml_stream *stream; + struct mk_list _head; }; @@ -275,6 +285,9 @@ struct flb_ml { struct flb_log_event_encoder log_event_encoder; struct flb_log_event_decoder log_event_decoder; struct flb_config *config; /* Fluent Bit context */ + + /* Limit for concatenated multiline messages */ + size_t buffer_limit; }; struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name); diff --git a/include/fluent-bit/multiline/flb_ml_group.h b/include/fluent-bit/multiline/flb_ml_group.h index d035160884e..7a19bb7bbf4 100644 --- a/include/fluent-bit/multiline/flb_ml_group.h +++ b/include/fluent-bit/multiline/flb_ml_group.h @@ -28,4 +28,12 @@ struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml); void flb_ml_group_destroy(struct flb_ml_group *group); int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p); +/* + * Append data to a multiline stream group respecting the configured + * buffer limit. The length of the appended data might be reduced if + * the limit is reached. + */ +int flb_ml_group_cat(struct flb_ml_stream_group *group, + const char *data, size_t len); + #endif diff --git a/src/multiline/flb_ml_group.c b/src/multiline/flb_ml_group.c index d12caac0ba6..39779115da6 100644 --- a/src/multiline/flb_ml_group.c +++ b/src/multiline/flb_ml_group.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -84,3 +85,36 @@ void flb_ml_group_destroy(struct flb_ml_group *group) mk_list_del(&group->_head); flb_free(group); } + +int flb_ml_group_cat(struct flb_ml_stream_group *group, + const char *data, size_t len) +{ + size_t avail; + size_t limit; + int ret; + int status = FLB_MULTILINE_OK; + + limit = group->stream->ml->buffer_limit; + if (limit > 0) { + if (flb_sds_len(group->buf) >= limit) { + return FLB_MULTILINE_TRUNCATED; + } + + avail = limit - flb_sds_len(group->buf); + if (len > avail) { + len = avail; + status = FLB_MULTILINE_TRUNCATED; + } + } + + if (len == 0) { + return status; + } + + ret = flb_sds_cat_safe(&group->buf, data, len); + if (ret == -1) { + return -1; + } + + return status; +} diff --git a/src/multiline/flb_ml_stream.c b/src/multiline/flb_ml_stream.c index ded0cba8e43..615a0123481 100644 --- a/src/multiline/flb_ml_stream.c +++ b/src/multiline/flb_ml_stream.c @@ -79,6 +79,9 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst msgpack_sbuffer_init(&group->mp_sbuf); msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write); + /* parent stream reference */ + group->stream = mst; + mk_list_add(&group->_head, &mst->groups); return group; From 062bfa3f0045b7708d8af28ae9eeec94c929bcd4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 17:54:28 +0900 Subject: [PATCH 03/19] ml: Process truncations Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- src/multiline/flb_ml.c | 372 ++++++++++++++++------------------------- 1 file changed, 147 insertions(+), 225 deletions(-) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 99e3d126a46..5212cb52f2c 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -410,10 +410,8 @@ static int process_append(struct flb_ml_parser_ins *parser_i, /* Lookup the key */ if (type == FLB_ML_TYPE_TEXT) { ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL); - if (ret == FLB_FALSE) { - return -1; - } - return 0; + /* Return the raw status of text type of result. */ + return ret; } else if (type == FLB_ML_TYPE_MAP) { full_map = obj; @@ -599,9 +597,12 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser, tm, buf, size, map, &out_buf, &out_size, &release, &out_time); - if (ret < 0) { - return -1; - } + /* + * Do not return -1 here. If the sub-parser fails, we should + * still attempt to process the raw text with multiline rules. + * The 'ret' variable is not used beyond this point, so we can + * safely ignore a failure here and let the multiline rules decide. + */ break; case FLB_ML_TYPE_MAP: ret = ml_append_try_parser_type_map(parser, stream_id, &type, @@ -649,7 +650,7 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser, flb_free(out_buf); } - return 0; + return ret; } int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, @@ -657,101 +658,93 @@ int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, { int ret; int processed = FLB_FALSE; + int status = FLB_MULTILINE_OK; struct mk_list *head; struct mk_list *head_group; - struct flb_ml_group *group; - struct flb_ml_stream *mst; + struct flb_ml_group *group = NULL; struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; - struct flb_time out_time; + struct flb_ml_parser_ins *parser_i = NULL; + struct flb_ml_stream *mst; struct flb_ml_stream_group *st_group; - int type; - - type = FLB_ML_TYPE_TEXT; - - flb_time_zero(&out_time); + int type = FLB_ML_TYPE_TEXT; mk_list_foreach(head, &ml->groups) { group = mk_list_entry(head, struct flb_ml_group, _head); - /* Check if the incoming data matches the last recently used parser */ lru_parser = group->lru_parser; - if (lru_parser && lru_parser->last_stream_id == stream_id) { ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, tm, buf, size, NULL, NULL); - if (ret == 0) { + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); + goto done; /* Use goto to break out of nested loops */ } } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } } - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && lru_parser == parser_i && - lru_parser->last_stream_id == stream_id) { - continue; - } + if (!processed) { + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + lru_parser = group->lru_parser; + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && lru_parser == parser_i && + lru_parser->last_stream_id == stream_id) { + continue; + } - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, buf, size, NULL, NULL); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, buf, size, NULL, NULL); + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + processed = FLB_TRUE; + goto done; + } } + } } +done: if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); + /* A non-matching line breaks any multiline sequence. Flush all pending data. */ + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + } } - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + /* Now process the current line as a standalone message. */ + group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); mst = flb_ml_stream_get(parser_i, stream_id); if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); return -1; } - /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - flb_sds_cat_safe(&st_group->buf, buf, size); + flb_ml_register_context(st_group, tm, NULL); + ret = flb_ml_group_cat(st_group, buf, size); + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); } - return 0; + return status; } - - int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id, struct flb_time *tm, @@ -759,176 +752,98 @@ int flb_ml_append_object(struct flb_ml *ml, msgpack_object *obj) { int ret; - int type; int processed = FLB_FALSE; + int status = FLB_MULTILINE_OK; struct mk_list *head; struct mk_list *head_group; - struct flb_ml_group *group; + struct flb_ml_group *group = NULL; struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; + struct flb_ml_parser_ins *parser_i = NULL; struct flb_ml_stream *mst; struct flb_ml_stream_group *st_group; - struct flb_log_event event; + int type; if (metadata == NULL) { metadata = ml->log_event_decoder.empty_map; } - /* - * As incoming objects, we accept packed events - * and msgpack Maps containing key/value pairs. - */ - if (obj->type == MSGPACK_OBJECT_ARRAY) { + if (obj->type != MSGPACK_OBJECT_MAP) { flb_error("[multiline] appending object with invalid type, expected " "map, received type=%i", obj->type); return -1; - - - flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0); - - ret = flb_event_decoder_decode_object(&ml->log_event_decoder, - &event, - obj); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_error("[multiline] invalid event object"); - - return -1; - } - - tm = &event.timestamp; - obj = event.body; - metadata = event.metadata; - - type = FLB_ML_TYPE_MAP; - } - else if (obj->type == MSGPACK_OBJECT_MAP) { - type = FLB_ML_TYPE_MAP; - } - else { - flb_error("[multiline] appending object with invalid type, expected " - "array or map, received type=%i", obj->type); - return -1; } + type = FLB_ML_TYPE_MAP; mk_list_foreach(head, &ml->groups) { group = mk_list_entry(head, struct flb_ml_group, _head); - /* Check if the incoming data matches the last recently used parser */ lru_parser = group->lru_parser; - if (lru_parser && lru_parser->last_stream_id == stream_id) { ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, tm, NULL, 0, metadata, obj); - if (ret == 0) { + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); + goto done; } } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } } - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && parser_i == lru_parser) { - continue; - } + if (!processed) { + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + lru_parser = group->lru_parser; + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && parser_i == lru_parser && + lru_parser->last_stream_id == stream_id) { + continue; + } - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, NULL, 0, metadata, obj); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, NULL, 0, metadata, obj); + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + processed = FLB_TRUE; + goto done; + } + } } } +done: if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + } } - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); + mst = flb_ml_stream_get(parser_i, stream_id); if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); - return -1; } - /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - - ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ml->log_event_encoder, tm); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (metadata != ml->log_event_decoder.empty_map) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - &ml->log_event_encoder, metadata); - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ml->log_event_encoder, obj); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - mst->cb_flush(parser_i->ml_parser, - mst, - mst->cb_data, - ml->log_event_encoder.output_buffer, - ml->log_event_encoder.output_length); - } - else { - flb_error("[multiline] log event encoder error : %d", ret); - } - - flb_log_event_encoder_reset(&ml->log_event_encoder); - - /* reset group buffer counters */ - st_group->mp_sbuf.size = 0; - flb_sds_len_set(st_group->buf, 0); - - /* Update last flush time */ - st_group->last_flush = time_ms_now(); + flb_ml_register_context(st_group, tm, obj); + flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); } - return 0; + return status; } int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, @@ -1412,38 +1327,45 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, return -1; } - /* Take the first line keys and repack */ - len = flb_sds_len(parser_i->key_content); - size = map.via.map.size; - msgpack_pack_map(&mp_pck, size); - - for (i = 0; i < size; i++) { - k = map.via.map.ptr[i].key; - v = map.via.map.ptr[i].val; - - /* - * Check if the current key is the key that will contain the - * concatenated multiline buffer - */ - if (k.type == MSGPACK_OBJECT_STR && - parser_i->key_content && - k.via.str.size == len && - strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { - - /* key */ - msgpack_pack_object(&mp_pck, k); - - /* value */ - len = flb_sds_len(group->buf); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, group->buf, len); - } - else { - /* key / val */ - msgpack_pack_object(&mp_pck, k); - msgpack_pack_object(&mp_pck, v); + if (flb_sds_len(group->buf) > 0) { + /* Take the first line keys and repack */ + len = flb_sds_len(parser_i->key_content); + size = map.via.map.size; + msgpack_pack_map(&mp_pck, size); + + for (i = 0; i < size; i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + /* + * Check if the current key is the key that will contain the + * concatenated multiline buffer + */ + if (k.type == MSGPACK_OBJECT_STR && + parser_i->key_content && + k.via.str.size == len && + strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { + + /* key */ + msgpack_pack_object(&mp_pck, k); + + /* value */ + len = flb_sds_len(group->buf); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, group->buf, len); + } + else { + /* key / val */ + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } } } + else { + /* The buffer is empty, so just pack the original map from the context */ + msgpack_pack_object(&mp_pck, map); + } + msgpack_unpacked_destroy(&result); group->mp_sbuf.size = 0; group->mp_md_sbuf.size = 0; From c5e5b51aa41585c7dc14c709b7d21f298de89049 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 18:39:30 +0900 Subject: [PATCH 04/19] filter_multiline: Handle truncations due to exceeded limits Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- plugins/filter_multiline/ml.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 41559296d05..c245d95ffae 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -820,7 +820,11 @@ static int cb_ml_filter(const void *data, size_t bytes, FLB_EVENT_DECODER_SUCCESS) { ret = flb_ml_append_event(ctx->m, ctx->stream_id, &event); - if (ret != 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, + "multiline message truncated due to buffer limit"); + } + else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } @@ -871,7 +875,11 @@ static int cb_ml_filter(const void *data, size_t bytes, FLB_EVENT_DECODER_SUCCESS) { ret = flb_ml_append_event(ctx->m, stream->stream_id, &event); - if (ret != 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, + "multiline message truncated due to buffer limit"); + } + else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } From a2804ec7fb4597c089d87b6079e895003a2d3410 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 18:40:01 +0900 Subject: [PATCH 05/19] in_tail: Handle truncations due to exceeded limits Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- plugins/in_tail/tail_file.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1d6ba3f95fe..5fc828317e5 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -563,6 +563,9 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) &out_time, line, line_len); + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit"); + } goto go_next; } else if (ctx->docker_mode) { From b71f25e59bc2da35d422ef75fd5911703a7053fe Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 1 Aug 2025 14:18:07 +0900 Subject: [PATCH 06/19] ml: rule: Apply limit of group during regex handling Signed-off-by: Hiroshi Hatake --- src/multiline/flb_ml_rule.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/multiline/flb_ml_rule.c b/src/multiline/flb_ml_rule.c index 8ee0de5a946..62c18e08a1e 100644 --- a/src/multiline/flb_ml_rule.c +++ b/src/multiline/flb_ml_rule.c @@ -23,6 +23,7 @@ #include #include +#include struct to_state { struct flb_ml_rule *rule; @@ -379,14 +380,18 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, flb_sds_cat_safe(&group->buf, "\n", 1); } else { - flb_sds_cat_safe(&group->buf, buf_data, buf_size); + ret = flb_ml_group_cat(group, buf_data, buf_size); + if (ret == FLB_MULTILINE_TRUNCATED) { + rule = st->rule; + group->rule_to_state = rule; + return ret; + } } rule = st->rule; break; } rule = NULL; } - } if (!rule) { @@ -402,7 +407,10 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, group->rule_to_state = rule; /* concatenate the data */ - flb_sds_cat_safe(&group->buf, buf_data, buf_size); + ret = flb_ml_group_cat(group, buf_data, buf_size); + if (ret == FLB_MULTILINE_TRUNCATED) { + return ret; + } /* Copy full map content in stream buffer */ flb_ml_register_context(group, tm, full_map); From 413229265be9746a9d460098552e4afe730002e7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jul 2025 10:51:42 +0900 Subject: [PATCH 07/19] ml: group: stream: Add multiline_truncated: true metadata for truncated records Signed-off-by: Hiroshi Hatake --- include/fluent-bit/multiline/flb_ml.h | 1 + src/multiline/flb_ml.c | 9 +++++++++ src/multiline/flb_ml_group.c | 2 ++ src/multiline/flb_ml_stream.c | 1 + 4 files changed, 13 insertions(+) diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index 94cf2e8fd0b..7ac574bd5d0 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -113,6 +113,7 @@ struct flb_ml_stream_group { msgpack_sbuffer mp_sbuf; /* temporary msgpack buffer */ msgpack_packer mp_pck; /* temporary msgpack packer */ struct flb_time mp_time; /* multiline time parsed from first line */ + int truncated; /* was the buffer truncated? */ /* parent stream reference */ struct flb_ml_stream *stream; diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 5212cb52f2c..4edc6b2760f 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -1419,6 +1419,14 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, FLB_TRUE); } + /* If the buffer was truncated, append the marker to the metadata */ + if (ret == FLB_EVENT_ENCODER_SUCCESS && group->truncated) { + ret = flb_log_event_encoder_append_metadata_values( + &mst->ml->log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("multiline_truncated"), + FLB_LOG_EVENT_BOOLEAN_VALUE(FLB_TRUE)); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_set_body_from_raw_msgpack( &mst->ml->log_event_encoder, @@ -1457,6 +1465,7 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, msgpack_sbuffer_destroy(&mp_sbuf); flb_sds_len_set(group->buf, 0); + group->truncated = FLB_FALSE; /* Update last flush time */ group->last_flush = time_ms_now(); diff --git a/src/multiline/flb_ml_group.c b/src/multiline/flb_ml_group.c index 39779115da6..67481771945 100644 --- a/src/multiline/flb_ml_group.c +++ b/src/multiline/flb_ml_group.c @@ -97,12 +97,14 @@ int flb_ml_group_cat(struct flb_ml_stream_group *group, limit = group->stream->ml->buffer_limit; if (limit > 0) { if (flb_sds_len(group->buf) >= limit) { + group->truncated = FLB_TRUE; return FLB_MULTILINE_TRUNCATED; } avail = limit - flb_sds_len(group->buf); if (len > avail) { len = avail; + group->truncated = FLB_TRUE; status = FLB_MULTILINE_TRUNCATED; } } diff --git a/src/multiline/flb_ml_stream.c b/src/multiline/flb_ml_stream.c index 615a0123481..b263e6f6e0a 100644 --- a/src/multiline/flb_ml_stream.c +++ b/src/multiline/flb_ml_stream.c @@ -79,6 +79,7 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst msgpack_sbuffer_init(&group->mp_sbuf); msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write); + group->truncated = FLB_FALSE; /* parent stream reference */ group->stream = mst; From 5aa2e61375fe953e9cf4a7ba7dc9acd8da912721 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 1 Aug 2025 15:48:08 +0900 Subject: [PATCH 08/19] ml: Refer the configured limitation of multiline Signed-off-by: Hiroshi Hatake --- src/multiline/flb_ml.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 4edc6b2760f..1315557dd1b 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -874,6 +874,7 @@ struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) } ml->config = ctx; + ml->buffer_limit = ml->config->multiline_buffer_limit; ml->last_flush = time_ms_now(); mk_list_init(&ml->groups); From a72b13f09972453bba96170401c040709916d5e3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 1 Aug 2025 16:03:15 +0900 Subject: [PATCH 09/19] ml: rule: Flush immediately when truncated Signed-off-by: Hiroshi Hatake --- src/multiline/flb_ml_rule.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/multiline/flb_ml_rule.c b/src/multiline/flb_ml_rule.c index 62c18e08a1e..1374c8fc6dc 100644 --- a/src/multiline/flb_ml_rule.c +++ b/src/multiline/flb_ml_rule.c @@ -382,8 +382,11 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, else { ret = flb_ml_group_cat(group, buf_data, buf_size); if (ret == FLB_MULTILINE_TRUNCATED) { - rule = st->rule; - group->rule_to_state = rule; + /* Buffer is full. Flush immediately to send the truncated record. */ + flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); + + /* Reset state so no more lines are appended to this record. */ + group->rule_to_state = NULL; return ret; } } From 65a2793a75d0d274641b8ab3e6f357b128eecd8e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 1 Aug 2025 19:01:00 +0900 Subject: [PATCH 10/19] ml: Avoid to use the collided status code 1 is also indicated for FLB_TRUE. Signed-off-by: Hiroshi Hatake --- include/fluent-bit/multiline/flb_ml.h | 2 +- src/multiline/flb_ml.c | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index 7ac574bd5d0..50379a9059a 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -57,7 +57,7 @@ /* Return codes */ #define FLB_MULTILINE_OK 0 -#define FLB_MULTILINE_TRUNCATED 1 +#define FLB_MULTILINE_TRUNCATED 2 /* Maximum number of groups per stream */ #define FLB_ML_MAX_GROUPS 6 diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 1315557dd1b..87411446295 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -213,6 +213,7 @@ static int package_content(struct flb_ml_stream *mst, { int len; int ret; + int truncated = FLB_FALSE; int rule_match = FLB_FALSE; int processed = FLB_FALSE; int type; @@ -259,11 +260,13 @@ static int package_content(struct flb_ml_stream *mst, stream_group, full_map, buf, size, tm, val_content, val_pattern); if (ret == -1) { - processed = FLB_FALSE; + return -1; } - else { - processed = FLB_TRUE; + + if (ret == FLB_MULTILINE_TRUNCATED) { + truncated = FLB_TRUE; } + processed = FLB_TRUE; } else if (type == FLB_ML_ENDSWITH) { len = flb_sds_len(parser->match_str); @@ -340,6 +343,10 @@ static int package_content(struct flb_ml_stream *mst, msgpack_pack_object(&stream_group->mp_md_pck, *metadata); } + if (truncated) { + return FLB_MULTILINE_TRUNCATED; + } + return processed; } From e40c50b96827dc00c0182b70d0c251d5eed963fc Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 18:40:22 +0900 Subject: [PATCH 11/19] ml: tests: Process truncated case and follow the interface change This commit updates the expected output for the 'container_mix' unit test. Previously, the multiline engine could incorrectly merge pending messages when the log stream switched between different parser types (e.g., from `docker` to `cri`). The test's original expectations were written to match this buggy behavior. Recent fixes have made the engine's state handling more robust and precise. It now correctly flushes a pending message when the parser context changes, preventing improper merges. This change aligns the test case with the new, correct logic. Signed-off-by: Hiroshi Hatake Co-authored-by: Eduardo Silva --- tests/internal/multiline.c | 40 +++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index e175e1171aa..23afc4d3263 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -111,12 +111,11 @@ struct record_check container_mix_output[] = { {"a1\n"}, {"a2\n"}, {"ddee\n"}, - {"bbcc"}, + {"bbccdd-out\n"}, + {"dd-err\n"}, {"single full"}, {"1a. some multiline log"}, {"1b. some multiline log"}, - {"dd-out\n"}, - {"dd-err\n"}, }; /* Java stacktrace detection */ @@ -393,6 +392,10 @@ static int flush_callback(struct flb_ml_parser *parser, fprintf(stdout, "%s----------- EOF -----------%s\n", ANSI_YELLOW, ANSI_RESET); + if (!res) { + return 0; + } + /* Validate content */ msgpack_unpacked_init(&result); off = 0; @@ -1457,6 +1460,36 @@ static void test_issue_5504() #endif } +static void test_buffer_limit_truncation() +{ + int ret; + uint64_t stream_id; + struct flb_config *config; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct flb_time tm; + + config = flb_config_init(); + config->multiline_buffer_limit = 32; + + ml = flb_ml_create(config, "limit-test"); + TEST_CHECK(ml != NULL); + + mlp_i = flb_ml_parser_instance_create(ml, "docker"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "test", -1, flush_callback, NULL, &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + ret = flb_ml_append_text(ml, stream_id, &tm, + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", 36); + TEST_CHECK(ret == FLB_MULTILINE_TRUNCATED); + + flb_ml_destroy(ml); + flb_config_exit(config); +} + TEST_LIST = { /* Normal features tests */ { "parser_docker", test_parser_docker}, @@ -1468,6 +1501,7 @@ TEST_LIST = { { "parser_go", test_parser_go}, { "container_mix", test_container_mix}, { "endswith", test_endswith}, + { "buffer_limit_truncation", test_buffer_limit_truncation}, /* Issues reported on Github */ { "issue_3817_1" , test_issue_3817_1}, From dec20f19d103699c7a6a45ac4490305888e14aa8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Jul 2025 18:40:22 +0900 Subject: [PATCH 12/19] ml: tests: Detect truncation ocurrence in multiline testcase. Signed-off-by: Hiroshi Hatake --- include/fluent-bit/multiline/flb_ml.h | 1 + src/multiline/flb_ml.c | 2 +- src/multiline/flb_ml_rule.c | 2 -- tests/internal/multiline.c | 47 ++++++++++++++++++++++++--- 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index 50379a9059a..0e974722ae5 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -57,6 +57,7 @@ /* Return codes */ #define FLB_MULTILINE_OK 0 +#define FLB_MULTILINE_PROCESSED 1 /* Reserved */ #define FLB_MULTILINE_TRUNCATED 2 /* Maximum number of groups per stream */ diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 87411446295..1ac680eacda 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -491,7 +491,7 @@ static int process_append(struct flb_ml_parser_ins *parser_i, if (ret == FLB_FALSE) { return -1; } - return 0; + return ret; } static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser, diff --git a/src/multiline/flb_ml_rule.c b/src/multiline/flb_ml_rule.c index 1374c8fc6dc..ceca40a4070 100644 --- a/src/multiline/flb_ml_rule.c +++ b/src/multiline/flb_ml_rule.c @@ -417,8 +417,6 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, /* Copy full map content in stream buffer */ flb_ml_register_context(group, tm, full_map); - - return 0; } } diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index 23afc4d3263..368fa04af60 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1466,24 +1466,63 @@ static void test_buffer_limit_truncation() uint64_t stream_id; struct flb_config *config; struct flb_ml *ml; + struct flb_ml_parser *mlp; struct flb_ml_parser_ins *mlp_i; + struct flb_parser *p; struct flb_time tm; + /* + * A realistic Docker log where the content of the "log" field will be + * concatenated, and that concatenated buffer is what should be truncated. + */ + char *line1 = "{\"log\": \"12345678901234567890\", \"stream\": \"stdout\"}"; + char *line2 = "{\"log\": \"abcdefghijklmnopqrstuvwxyz\", \"stream\": \"stdout\"}"; + config = flb_config_init(); - config->multiline_buffer_limit = 32; + /* The buffer limit is for the concatenated 'log' content, not the full JSON */ + config->multiline_buffer_limit = 80; + + /* Use the dummy 'docker' parser for JSON extraction */ + p = flb_parser_get("docker", config); + /* This parser will trigger on any content, ensuring concatenation. */ ml = flb_ml_create(config, "limit-test"); TEST_CHECK(ml != NULL); - mlp_i = flb_ml_parser_instance_create(ml, "docker"); + mlp = flb_ml_parser_create(config, "test-concat", FLB_ML_REGEX, + NULL, FLB_FALSE, 1000, + "log", NULL, NULL, + p, "docker"); + TEST_CHECK(mlp != NULL); + + /* Define rules that will always match the test data */ + ret = flb_ml_rule_create(mlp, "start_state", "/./", "cont", NULL); + TEST_CHECK(ret == 0); + ret = flb_ml_rule_create(mlp, "cont", "/./", "cont", NULL); + TEST_CHECK(ret == 0); + + /* Finalize parser initialization */ + ret = flb_ml_parser_init(mlp); + TEST_CHECK(ret == 0); + + mlp_i = flb_ml_parser_instance_create(ml, "test-concat"); TEST_CHECK(mlp_i != NULL); ret = flb_ml_stream_create(ml, "test", -1, flush_callback, NULL, &stream_id); TEST_CHECK(ret == 0); flb_time_get(&tm); - ret = flb_ml_append_text(ml, stream_id, &tm, - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", 36); + + /* Append the first line. It will match the 'start_state' and start a block. */ + ret = flb_ml_append_text(ml, stream_id, &tm, line1, strlen(line1)); + TEST_CHECK(ret == FLB_MULTILINE_OK); + + /* + * Append the second line. This will match the 'cont' state and concatenate. + * The concatenation will exceed the limit + * and correctly trigger the truncation logic. + */ + ret = flb_ml_append_text(ml, stream_id, &tm, line2, strlen(line2)); TEST_CHECK(ret == FLB_MULTILINE_TRUNCATED); flb_ml_destroy(ml); From e98ce4ae79852e2eb6acd596a4710a0aa674417e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 20:58:28 +0900 Subject: [PATCH 13/19] in_tail: Add metrics for number of occurrence of truncations on multiline Signed-off-by: Hiroshi Hatake --- plugins/in_tail/tail_config.c | 8 ++++++++ plugins/in_tail/tail_config.h | 2 ++ plugins/in_tail/tail_file.c | 13 +++++++++++++ 3 files changed, 23 insertions(+) diff --git a/plugins/in_tail/tail_config.c b/plugins/in_tail/tail_config.c index f06e1c3ec63..929834b782e 100644 --- a/plugins/in_tail/tail_config.c +++ b/plugins/in_tail/tail_config.c @@ -479,6 +479,12 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "Total number of rotated files", 1, (char *[]) {"name"}); + ctx->cmt_multiline_truncated = \ + cmt_counter_create(ins->cmt, + "fluentbit", "input", + "multiline_truncated_total", + "Total number of truncated occurences for multilines", + 1, (char *[]) {"name"}); /* OLD metrics */ flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, "files_opened", ctx->ins->metrics); @@ -486,6 +492,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "files_closed", ctx->ins->metrics); flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED, "files_rotated", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED, + "multiline_truncated", ctx->ins->metrics); #endif return ctx; diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index 326dba870a4..dfd5f919354 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -41,6 +41,7 @@ #define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */ #define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */ #define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */ +#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */ #endif struct flb_tail_config { @@ -167,6 +168,7 @@ struct flb_tail_config { struct cmt_counter *cmt_files_opened; struct cmt_counter *cmt_files_closed; struct cmt_counter *cmt_files_rotated; + struct cmt_counter *cmt_multiline_truncated; /* Hash: hash tables for quick acess to registered files */ struct flb_hash_table *static_hash; diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 5fc828317e5..6d222d3814c 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -447,6 +447,11 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) #ifdef FLB_HAVE_UNICODE_ENCODER size_t decoded_len; #endif +#ifdef FLB_HAVE_METRICS + uint64_t ts; + char *name; +#endif + ctx = (struct flb_tail_config *) file->config; @@ -565,6 +570,14 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) line_len); if (ret == FLB_MULTILINE_TRUNCATED) { flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_multiline_truncated, ts, 1, (char *[]) {name}); + + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_M_TRUNCATED, 1, ctx->ins->metrics); +#endif } goto go_next; } From a0b62998fa938179af6997e49d8ce5375057d9a7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 21:08:00 +0900 Subject: [PATCH 14/19] filter_multiline: Add metrics for emissions and truncations Signed-off-by: Hiroshi Hatake --- plugins/filter_multiline/ml.c | 46 +++++++++++++++++++++++++++++++++++ plugins/filter_multiline/ml.h | 4 ++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index c245d95ffae..8b3a4bcdca3 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -369,10 +369,16 @@ static int cb_ml_init(struct flb_filter_instance *ins, "fluentbit", "filter", "emit_records_total", "Total number of emitted records", 1, (char *[]) {"name"}); + ctx->cmt_truncated = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "emit_truncated_total", + "Total number of truncated occurence of multiline", + 1, (char *[]) {"name"}); /* OLD api */ flb_metrics_add(FLB_MULTILINE_METRIC_EMITTED, "emit_records", ctx->ins->metrics); + flb_metrics_add(FLB_MULTILINE_METRIC_TRUNCATED, + "emit_truncated", ctx->ins->metrics); #endif } @@ -780,6 +786,10 @@ static int cb_ml_filter(const void *data, size_t bytes, struct flb_log_event event; int ret; struct ml_ctx *ctx; +#ifdef FLB_HAVE_METRICS + uint64_t ts; + char *name; +#endif (void) f_ins; (void) config; @@ -823,11 +833,29 @@ static int cb_ml_filter(const void *data, size_t bytes, if (ret == FLB_MULTILINE_TRUNCATED) { flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); +#endif } else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } + else if (ret == FLB_MULTILINE_OK) { +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); +#endif + } } flb_log_event_decoder_destroy(&decoder); @@ -878,11 +906,29 @@ static int cb_ml_filter(const void *data, size_t bytes, if (ret == FLB_MULTILINE_TRUNCATED) { flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); +#endif } else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } + else if (ret == FLB_MULTILINE_OK) { +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); +#endif + } } flb_log_event_decoder_destroy(&decoder); diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h index cae8fb64166..3bcb8d4eca8 100644 --- a/plugins/filter_multiline/ml.h +++ b/plugins/filter_multiline/ml.h @@ -25,10 +25,11 @@ #define FLB_MULTILINE_MEM_BUF_LIMIT_DEFAULT "10M" #define FLB_MULTILINE_METRIC_EMITTED 200 +#define FLB_MULTILINE_METRIC_TRUNCATED 201 #define FLB_MULTILINE_MODE_PARTIAL_MESSAGE "partial_message" #define FLB_MULTILINE_MODE_PARSER "parser" -/* +/* * input instance + tag is the unique identifier * for a multiline stream * TODO: implement clean up of streams that haven't been used recently @@ -77,6 +78,7 @@ struct ml_ctx { #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; + struct cmt_counter *cmt_truncated; #endif }; From b08d69e00d441d4c235915d2515a78ec6e5dc4b5 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 22:58:33 +0900 Subject: [PATCH 15/19] config: ml: Handle SI prefixes on the buffer limit of multiline Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_config.h | 2 +- include/fluent-bit/multiline/flb_ml.h | 3 ++- src/flb_config.c | 4 ++-- src/multiline/flb_ml.c | 10 +++++++++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 35365884a1b..05ac136cff7 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -145,7 +145,7 @@ struct flb_config { /* Multiline core parser definitions */ struct mk_list multiline_parsers; - size_t multiline_buffer_limit; /* limit for multiline concatenated data */ + char *multiline_buffer_limit; /* limit for multiline concatenated data */ /* Outputs instances */ struct mk_list outputs; /* list of output plugins */ diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index 0e974722ae5..c27f9e1580b 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -53,7 +53,8 @@ #define FLB_ML_BUF_SIZE 1024*4 /* Default limit for concatenated multiline messages: 2MB */ -#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2) +#define FLB_ML_BUFFER_LIMIT_DEFAULT_STR "2MB" +#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2) /* Return codes */ #define FLB_MULTILINE_OK 0 diff --git a/src/flb_config.c b/src/flb_config.c index 567848f7633..1cf509b4ee4 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -170,7 +170,7 @@ struct flb_service_config service_configs[] = { offsetof(struct flb_config, coro_stack_size)}, {FLB_CONF_STR_MULTILINE_BUFFER_LIMIT, - FLB_CONF_TYPE_INT, + FLB_CONF_TYPE_STR, offsetof(struct flb_config, multiline_buffer_limit)}, /* Scheduler */ @@ -365,7 +365,7 @@ struct flb_config *flb_config_init() * on we use flb_config_exit to cleanup the config, which requires * the config->multiline_parsers list to be initialized. */ mk_list_init(&config->multiline_parsers); - config->multiline_buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; + config->multiline_buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT_STR; /* Task map */ ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE); diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 1ac680eacda..f26b26edef1 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -867,6 +868,7 @@ int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) { int result; + size_t limit = 0; struct flb_ml *ml; ml = flb_calloc(1, sizeof(struct flb_ml)); @@ -881,7 +883,13 @@ struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) } ml->config = ctx; - ml->buffer_limit = ml->config->multiline_buffer_limit; + limit = flb_utils_size_to_bytes(ml->config->multiline_buffer_limit); + if (limit > 0) { + ml->buffer_limit = (size_t)limit; + } + else { + ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; + } ml->last_flush = time_ms_now(); mk_list_init(&ml->groups); From 5b60a7291020c305f4d3b98d1288034f7e6969b7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 22:59:58 +0900 Subject: [PATCH 16/19] ml: tests: Follow the type change for the buffer limit Signed-off-by: Hiroshi Hatake --- tests/internal/multiline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index 368fa04af60..28e4efb08fd 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1480,7 +1480,7 @@ static void test_buffer_limit_truncation() config = flb_config_init(); /* The buffer limit is for the concatenated 'log' content, not the full JSON */ - config->multiline_buffer_limit = 80; + config->multiline_buffer_limit = "80"; /* Use the dummy 'docker' parser for JSON extraction */ p = flb_parser_get("docker", config); From 5e9152438e6afb9bdec05e3e1de6e5a7aa81b5e3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 23:26:05 +0900 Subject: [PATCH 17/19] utils: tests: Implement binary bytes conversion function size_to_byte function just converts with 1000(K), 1000*K, 1000*M. But this function converts with 1024(KiB), 1024*KiB(MiB), and 1024*MiB(GiB). Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_utils.h | 1 + src/flb_utils.c | 86 ++++++++++++++++++++++++++++++++++ tests/internal/utils.c | 40 ++++++++++++++++ 3 files changed, 127 insertions(+) diff --git a/include/fluent-bit/flb_utils.h b/include/fluent-bit/flb_utils.h index 56f286f6738..ef02d670583 100644 --- a/include/fluent-bit/flb_utils.h +++ b/include/fluent-bit/flb_utils.h @@ -49,6 +49,7 @@ void flb_utils_split_free_entry(struct flb_split_entry *entry); void flb_utils_split_free(struct mk_list *list); int flb_utils_timer_consume(flb_pipefd_t fd); int64_t flb_utils_size_to_bytes(const char *size); +int64_t flb_utils_size_to_binary_bytes(const char *size); int64_t flb_utils_hex2int(char *hex, int len); int flb_utils_time_to_seconds(const char *time); int flb_utils_pipe_byte_consume(flb_pipefd_t fd); diff --git a/src/flb_utils.c b/src/flb_utils.c index 56e6877d7bb..375a945e9f5 100644 --- a/src/flb_utils.c +++ b/src/flb_utils.c @@ -607,6 +607,92 @@ int64_t flb_utils_size_to_bytes(const char *size) return (int64_t)val; } +int64_t flb_utils_size_to_binary_bytes(const char *size) +{ + int i; + int len; + int plen = 0; + double val; + char tmp[4] = {0}; + int64_t KiB = 1024; + int64_t MiB = 1024 * KiB; + int64_t GiB = 1024 * MiB; + + if (!size) { + return -1; + } + + if (strcasecmp(size, "false") == 0) { + return 0; + } + + len = strlen(size); + val = atof(size); + + if (len == 0) { + return -1; + } + + for (i = len - 1; i >= 0; i--) { + if (isalpha(size[i])) { + plen++; + } + else { + break; + } + } + + if (plen == 0) { + return (int64_t)val; + } + else if (plen > 3) { + return -1; + } + + for (i = 0; i < plen; i++) { + tmp[i] = toupper(size[len - plen + i]); + } + + if (plen == 2) { + if (tmp[1] != 'B') { + return -1; + } + } + if (plen == 3) { + if (tmp[1] != 'I' || tmp[2] != 'B') { + return -1; + } + } + + if (tmp[0] == 'K') { + /* set upper bound (2**64/KiB)/2 to avoid overflows */ + if (val >= 9223372036854775.0 || val <= -9223372036854774.0) + { + return -1; + } + return (int64_t)(val * KiB); + } + else if (tmp[0] == 'M') { + /* set upper bound (2**64/MiB)/2 to avoid overflows */ + if (val >= 9223372036854.0 || val <= -9223372036853.0) { + return -1; + } + return (int64_t)(val * MiB); + } + else if (tmp[0] == 'G') { + /* set upper bound (2**64/GiB)/2 to avoid overflows */ + if (val >= 9223372036.0 || val <= -9223372035.0) { + return -1; + } + return (int64_t)(val * GiB); + } + else { + return -1; + } + + return (int64_t)val; +} + int64_t flb_utils_hex2int(char *hex, int len) { int i = 0; diff --git a/tests/internal/utils.c b/tests/internal/utils.c index 232e2461c0b..897cf3b756a 100644 --- a/tests/internal/utils.c +++ b/tests/internal/utils.c @@ -798,6 +798,45 @@ void test_size_to_bytes() } } +struct size_to_bytes_check size_to_binary_bytes_checks[] = { + {"922337.63", 922337}, + {"2K",2048}, + {"5.7263K", 5863}, + {"5.7263KB", 5863}, + {"5.7263KiB", 5863}, + {"9223372036854775.23K", -1}, + {"1M", 1048576}, + {"1.1M", 1153433}, + {"1.1MB", 1153433}, + {"1.1MiB", 1153433}, + {"3.592M", 3766484}, + {"52.752383M", 55314882}, + {"52.752383MB", 55314882}, + {"52.752383MiB", 55314882}, + {"9223372036854.42M", -1}, + {"492.364G",528671819431}, + {"492.364GB",528671819431}, + {"492.364GiB",528671819431}, + {"1.2973G", 1392965268}, + {"9223372036.78G", -1}, +}; + +void test_size_to_binary_bytes() +{ + int i; + int size; + int64_t ret; + struct size_to_bytes_check *u; + + size = sizeof(size_to_binary_bytes_checks) / sizeof(struct size_to_bytes_check); + for (i = 0; i < size; i++) { + u = &size_to_binary_bytes_checks[i]; + + ret = flb_utils_size_to_binary_bytes(u->size); + TEST_CHECK_(ret == u->ret, "ret = %zu, u->ret = %zu", ret, u->ret); + } +} + TEST_LIST = { /* JSON maps iteration */ { "url_split", test_url_split }, @@ -815,5 +854,6 @@ TEST_LIST = { { "test_flb_utils_split_quoted_errors", test_flb_utils_split_quoted_errors}, { "test_flb_utils_get_machine_id", test_flb_utils_get_machine_id }, { "test_size_to_bytes", test_size_to_bytes }, + { "test_size_to_bianry_bytes", test_size_to_binary_bytes }, { 0 } }; From 20c76953e52195e5a57a75a26310f610c9ad67c5 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 4 Aug 2025 23:28:17 +0900 Subject: [PATCH 18/19] ml: Use binary byte conversion function Signed-off-by: Hiroshi Hatake --- src/multiline/flb_ml.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index f26b26edef1..a148d7c2c56 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -883,7 +883,7 @@ struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) } ml->config = ctx; - limit = flb_utils_size_to_bytes(ml->config->multiline_buffer_limit); + limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); if (limit > 0) { ml->buffer_limit = (size_t)limit; } From d366fcae9a9123ac7a4ac45e7260c286441da8fc Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 5 Aug 2025 01:44:39 +0900 Subject: [PATCH 19/19] filter_multiline: Process truncated metrics always Signed-off-by: Hiroshi Hatake --- plugins/filter_multiline/ml.c | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 8b3a4bcdca3..ea0e50120c1 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -369,18 +369,23 @@ static int cb_ml_init(struct flb_filter_instance *ins, "fluentbit", "filter", "emit_records_total", "Total number of emitted records", 1, (char *[]) {"name"}); + + /* OLD api */ + flb_metrics_add(FLB_MULTILINE_METRIC_EMITTED, + "emit_records", ctx->ins->metrics); +#endif + } + /* Truncated metrics always should be existing. */ +#ifdef FLB_HAVE_METRICS ctx->cmt_truncated = cmt_counter_create(ins->cmt, "fluentbit", "filter", "emit_truncated_total", "Total number of truncated occurence of multiline", 1, (char *[]) {"name"}); /* OLD api */ - flb_metrics_add(FLB_MULTILINE_METRIC_EMITTED, - "emit_records", ctx->ins->metrics); flb_metrics_add(FLB_MULTILINE_METRIC_TRUNCATED, "emit_truncated", ctx->ins->metrics); #endif - } mk_list_init(&ctx->ml_streams); mk_list_init(&ctx->split_message_packers); @@ -846,16 +851,6 @@ static int cb_ml_filter(const void *data, size_t bytes, flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } - else if (ret == FLB_MULTILINE_OK) { -#ifdef FLB_HAVE_METRICS - name = (char *) flb_filter_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); - - /* old api */ - flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); -#endif - } } flb_log_event_decoder_destroy(&decoder);