Skip to content

config: multiline: in_tail: filter_multiline: Add configurable buffer limit for multiline interface #10653

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
76d0d4f
config: Add parameter for multiline limitation
cosmo0920 Jul 25, 2025
503cdbd
ml: group: stream: Prepare to handle limitations of multiline
cosmo0920 Jul 25, 2025
062bfa3
ml: Process truncations
cosmo0920 Jul 25, 2025
c5e5b51
filter_multiline: Handle truncations due to exceeded limits
cosmo0920 Jul 25, 2025
a2804ec
in_tail: Handle truncations due to exceeded limits
cosmo0920 Jul 25, 2025
b71f25e
ml: rule: Apply limit of group during regex handling
cosmo0920 Aug 1, 2025
4132292
ml: group: stream: Add multiline_truncated: true metadata for truncat…
cosmo0920 Jul 29, 2025
5aa2e61
ml: Refer the configured limitation of multiline
cosmo0920 Aug 1, 2025
a72b13f
ml: rule: Flush immediately when truncated
cosmo0920 Aug 1, 2025
65a2793
ml: Avoid to use the collided status code
cosmo0920 Aug 1, 2025
e40c50b
ml: tests: Process truncated case and follow the interface change
cosmo0920 Jul 25, 2025
dec20f1
ml: tests: Detect truncation occurrence in multiline testcase.
cosmo0920 Jul 25, 2025
e98ce4a
in_tail: Add metrics for number of occurrence of truncations on multi…
cosmo0920 Aug 4, 2025
a0b6299
filter_multiline: Add metrics for emissions and truncations
cosmo0920 Aug 4, 2025
b08d69e
config: ml: Handle SI prefixes on the buffer limit of multiline
cosmo0920 Aug 4, 2025
5b60a72
ml: tests: Follow the type change for the buffer limit
cosmo0920 Aug 4, 2025
5e91524
utils: tests: Implement binary bytes conversion function
cosmo0920 Aug 4, 2025
20c7695
ml: Use binary byte conversion function
cosmo0920 Aug 4, 2025
d366fca
filter_multiline: Process truncated metrics always
cosmo0920 Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ struct flb_config {

/* Multiline core parser definitions */
struct mk_list multiline_parsers;
char *multiline_buffer_limit; /* limit for multiline concatenated data */

/* Outputs instances */
struct mk_list outputs; /* list of output plugins */
Expand Down Expand Up @@ -408,6 +409,9 @@ enum conf_type {
/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"

/* Multiline */
#define FLB_CONF_STR_MULTILINE_BUFFER_LIMIT "multiline_buffer_limit"

/* Scheduler */
#define FLB_CONF_STR_SCHED_CAP "scheduler.cap"
#define FLB_CONF_STR_SCHED_BASE "scheduler.base"
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void flb_utils_split_free_entry(struct flb_split_entry *entry);
void flb_utils_split_free(struct mk_list *list);
int flb_utils_timer_consume(flb_pipefd_t fd);
int64_t flb_utils_size_to_bytes(const char *size);
int64_t flb_utils_size_to_binary_bytes(const char *size);
int64_t flb_utils_hex2int(char *hex, int len);
int flb_utils_time_to_seconds(const char *time);
int flb_utils_pipe_byte_consume(flb_pipefd_t fd);
Expand Down
16 changes: 16 additions & 0 deletions include/fluent-bit/multiline/flb_ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@
/*
 * Default multiline buffer size: 4 KiB.
 * The expansion is parenthesized so the macro behaves as a single value
 * inside larger expressions (e.g. `n / FLB_ML_BUF_SIZE`).
 */
#define FLB_ML_BUF_SIZE (1024 * 4)

/* Default limit for concatenated multiline messages: 2MB */
#define FLB_ML_BUFFER_LIMIT_DEFAULT_STR "2MB"
#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2)

/* Return codes */
#define FLB_MULTILINE_OK 0
#define FLB_MULTILINE_PROCESSED 1 /* Reserved */
#define FLB_MULTILINE_TRUNCATED 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the truncation status code to 2 is needed because status code 1 would collide with the FLB_TRUE status.


/* Maximum number of groups per stream */
#define FLB_ML_MAX_GROUPS 6

Expand Down Expand Up @@ -106,6 +115,10 @@ struct flb_ml_stream_group {
msgpack_sbuffer mp_sbuf; /* temporary msgpack buffer */
msgpack_packer mp_pck; /* temporary msgpack packer */
struct flb_time mp_time; /* multiline time parsed from first line */
int truncated; /* was the buffer truncated? */

/* parent stream reference */
struct flb_ml_stream *stream;

struct mk_list _head;
};
Expand Down Expand Up @@ -275,6 +288,9 @@ struct flb_ml {
struct flb_log_event_encoder log_event_encoder;
struct flb_log_event_decoder log_event_decoder;
struct flb_config *config; /* Fluent Bit context */

/* Limit for concatenated multiline messages */
size_t buffer_limit;
};

struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name);
Expand Down
8 changes: 8 additions & 0 deletions include/fluent-bit/multiline/flb_ml_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml);
void flb_ml_group_destroy(struct flb_ml_group *group);
int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p);

