
Conversation


@edsiper edsiper commented Aug 14, 2025

This patch refactors the Kafka input plugin’s offset commit logic when enable.auto.commit is disabled.

Previously, offsets were committed immediately after each individual message was processed, even before it was flushed to the Fluent Bit pipeline. This introduced a small risk window where, in the event of a crash or forced shutdown, messages could be acknowledged (via Kafka commit) without being fully ingested, resulting in potential data loss under extreme conditions.

This patch moves the offset commit logic to occur only after a successful batch registration through flb_input_log_append(). This ensures that:

  • offsets are only committed if messages were actually encoded and flushed.
  • we avoid committing unprocessed data, thereby improving correctness.
  • we reduce the number of commits, improving efficiency in batch mode.

This change aligns commit timing with actual ingestion and eliminates the window of inconsistency between processing and commit acknowledgment.


Fluent Bit is licensed under Apache 2.0; by submitting this pull request, I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes
    • Improved Kafka input reliability by committing offsets only after records are successfully appended to the log.
    • Prevents premature commits during polling, reducing risk of data loss in non-auto-commit setups.
    • No-op commits when there’s no data to flush; failures now propagate correctly.
    • Added clearer error logging for append failures to aid troubleshooting.
    • No configuration or API changes required; behavior is more consistent and robust out of the box.


Signed-off-by: Eduardo Silva <[email protected]>
@coderabbitai

coderabbitai bot commented Aug 14, 2025

Walkthrough

The Kafka input plugin’s commit logic in in_kafka_collect was refactored to conditionally commit only after a successful append to the input log. A new append_ret variable governs the flow, with explicit error logging on append failures and early success when no data is produced.

Changes

Cohort / File(s): Kafka input commit handling (plugins/in_kafka/in_kafka.c)
Summary of Changes: Introduced append_ret; removed the unconditional commit inside the poll loop; after successful encoding, call flb_input_log_append() and commit only if append_ret == 0 and auto-commit is disabled; log an error and set ret = -1 on append failure; set ret = 0 when there is no data (output_length == 0).

Sequence Diagram(s)

sequenceDiagram
    participant Kafka as Kafka Broker
    participant Plugin as in_kafka_collect
    participant Encoder as Encoder
    participant InputLog as flb_input_log_append

    Kafka->>Plugin: Poll messages
    Plugin->>Encoder: Encode batch
    alt output_length == 0
        Plugin-->>Kafka: No commit/change
        Plugin-->Plugin: ret = 0
    else Encode success
        Plugin->>InputLog: Append records
        alt append_ret == 0
            opt enable_auto_commit == false
                Plugin->>Kafka: rd_kafka_commit()
            end
            Plugin-->Plugin: ret = 0
        else append_ret != 0
            Plugin->>Plugin: Log error "failed to append records"
            Plugin-->Plugin: ret = -1
        end
    end

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

Poem

A rabbit taps the commit key light,
Only after logs are tucked in tight.
If append says “go,” we nod and write—
If not, we thump, abort the flight.
Kafka whispers, “all is right,”
When carrots—er, records—encode just right. 🥕


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
plugins/in_kafka/in_kafka.c (2)

160-160: Nit: Initialize append_ret or narrow its scope

Minor polish to avoid any chance of future uninitialized-use if the block is refactored later. Alternatively, declare it inside the append block.

-int append_ret;
+int append_ret = -1;

224-226: Improve append failure logging (include error code and payload size)

Richer context makes triage simpler.

-                flb_plg_error(ins, "failed to append records");
+                flb_plg_error(ins, "failed to append records: ret=%d, bytes=%zu",
+                              append_ret, ctx->log_encoder->output_length);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 1eefbf3 and 8425a60.

📒 Files selected for processing (1)
  • plugins/in_kafka/in_kafka.c (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (2)
plugins/in_kafka/in_kafka.c (2)

213-216: LGTM: Commit gated by successful append aligns with at-least-once ingestion

Deferring commits until after flb_input_log_append() succeeds removes the data-loss window called out in the PR objective. This is the right place to gate commits.


228-229: LGTM: No-op when no data to flush

Early success when there’s nothing to append is correct and avoids unnecessary commits.

Comment on lines +217 to +222
if (append_ret == 0) {
    if (!ctx->enable_auto_commit) {
        rd_kafka_commit(ctx->kafka.rk, NULL, 0);
    }
    ret = 0;
}
🛠️ Refactor suggestion

Handle rd_kafka_commit() errors; consider async commit optionally

Currently, commit failures are silent. Capture and log the error so operators can observe commit issues. Keep ret = 0 to preserve at-least-once semantics (append already succeeded), but surface the failure.

Optionally, consider using async commit (third arg = 1) or making it configurable to avoid blocking the input thread on network hiccups.

-            if (!ctx->enable_auto_commit) {
-                rd_kafka_commit(ctx->kafka.rk, NULL, 0);
-            }
+            if (!ctx->enable_auto_commit) {
+                rd_kafka_resp_err_t c_err;
+                c_err = rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+                if (c_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                    flb_plg_warn(ins, "rd_kafka_commit failed: %s",
+                                 rd_kafka_err2str(c_err));
+                    /* Keep ret == 0: data has been appended; failing here would
+                     * not help delivery guarantees and could cause unnecessary
+                     * collector backoff. */
+                }
+            }
@edsiper edsiper merged commit 459d4a1 into master Aug 14, 2025
63 checks passed
@edsiper edsiper deleted the in_kafka_safe_commit branch August 14, 2025 21:53
