diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index b6090eea537..05ac136cff7 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -145,6 +145,7 @@ struct flb_config { /* Multiline core parser definitions */ struct mk_list multiline_parsers; + char *multiline_buffer_limit; /* limit for multiline concatenated data */ /* Outputs instances */ struct mk_list outputs; /* list of output plugins */ @@ -408,6 +409,9 @@ enum conf_type { /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" +/* Multiline */ +#define FLB_CONF_STR_MULTILINE_BUFFER_LIMIT "multiline_buffer_limit" + /* Scheduler */ #define FLB_CONF_STR_SCHED_CAP "scheduler.cap" #define FLB_CONF_STR_SCHED_BASE "scheduler.base" diff --git a/include/fluent-bit/flb_utils.h b/include/fluent-bit/flb_utils.h index 56f286f6738..ef02d670583 100644 --- a/include/fluent-bit/flb_utils.h +++ b/include/fluent-bit/flb_utils.h @@ -49,6 +49,7 @@ void flb_utils_split_free_entry(struct flb_split_entry *entry); void flb_utils_split_free(struct mk_list *list); int flb_utils_timer_consume(flb_pipefd_t fd); int64_t flb_utils_size_to_bytes(const char *size); +int64_t flb_utils_size_to_binary_bytes(const char *size); int64_t flb_utils_hex2int(char *hex, int len); int flb_utils_time_to_seconds(const char *time); int flb_utils_pipe_byte_consume(flb_pipefd_t fd); diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index baa87bdb8e0..c27f9e1580b 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -52,6 +52,15 @@ /* Default multiline buffer size: 4Kb */ #define FLB_ML_BUF_SIZE 1024*4 +/* Default limit for concatenated multiline messages: 2MB */ +#define FLB_ML_BUFFER_LIMIT_DEFAULT_STR "2MB" +#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2) + +/* Return codes */ +#define FLB_MULTILINE_OK 0 +#define FLB_MULTILINE_PROCESSED 1 /* Reserved */ +#define FLB_MULTILINE_TRUNCATED 2 + /* Maximum number of groups per stream */ #define FLB_ML_MAX_GROUPS 6 @@ -106,6 +115,10 @@ struct flb_ml_stream_group { msgpack_sbuffer mp_sbuf; /* temporary msgpack buffer */ msgpack_packer mp_pck; /* temporary msgpack packer */ struct flb_time mp_time; /* multiline time parsed from first line */ + int truncated; /* was the buffer truncated? */ + + /* parent stream reference */ + struct flb_ml_stream *stream; struct mk_list _head; }; @@ -275,6 +288,9 @@ struct flb_ml { struct flb_log_event_encoder log_event_encoder; struct flb_log_event_decoder log_event_decoder; struct flb_config *config; /* Fluent Bit context */ + + /* Limit for concatenated multiline messages */ + size_t buffer_limit; }; struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name); diff --git a/include/fluent-bit/multiline/flb_ml_group.h b/include/fluent-bit/multiline/flb_ml_group.h index d035160884e..7a19bb7bbf4 100644 --- a/include/fluent-bit/multiline/flb_ml_group.h +++ b/include/fluent-bit/multiline/flb_ml_group.h @@ -28,4 +28,12 @@ struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml); void flb_ml_group_destroy(struct flb_ml_group *group); int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p); +/* + * Append data to a multiline stream group respecting the configured + * buffer limit. The length of the appended data might be reduced if + * the limit is reached. + */ +int flb_ml_group_cat(struct flb_ml_stream_group *group, + const char *data, size_t len); + #endif diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 41559296d05..ea0e50120c1 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -375,6 +375,17 @@ static int cb_ml_init(struct flb_filter_instance *ins, "emit_records", ctx->ins->metrics); #endif } + /* Truncated metrics always should be existing. */ +#ifdef FLB_HAVE_METRICS + ctx->cmt_truncated = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "emit_truncated_total", + "Total number of truncated occurence of multiline", + 1, (char *[]) {"name"}); + + /* OLD api */ + flb_metrics_add(FLB_MULTILINE_METRIC_TRUNCATED, + "emit_truncated", ctx->ins->metrics); +#endif mk_list_init(&ctx->ml_streams); mk_list_init(&ctx->split_message_packers); @@ -780,6 +791,10 @@ static int cb_ml_filter(const void *data, size_t bytes, struct flb_log_event event; int ret; struct ml_ctx *ctx; +#ifdef FLB_HAVE_METRICS + uint64_t ts; + char *name; +#endif (void) f_ins; (void) config; @@ -820,7 +835,19 @@ static int cb_ml_filter(const void *data, size_t bytes, FLB_EVENT_DECODER_SUCCESS) { ret = flb_ml_append_event(ctx->m, ctx->stream_id, &event); - if (ret != 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, + "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); +#endif + } + else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } @@ -871,10 +898,32 @@ static int cb_ml_filter(const void *data, size_t bytes, FLB_EVENT_DECODER_SUCCESS) { ret = flb_ml_append_event(ctx->m, stream->stream_id, &event); - if (ret != 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, + "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics); +#endif + } + else if (ret != FLB_MULTILINE_OK) { flb_plg_debug(ctx->ins, "could not append object from tag: %s", tag); } + else if (ret == FLB_MULTILINE_OK) { +#ifdef FLB_HAVE_METRICS + name = (char *) flb_filter_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name}); + + /* old api */ + flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics); +#endif + } } flb_log_event_decoder_destroy(&decoder); diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h index cae8fb64166..3bcb8d4eca8 100644 --- a/plugins/filter_multiline/ml.h +++ b/plugins/filter_multiline/ml.h @@ -25,10 +25,11 @@ #define FLB_MULTILINE_MEM_BUF_LIMIT_DEFAULT "10M" #define FLB_MULTILINE_METRIC_EMITTED 200 +#define FLB_MULTILINE_METRIC_TRUNCATED 201 #define FLB_MULTILINE_MODE_PARTIAL_MESSAGE "partial_message" #define FLB_MULTILINE_MODE_PARSER "parser" -/* +/* * input instance + tag is the unique identifier * for a multiline stream * TODO: implement clean up of streams that haven't been used recently @@ -77,6 +78,7 @@ struct ml_ctx { #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; + struct cmt_counter *cmt_truncated; #endif }; diff --git a/plugins/in_tail/tail_config.c b/plugins/in_tail/tail_config.c index f06e1c3ec63..929834b782e 100644 --- a/plugins/in_tail/tail_config.c +++ b/plugins/in_tail/tail_config.c @@ -479,6 +479,12 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "Total number of rotated files", 1, (char *[]) {"name"}); + ctx->cmt_multiline_truncated = \ + cmt_counter_create(ins->cmt, + "fluentbit", "input", + "multiline_truncated_total", + "Total number of truncated occurences for multilines", + 1, (char *[]) {"name"}); /* OLD metrics */ flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, "files_opened", ctx->ins->metrics); @@ -486,6 +492,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "files_closed", ctx->ins->metrics); flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED, "files_rotated", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED, + "multiline_truncated", ctx->ins->metrics); #endif return ctx; diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index 326dba870a4..dfd5f919354 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -41,6 +41,7 @@ #define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */ #define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */ #define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */ +#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */ #endif struct flb_tail_config { @@ -167,6 +168,7 @@ struct flb_tail_config { struct cmt_counter *cmt_files_opened; struct cmt_counter *cmt_files_closed; struct cmt_counter *cmt_files_rotated; + struct cmt_counter *cmt_multiline_truncated; /* Hash: hash tables for quick acess to registered files */ struct flb_hash_table *static_hash; diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1d6ba3f95fe..6d222d3814c 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -447,6 +447,11 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) #ifdef FLB_HAVE_UNICODE_ENCODER size_t decoded_len; #endif +#ifdef FLB_HAVE_METRICS + uint64_t ts; + char *name; +#endif + ctx = (struct flb_tail_config *) file->config; @@ -563,6 +568,17 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) &out_time, line, line_len); + if (ret == FLB_MULTILINE_TRUNCATED) { + flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit"); +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_multiline_truncated, ts, 1, (char *[]) {name}); + + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_M_TRUNCATED, 1, ctx->ins->metrics); +#endif + } goto go_next; } else if (ctx->docker_mode) { diff --git a/src/flb_config.c b/src/flb_config.c index 407464b4942..1cf509b4ee4 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -169,6 +169,10 @@ struct flb_service_config service_configs[] = { FLB_CONF_TYPE_INT, offsetof(struct flb_config, coro_stack_size)}, + {FLB_CONF_STR_MULTILINE_BUFFER_LIMIT, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, multiline_buffer_limit)}, + /* Scheduler */ {FLB_CONF_STR_SCHED_CAP, FLB_CONF_TYPE_INT, @@ -361,6 +365,7 @@ struct flb_config *flb_config_init() * on we use flb_config_exit to cleanup the config, which requires * the config->multiline_parsers list to be initialized. */ mk_list_init(&config->multiline_parsers); + config->multiline_buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT_STR; /* Task map */ ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE); diff --git a/src/flb_utils.c b/src/flb_utils.c index 56e6877d7bb..375a945e9f5 100644 --- a/src/flb_utils.c +++ b/src/flb_utils.c @@ -607,6 +607,92 @@ int64_t flb_utils_size_to_bytes(const char *size) return (int64_t)val; } +int64_t flb_utils_size_to_binary_bytes(const char *size) +{ + int i; + int len; + int plen = 0; + double val; + char tmp[4] = {0}; + int64_t KiB = 1024; + int64_t MiB = 1024 * KiB; + int64_t GiB = 1024 * MiB; + + if (!size) { + return -1; + } + + if (strcasecmp(size, "false") == 0) { + return 0; + } + + len = strlen(size); + val = atof(size); + + if (len == 0) { + return -1; + } + + for (i = len - 1; i >= 0; i--) { + if (isalpha(size[i])) { + plen++; + } + else { + break; + } + } + + if (plen == 0) { + return (int64_t)val; + } + else if (plen > 3) { + return -1; + } + + for (i = 0; i < plen; i++) { + tmp[i] = toupper(size[len - plen + i]); + } + + if (plen == 2) { + if (tmp[1] != 'B') { + return -1; + } + } + if (plen == 3) { + if (tmp[1] != 'I' || tmp[2] != 'B') { + return -1; + } + } + + if (tmp[0] == 'K') { + /* set upper bound (2**64/KiB)/2 to avoid overflows */ + if (val >= 9223372036854775.0 || val <= -9223372036854774.0) + { + return -1; + } + return (int64_t)(val * KiB); + } + else if (tmp[0] == 'M') { + /* set upper bound (2**64/MiB)/2 to avoid overflows */ + if (val >= 9223372036854.0 || val <= -9223372036853.0) { + return -1; + } + return (int64_t)(val * MiB); + } + else if (tmp[0] == 'G') { + /* set upper bound (2**64/GiB)/2 to avoid overflows */ + if (val >= 9223372036.0 || val <= -9223372035.0) { + return -1; + } + return (int64_t)(val * GiB); + } + else { + return -1; + } + + return (int64_t)val; +} + int64_t flb_utils_hex2int(char *hex, int len) { int i = 0; diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 99e3d126a46..a148d7c2c56 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -213,6 +214,7 @@ static int package_content(struct flb_ml_stream *mst, { int len; int ret; + int truncated = FLB_FALSE; int rule_match = FLB_FALSE; int processed = FLB_FALSE; int type; @@ -259,11 +261,13 @@ static int package_content(struct flb_ml_stream *mst, stream_group, full_map, buf, size, tm, val_content, val_pattern); if (ret == -1) { - processed = FLB_FALSE; + return -1; } - else { - processed = FLB_TRUE; + + if (ret == FLB_MULTILINE_TRUNCATED) { + truncated = FLB_TRUE; } + processed = FLB_TRUE; } else if (type == FLB_ML_ENDSWITH) { len = flb_sds_len(parser->match_str); @@ -340,6 +344,10 @@ static int package_content(struct flb_ml_stream *mst, msgpack_pack_object(&stream_group->mp_md_pck, *metadata); } + if (truncated) { + return FLB_MULTILINE_TRUNCATED; + } + return processed; } @@ -410,10 +418,8 @@ static int process_append(struct flb_ml_parser_ins *parser_i, /* Lookup the key */ if (type == FLB_ML_TYPE_TEXT) { ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL); - if (ret == FLB_FALSE) { - return -1; - } - return 0; + /* Return the raw status of text type of result. */ + return ret; } else if (type == FLB_ML_TYPE_MAP) { full_map = obj; @@ -486,7 +492,7 @@ static int process_append(struct flb_ml_parser_ins *parser_i, if (ret == FLB_FALSE) { return -1; } - return 0; + return ret; } static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser, @@ -599,9 +605,12 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser, tm, buf, size, map, &out_buf, &out_size, &release, &out_time); - if (ret < 0) { - return -1; - } + /* + * Do not return -1 here. If the sub-parser fails, we should + * still attempt to process the raw text with multiline rules. + * The 'ret' variable is not used beyond this point, so we can + * safely ignore a failure here and let the multiline rules decide. + */ break; case FLB_ML_TYPE_MAP: ret = ml_append_try_parser_type_map(parser, stream_id, &type, @@ -649,7 +658,7 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser, flb_free(out_buf); } - return 0; + return ret; } int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, @@ -657,101 +666,93 @@ int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, { int ret; int processed = FLB_FALSE; + int status = FLB_MULTILINE_OK; struct mk_list *head; struct mk_list *head_group; - struct flb_ml_group *group; - struct flb_ml_stream *mst; + struct flb_ml_group *group = NULL; struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; - struct flb_time out_time; + struct flb_ml_parser_ins *parser_i = NULL; + struct flb_ml_stream *mst; struct flb_ml_stream_group *st_group; - int type; - - type = FLB_ML_TYPE_TEXT; - - flb_time_zero(&out_time); + int type = FLB_ML_TYPE_TEXT; mk_list_foreach(head, &ml->groups) { group = mk_list_entry(head, struct flb_ml_group, _head); - /* Check if the incoming data matches the last recently used parser */ lru_parser = group->lru_parser; - if (lru_parser && lru_parser->last_stream_id == stream_id) { ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, tm, buf, size, NULL, NULL); - if (ret == 0) { + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); + goto done; /* Use goto to break out of nested loops */ } } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } } - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && lru_parser == parser_i && - lru_parser->last_stream_id == stream_id) { - continue; - } + if (!processed) { + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + lru_parser = group->lru_parser; + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && lru_parser == parser_i && + lru_parser->last_stream_id == stream_id) { + continue; + } - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, buf, size, NULL, NULL); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, buf, size, NULL, NULL); + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + processed = FLB_TRUE; + goto done; + } } + } } +done: if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); + /* A non-matching line breaks any multiline sequence. Flush all pending data. */ + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + } } - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + /* Now process the current line as a standalone message. */ + group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); mst = flb_ml_stream_get(parser_i, stream_id); if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); return -1; } - /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - flb_sds_cat_safe(&st_group->buf, buf, size); + flb_ml_register_context(st_group, tm, NULL); + ret = flb_ml_group_cat(st_group, buf, size); + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); } - return 0; + return status; } - - int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id, struct flb_time *tm, @@ -759,176 +760,98 @@ int flb_ml_append_object(struct flb_ml *ml, msgpack_object *obj) { int ret; - int type; int processed = FLB_FALSE; + int status = FLB_MULTILINE_OK; struct mk_list *head; struct mk_list *head_group; - struct flb_ml_group *group; + struct flb_ml_group *group = NULL; struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; + struct flb_ml_parser_ins *parser_i = NULL; struct flb_ml_stream *mst; struct flb_ml_stream_group *st_group; - struct flb_log_event event; + int type; if (metadata == NULL) { metadata = ml->log_event_decoder.empty_map; } - /* - * As incoming objects, we accept packed events - * and msgpack Maps containing key/value pairs. - */ - if (obj->type == MSGPACK_OBJECT_ARRAY) { + if (obj->type != MSGPACK_OBJECT_MAP) { flb_error("[multiline] appending object with invalid type, expected " "map, received type=%i", obj->type); return -1; - - - flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0); - - ret = flb_event_decoder_decode_object(&ml->log_event_decoder, - &event, - obj); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_error("[multiline] invalid event object"); - - return -1; - } - - tm = &event.timestamp; - obj = event.body; - metadata = event.metadata; - - type = FLB_ML_TYPE_MAP; - } - else if (obj->type == MSGPACK_OBJECT_MAP) { - type = FLB_ML_TYPE_MAP; - } - else { - flb_error("[multiline] appending object with invalid type, expected " - "array or map, received type=%i", obj->type); - return -1; } + type = FLB_ML_TYPE_MAP; mk_list_foreach(head, &ml->groups) { group = mk_list_entry(head, struct flb_ml_group, _head); - /* Check if the incoming data matches the last recently used parser */ lru_parser = group->lru_parser; - if (lru_parser && lru_parser->last_stream_id == stream_id) { ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, tm, NULL, 0, metadata, obj); - if (ret == 0) { + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); + goto done; } } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } } - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && parser_i == lru_parser) { - continue; - } + if (!processed) { + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + lru_parser = group->lru_parser; + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && parser_i == lru_parser && + lru_parser->last_stream_id == stream_id) { + continue; + } - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, NULL, 0, metadata, obj); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, NULL, 0, metadata, obj); + if (ret >= 0) { + if (ret == FLB_MULTILINE_TRUNCATED) { + status = FLB_MULTILINE_TRUNCATED; + } + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + processed = FLB_TRUE; + goto done; + } + } } } +done: if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + } } - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); + mst = flb_ml_stream_get(parser_i, stream_id); if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); - return -1; } - /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - - ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ml->log_event_encoder, tm); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (metadata != ml->log_event_decoder.empty_map) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - &ml->log_event_encoder, metadata); - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ml->log_event_encoder, obj); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - mst->cb_flush(parser_i->ml_parser, - mst, - mst->cb_data, - ml->log_event_encoder.output_buffer, - ml->log_event_encoder.output_length); - } - else { - flb_error("[multiline] log event encoder error : %d", ret); - } - - flb_log_event_encoder_reset(&ml->log_event_encoder); - - /* reset group buffer counters */ - st_group->mp_sbuf.size = 0; - flb_sds_len_set(st_group->buf, 0); - - /* Update last flush time */ - st_group->last_flush = time_ms_now(); + flb_ml_register_context(st_group, tm, obj); + flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); } - return 0; + return status; } int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, @@ -945,6 +868,7 @@ int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) { int result; + size_t limit = 0; struct flb_ml *ml; ml = flb_calloc(1, sizeof(struct flb_ml)); @@ -959,6 +883,13 @@ struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) } ml->config = ctx; + limit = flb_utils_size_to_binary_bytes(ml->config->multiline_buffer_limit); + if (limit > 0) { + ml->buffer_limit = (size_t)limit; + } + else { + ml->buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT; + } ml->last_flush = time_ms_now(); mk_list_init(&ml->groups); @@ -1412,38 +1343,45 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, return -1; } - /* Take the first line keys and repack */ - len = flb_sds_len(parser_i->key_content); - size = map.via.map.size; - msgpack_pack_map(&mp_pck, size); - - for (i = 0; i < size; i++) { - k = map.via.map.ptr[i].key; - v = map.via.map.ptr[i].val; - - /* - * Check if the current key is the key that will contain the - * concatenated multiline buffer - */ - if (k.type == MSGPACK_OBJECT_STR && - parser_i->key_content && - k.via.str.size == len && - strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { - - /* key */ - msgpack_pack_object(&mp_pck, k); - - /* value */ - len = flb_sds_len(group->buf); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, group->buf, len); - } - else { - /* key / val */ - msgpack_pack_object(&mp_pck, k); - msgpack_pack_object(&mp_pck, v); + if (flb_sds_len(group->buf) > 0) { + /* Take the first line keys and repack */ + len = flb_sds_len(parser_i->key_content); + size = map.via.map.size; + msgpack_pack_map(&mp_pck, size); + + for (i = 0; i < size; i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + /* + * Check if the current key is the key that will contain the + * concatenated multiline buffer + */ + if (k.type == MSGPACK_OBJECT_STR && + parser_i->key_content && + k.via.str.size == len && + strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { + + /* key */ + msgpack_pack_object(&mp_pck, k); + + /* value */ + len = flb_sds_len(group->buf); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, group->buf, len); + } + else { + /* key / val */ + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } } } + else { + /* The buffer is empty, so just pack the original map from the context */ + msgpack_pack_object(&mp_pck, map); + } + msgpack_unpacked_destroy(&result); group->mp_sbuf.size = 0; group->mp_md_sbuf.size = 0; @@ -1497,6 +1435,14 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, FLB_TRUE); } + /* If the buffer was truncated, append the marker to the metadata */ + if (ret == FLB_EVENT_ENCODER_SUCCESS && group->truncated) { + ret = flb_log_event_encoder_append_metadata_values( + &mst->ml->log_event_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("multiline_truncated"), + FLB_LOG_EVENT_BOOLEAN_VALUE(FLB_TRUE)); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_set_body_from_raw_msgpack( &mst->ml->log_event_encoder, @@ -1535,6 +1481,7 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, msgpack_sbuffer_destroy(&mp_sbuf); flb_sds_len_set(group->buf, 0); + group->truncated = FLB_FALSE; /* Update last flush time */ group->last_flush = time_ms_now(); diff --git a/src/multiline/flb_ml_group.c b/src/multiline/flb_ml_group.c index d12caac0ba6..67481771945 100644 --- a/src/multiline/flb_ml_group.c +++ b/src/multiline/flb_ml_group.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -84,3 +85,38 @@ void flb_ml_group_destroy(struct flb_ml_group *group) mk_list_del(&group->_head); flb_free(group); } + +int flb_ml_group_cat(struct flb_ml_stream_group *group, + const char *data, size_t len) +{ + size_t avail; + size_t limit; + int ret; + int status = FLB_MULTILINE_OK; + + limit = group->stream->ml->buffer_limit; + if (limit > 0) { + if (flb_sds_len(group->buf) >= limit) { + group->truncated = FLB_TRUE; + return FLB_MULTILINE_TRUNCATED; + } + + avail = limit - flb_sds_len(group->buf); + if (len > avail) { + len = avail; + group->truncated = FLB_TRUE; + status = FLB_MULTILINE_TRUNCATED; + } + } + + if (len == 0) { + return status; + } + + ret = flb_sds_cat_safe(&group->buf, data, len); + if (ret == -1) { + return -1; + } + + return status; +} diff --git a/src/multiline/flb_ml_rule.c b/src/multiline/flb_ml_rule.c index 8ee0de5a946..ceca40a4070 100644 --- a/src/multiline/flb_ml_rule.c +++ b/src/multiline/flb_ml_rule.c @@ -23,6 +23,7 @@ #include #include +#include struct to_state { struct flb_ml_rule *rule; @@ -379,14 +380,21 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, flb_sds_cat_safe(&group->buf, "\n", 1); } else { - flb_sds_cat_safe(&group->buf, buf_data, buf_size); + ret = flb_ml_group_cat(group, buf_data, buf_size); + if (ret == FLB_MULTILINE_TRUNCATED) { + /* Buffer is full. Flush immediately to send the truncated record. */ + flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); + + /* Reset state so no more lines are appended to this record. */ + group->rule_to_state = NULL; + return ret; + } } rule = st->rule; break; } rule = NULL; } - } if (!rule) { @@ -402,12 +410,13 @@ int flb_ml_rule_process(struct flb_ml_parser *ml_parser, group->rule_to_state = rule; /* concatenate the data */ - flb_sds_cat_safe(&group->buf, buf_data, buf_size); + ret = flb_ml_group_cat(group, buf_data, buf_size); + if (ret == FLB_MULTILINE_TRUNCATED) { + return ret; + } /* Copy full map content in stream buffer */ flb_ml_register_context(group, tm, full_map); - - return 0; } } diff --git a/src/multiline/flb_ml_stream.c b/src/multiline/flb_ml_stream.c index ded0cba8e43..b263e6f6e0a 100644 --- a/src/multiline/flb_ml_stream.c +++ b/src/multiline/flb_ml_stream.c @@ -79,6 +79,10 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst msgpack_sbuffer_init(&group->mp_sbuf); msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write); + group->truncated = FLB_FALSE; + /* parent stream reference */ + group->stream = mst; + mk_list_add(&group->_head, &mst->groups); return group; diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index e175e1171aa..28e4efb08fd 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -111,12 +111,11 @@ struct record_check container_mix_output[] = { {"a1\n"}, {"a2\n"}, {"ddee\n"}, - {"bbcc"}, + {"bbccdd-out\n"}, + {"dd-err\n"}, {"single full"}, {"1a. some multiline log"}, {"1b. some multiline log"}, - {"dd-out\n"}, - {"dd-err\n"}, }; /* Java stacktrace detection */ @@ -393,6 +392,10 @@ static int flush_callback(struct flb_ml_parser *parser, fprintf(stdout, "%s----------- EOF -----------%s\n", ANSI_YELLOW, ANSI_RESET); + if (!res) { + return 0; + } + /* Validate content */ msgpack_unpacked_init(&result); off = 0; @@ -1457,6 +1460,75 @@ static void test_issue_5504() #endif } +static void test_buffer_limit_truncation() +{ + int ret; + uint64_t stream_id; + struct flb_config *config; + struct flb_ml *ml; + struct flb_ml_parser *mlp; + struct flb_ml_parser_ins *mlp_i; + struct flb_parser *p; + struct flb_time tm; + + /* + * A realistic Docker log where the content of the "log" field will be + * concatenated, and that concatenated buffer is what should be truncated. + */ + char *line1 = "{\"log\": \"12345678901234567890\", \"stream\": \"stdout\"}"; + char *line2 = "{\"log\": \"abcdefghijklmnopqrstuvwxyz\", \"stream\": \"stdout\"}"; + + config = flb_config_init(); + /* The buffer limit is for the concatenated 'log' content, not the full JSON */ + config->multiline_buffer_limit = "80"; + + /* Use the dummy 'docker' parser for JSON extraction */ + p = flb_parser_get("docker", config); + + /* This parser will trigger on any content, ensuring concatenation. */ + ml = flb_ml_create(config, "limit-test"); + TEST_CHECK(ml != NULL); + + mlp = flb_ml_parser_create(config, "test-concat", FLB_ML_REGEX, + NULL, FLB_FALSE, 1000, + "log", NULL, NULL, + p, "docker"); + TEST_CHECK(mlp != NULL); + + /* Define rules that will always match the test data */ + ret = flb_ml_rule_create(mlp, "start_state", "/./", "cont", NULL); + TEST_CHECK(ret == 0); + ret = flb_ml_rule_create(mlp, "cont", "/./", "cont", NULL); + TEST_CHECK(ret == 0); + + /* Finalize parser initialization */ + ret = flb_ml_parser_init(mlp); + TEST_CHECK(ret == 0); + + mlp_i = flb_ml_parser_instance_create(ml, "test-concat"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "test", -1, flush_callback, NULL, &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + + /* Append the first line. It will match the 'start_state' and start a block. */ + ret = flb_ml_append_text(ml, stream_id, &tm, line1, strlen(line1)); + TEST_CHECK(ret == FLB_MULTILINE_OK); + + /* + * Append the second line. This will match the 'cont' state and concatenate. + * The concatenation will exceed the limit + * and correctly trigger the truncation logic. + */ + ret = flb_ml_append_text(ml, stream_id, &tm, line2, strlen(line2)); + TEST_CHECK(ret == FLB_MULTILINE_TRUNCATED); + + flb_ml_destroy(ml); + flb_config_exit(config); +} + TEST_LIST = { /* Normal features tests */ { "parser_docker", test_parser_docker}, @@ -1468,6 +1540,7 @@ TEST_LIST = { { "parser_go", test_parser_go}, { "container_mix", test_container_mix}, { "endswith", test_endswith}, + { "buffer_limit_truncation", test_buffer_limit_truncation}, /* Issues reported on Github */ { "issue_3817_1" , test_issue_3817_1}, diff --git a/tests/internal/utils.c b/tests/internal/utils.c index 232e2461c0b..897cf3b756a 100644 --- a/tests/internal/utils.c +++ b/tests/internal/utils.c @@ -798,6 +798,45 @@ void test_size_to_bytes() } } +struct size_to_bytes_check size_to_binary_bytes_checks[] = { + {"922337.63", 922337}, + {"2K",2048}, + {"5.7263K", 5863}, + {"5.7263KB", 5863}, + {"5.7263KiB", 5863}, + {"9223372036854775.23K", -1}, + {"1M", 1048576}, + {"1.1M", 1153433}, + {"1.1MB", 1153433}, + {"1.1MiB", 1153433}, + {"3.592M", 3766484}, + {"52.752383M", 55314882}, + {"52.752383MB", 55314882}, + {"52.752383MiB", 55314882}, + {"9223372036854.42M", -1}, + {"492.364G",528671819431}, + {"492.364GB",528671819431}, + {"492.364GiB",528671819431}, + {"1.2973G", 1392965268}, + {"9223372036.78G", -1}, +}; + +void test_size_to_binary_bytes() +{ + int i; + int size; + int64_t ret; + struct size_to_bytes_check *u; + + size = sizeof(size_to_binary_bytes_checks) / sizeof(struct size_to_bytes_check); + for (i = 0; i < size; i++) { + u = &size_to_binary_bytes_checks[i]; + + ret = flb_utils_size_to_binary_bytes(u->size); + TEST_CHECK_(ret == u->ret, "ret = %zu, u->ret = %zu", ret, u->ret); + } +} + TEST_LIST = { /* JSON maps iteration */ { "url_split", test_url_split }, @@ -815,5 +854,6 @@ TEST_LIST = { { "test_flb_utils_split_quoted_errors", test_flb_utils_split_quoted_errors}, { "test_flb_utils_get_machine_id", test_flb_utils_get_machine_id }, { "test_size_to_bytes", test_size_to_bytes }, + { "test_size_to_bianry_bytes", test_size_to_binary_bytes }, { 0 } };