/*
* Append data to a multiline stream group respecting the configured
* buffer limit. The length of the appended data might be reduced if
* the limit is reached.
*/
int flb_ml_group_cat(struct flb_ml_stream_group *group,
const char *data, size_t len);

#endif
53 changes: 51 additions & 2 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,17 @@ static int cb_ml_init(struct flb_filter_instance *ins,
"emit_records", ctx->ins->metrics);
#endif
}
/* The truncated metrics must always exist. */
#ifdef FLB_HAVE_METRICS
ctx->cmt_truncated = cmt_counter_create(ins->cmt,
"fluentbit", "filter", "emit_truncated_total",
"Total number of truncated occurence of multiline",
1, (char *[]) {"name"});

/* OLD api */
flb_metrics_add(FLB_MULTILINE_METRIC_TRUNCATED,
"emit_truncated", ctx->ins->metrics);
#endif

mk_list_init(&ctx->ml_streams);
mk_list_init(&ctx->split_message_packers);
Expand Down Expand Up @@ -780,6 +791,10 @@ static int cb_ml_filter(const void *data, size_t bytes,
struct flb_log_event event;
int ret;
struct ml_ctx *ctx;
#ifdef FLB_HAVE_METRICS
uint64_t ts;
char *name;
#endif

(void) f_ins;
(void) config;
Expand Down Expand Up @@ -820,7 +835,19 @@ static int cb_ml_filter(const void *data, size_t bytes,
FLB_EVENT_DECODER_SUCCESS) {
ret = flb_ml_append_event(ctx->m, ctx->stream_id, &event);

if (ret != 0) {
if (ret == FLB_MULTILINE_TRUNCATED) {
flb_plg_warn(ctx->ins,
"multiline message truncated due to buffer limit");
#ifdef FLB_HAVE_METRICS
name = (char *) flb_filter_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name});

/* old api */
flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics);
#endif
}
else if (ret != FLB_MULTILINE_OK) {
flb_plg_debug(ctx->ins,
"could not append object from tag: %s", tag);
}
Expand Down Expand Up @@ -871,10 +898,32 @@ static int cb_ml_filter(const void *data, size_t bytes,
FLB_EVENT_DECODER_SUCCESS) {
ret = flb_ml_append_event(ctx->m, stream->stream_id, &event);

if (ret != 0) {
if (ret == FLB_MULTILINE_TRUNCATED) {
flb_plg_warn(ctx->ins,
"multiline message truncated due to buffer limit");
#ifdef FLB_HAVE_METRICS
name = (char *) flb_filter_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_truncated, ts, 1, (char *[]) {name});

/* old api */
flb_metrics_sum(FLB_MULTILINE_METRIC_TRUNCATED, 1, ctx->ins->metrics);
#endif
}
else if (ret != FLB_MULTILINE_OK) {
flb_plg_debug(ctx->ins,
"could not append object from tag: %s", tag);
}
else if (ret == FLB_MULTILINE_OK) {
#ifdef FLB_HAVE_METRICS
name = (char *) flb_filter_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_emitted, ts, 1, (char *[]) {name});

/* old api */
flb_metrics_sum(FLB_MULTILINE_METRIC_EMITTED, 1, ctx->ins->metrics);
#endif
}
}

