From cf6ccc73bf1a3aae77d0fc0f29a1d6a3526c1c0a Mon Sep 17 00:00:00 2001 From: zshuang0316 <163435379+zshuang0316@users.noreply.github.com> Date: Sun, 3 Aug 2025 20:32:13 +0800 Subject: [PATCH] in_tail: fix last_processed_bytes calculation Fix last_processed_bytes calculation in multiple line scenario. Signed-off-by: zshuang0316 <163435379+zshuang0316@users.noreply.github.com> --- plugins/in_tail/tail_file.c | 3 +- .../runtime/data/tail/parsers_multiline.conf | 6 ++ tests/runtime/in_tail.c | 86 +++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 tests/runtime/data/tail/parsers_multiline.conf diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1d6ba3f95fe..308d52cc5cd 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -648,7 +648,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) processed_bytes += len + 1; lines++; file->parsed = 0; - file->last_processed_bytes += processed_bytes; + file->last_processed_bytes = processed_bytes; } #ifdef FLB_HAVE_UNICODE_ENCODER @@ -1644,6 +1644,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) /* Adjust the file offset and buffer */ file->stream_offset += processed_bytes; + file->last_processed_bytes = 0; consume_bytes(file->buf_data, processed_bytes, file->buf_len); file->buf_len -= processed_bytes; file->buf_data[file->buf_len] = '\0'; diff --git a/tests/runtime/data/tail/parsers_multiline.conf b/tests/runtime/data/tail/parsers_multiline.conf new file mode 100644 index 00000000000..6d57fae6742 --- /dev/null +++ b/tests/runtime/data/tail/parsers_multiline.conf @@ -0,0 +1,6 @@ +[MULTILINE_PARSER] + name multiline-regex + type regex + flush_timeout 2000 + rule "start_state" "/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]/" "cont" + rule "cont" "/^(?!\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]).*/" "cont" diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index e3af2c3ba88..c83e2b01213 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -1390,6 +1390,91 @@ void flb_test_offset_key() test_tail_ctx_destroy(ctx); } +void flb_test_multiline_offset_key() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"multiline_offset.log"}; + char *offset_key = "OffsetKey"; + char *msg_before_tail = "[2025-06-16 20:42:22,291] INFO - aaaaaaaaaaa"; + char *msg_before_tail2 = "[2025-06-16 20:42:22,500] Error"; + char *msg_final = "[2025-06-16 20:45:29,234] Fatal"; + char expected_msg[1024] = {0}; + int ret; + int num; + + char *expected_strs[] = {msg_final, &expected_msg[0]}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + // multiline offset is at the end of the message + ret = snprintf(&expected_msg[0], sizeof(expected_msg), "\"%s\":%ld", offset_key, strlen(msg_before_tail)+strlen(NEW_LINE)+strlen(msg_before_tail2)+strlen(NEW_LINE)+strlen(msg_final)+strlen(NEW_LINE)); + if(!TEST_CHECK(ret >= 0)) { + TEST_MSG("snprintf failed"); + exit(EXIT_FAILURE); + } + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_service_set(ctx->flb, "Parsers_File", DPATH "/parsers_multiline.conf", NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "offset_key", offset_key, + "multiline.parser", "multiline-regex", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_before_tail, strlen(msg_before_tail)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + ret = write_msg(ctx, msg_before_tail2, strlen(msg_before_tail2)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_final, strlen(msg_final)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* wait up to 5s for at least one output */ + wait_num_with_timeout(5000, &num); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_tail_ctx_destroy(ctx); +} + void flb_test_skip_empty_lines() { struct flb_lib_out_cb cb_data; @@ -2345,6 +2430,7 @@ TEST_LIST = { {"path_key", flb_test_path_key}, {"exclude_path", flb_test_exclude_path}, {"offset_key", flb_test_offset_key}, + {"multiline_offset_key", flb_test_multiline_offset_key}, {"skip_empty_lines", flb_test_skip_empty_lines}, {"skip_empty_lines_crlf", flb_test_skip_empty_lines_crlf}, {"ignore_older", flb_test_ignore_older},