flb_log_event_decoder_destroy(&decoder);
Expand Down
4 changes: 3 additions & 1 deletion plugins/filter_multiline/ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@

#define FLB_MULTILINE_MEM_BUF_LIMIT_DEFAULT "10M"
#define FLB_MULTILINE_METRIC_EMITTED 200
#define FLB_MULTILINE_METRIC_TRUNCATED 201
#define FLB_MULTILINE_MODE_PARTIAL_MESSAGE "partial_message"
#define FLB_MULTILINE_MODE_PARSER "parser"

/*
/*
* input instance + tag is the unique identifier
* for a multiline stream
* TODO: implement clean up of streams that haven't been used recently
Expand Down Expand Up @@ -77,6 +78,7 @@ struct ml_ctx {

#ifdef FLB_HAVE_METRICS
struct cmt_counter *cmt_emitted;
struct cmt_counter *cmt_truncated;
#endif
};

Expand Down
8 changes: 8 additions & 0 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,21 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
"Total number of rotated files",
1, (char *[]) {"name"});

ctx->cmt_multiline_truncated = \
cmt_counter_create(ins->cmt,
"fluentbit", "input",
"multiline_truncated_total",
"Total number of truncated occurences for multilines",
1, (char *[]) {"name"});
/* OLD metrics */
flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
"files_opened", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_F_CLOSED,
"files_closed", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_F_ROTATED,
"files_rotated", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED,
"multiline_truncated", ctx->ins->metrics);
#endif

return ctx;
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#define FLB_TAIL_METRIC_F_OPENED 100 /* number of opened files */
#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */
#endif

struct flb_tail_config {
Expand Down Expand Up @@ -167,6 +168,7 @@ struct flb_tail_config {
struct cmt_counter *cmt_files_opened;
struct cmt_counter *cmt_files_closed;
struct cmt_counter *cmt_files_rotated;
struct cmt_counter *cmt_multiline_truncated;

/* Hash: hash tables for quick access to registered files */
struct flb_hash_table *static_hash;
Expand Down
16 changes: 16 additions & 0 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,11 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
#ifdef FLB_HAVE_UNICODE_ENCODER
size_t decoded_len;
#endif
#ifdef FLB_HAVE_METRICS
uint64_t ts;
char *name;
#endif


ctx = (struct flb_tail_config *) file->config;

Expand Down Expand Up @@ -563,6 +568,17 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
&out_time,
line,
line_len);
if (ret == FLB_MULTILINE_TRUNCATED) {
flb_plg_warn(ctx->ins, "multiline message truncated due to buffer limit");
#ifdef FLB_HAVE_METRICS
name = (char *) flb_input_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_multiline_truncated, ts, 1, (char *[]) {name});

/* Old api */
flb_metrics_sum(FLB_TAIL_METRIC_M_TRUNCATED, 1, ctx->ins->metrics);
#endif
}
goto go_next;
}
else if (ctx->docker_mode) {
Expand Down
5 changes: 5 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ struct flb_service_config service_configs[] = {
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, coro_stack_size)},

{FLB_CONF_STR_MULTILINE_BUFFER_LIMIT,
FLB_CONF_TYPE_STR,
offsetof(struct flb_config, multiline_buffer_limit)},

/* Scheduler */
{FLB_CONF_STR_SCHED_CAP,
FLB_CONF_TYPE_INT,
Expand Down Expand Up @@ -361,6 +365,7 @@ struct flb_config *flb_config_init()
* on we use flb_config_exit to cleanup the config, which requires
* the config->multiline_parsers list to be initialized. */
mk_list_init(&config->multiline_parsers);
config->multiline_buffer_limit = FLB_ML_BUFFER_LIMIT_DEFAULT_STR;

/* Task map */
ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE);
Expand Down
86 changes: 86 additions & 0 deletions src/flb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,92 @@ int64_t flb_utils_size_to_bytes(const char *size)
return (int64_t)val;
}

/*
 * Convert a double to int64_t, returning -1 when the value is NaN or lies
 * outside the range of int64_t (casting such a value is undefined behavior).
 */
static int64_t size_to_binary_bytes_cast(double val)
{
    /*
     * 2^63 is exactly representable as a double while INT64_MAX is not, so
     * the valid range is expressed as the half-open interval [-2^63, 2^63).
     * The negated form also rejects NaN.
     */
    if (!(val >= -9223372036854775808.0 && val < 9223372036854775808.0)) {
        return -1;
    }

    return (int64_t) val;
}

/*
 * Convert a human readable size string into a number of bytes using binary
 * (1024-based) multipliers.
 *
 * Accepted forms (the suffix is case-insensitive):
 *
 *   "<number>"                          -> number, truncated toward zero
 *   "<number>K" / "KB" / "KiB"          -> number * 1024
 *   "<number>M" / "MB" / "MiB"          -> number * 1024^2
 *   "<number>G" / "GB" / "GiB"          -> number * 1024^3
 *   "false"                             -> 0
 *
 * The numeric part is parsed with atof(), so fractional values such as
 * "1.5K" are accepted.
 *
 * Returns the size in bytes, or -1 when:
 *   - size is NULL or empty,
 *   - the trailing alphabetic suffix is longer than three characters or is
 *     not one of the forms above (a bare "B" is not accepted),
 *   - the resulting value does not fit in an int64_t.
 */
int64_t flb_utils_size_to_binary_bytes(const char *size)
{
    int i;
    int len;
    int plen = 0;
    double val;
    char tmp[4] = {0};
    int64_t KiB = 1024;
    int64_t MiB = 1024 * KiB;
    int64_t GiB = 1024 * MiB;

    if (!size) {
        return -1;
    }

    if (strcasecmp(size, "false") == 0) {
        return 0;
    }

    len = strlen(size);
    if (len == 0) {
        return -1;
    }

    val = atof(size);

    /*
     * Count the alphabetic characters at the end of the string: that is the
     * unit suffix. The cast to unsigned char is required because passing a
     * negative char value to the <ctype.h> functions is undefined.
     */
    for (i = len - 1; i >= 0; i--) {
        if (!isalpha((unsigned char) size[i])) {
            break;
        }
        plen++;
    }

    if (plen == 0) {
        /* no suffix: plain number of bytes */
        return size_to_binary_bytes_cast(val);
    }
    else if (plen > 3) {
        return -1;
    }

    for (i = 0; i < plen; i++) {
        tmp[i] = (char) toupper((unsigned char) size[len - plen + i]);
    }

    /* two letter suffix must be "?B", three letter suffix must be "?IB" */
    if (plen == 2 && tmp[1] != 'B') {
        return -1;
    }
    if (plen == 3 && (tmp[1] != 'I' || tmp[2] != 'B')) {
        return -1;
    }

    /*
     * The overflow check is done on the scaled value itself, so the bound
     * always matches the 1024-based multiplier being applied.
     */
    if (tmp[0] == 'K') {
        return size_to_binary_bytes_cast(val * (double) KiB);
    }
    else if (tmp[0] == 'M') {
        return size_to_binary_bytes_cast(val * (double) MiB);
    }
    else if (tmp[0] == 'G') {
        return size_to_binary_bytes_cast(val * (double) GiB);
    }

    /* unknown unit */
    return -1;
}

int64_t flb_utils_hex2int(char *hex, int len)
{
int i = 0;
Expand Down
Loading
Loading