A transactional outbox pattern implementation for delivering database rows to Kafka with exactly-once semantics.
kafka-row-sender monitors a source table in SingleStoreDB for new rows and reliably delivers them to a Kafka topic. It uses the transactional outbox pattern to guarantee exactly-once delivery even in the face of failures.
- Outbox Writer Worker: Polls the source table for new rows (using a watermark column), atomically writes them to an outbox table and updates the watermark in a single transaction
- Kafka Row Sender Worker: Reads from the outbox table, sends messages to Kafka with idempotent producer settings, and deletes successfully delivered entries
This two-phase approach ensures:
- No messages are lost (rows are persisted in the outbox before delivery)
- No duplicate messages (Kafka idempotence + outbox deduplication)
- Graceful recovery from Kafka or database failures
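The atomic write-plus-watermark step described above can be illustrated with a minimal sketch. It uses the standard-library `sqlite3` module as a stand-in for SingleStoreDB so it runs anywhere; the table names, column names, and the `copy_batch_to_outbox` function are hypothetical and are not taken from the kafka-row-sender source.

```python
import json
import sqlite3


def init_schema(conn):
    # Hypothetical schema: a source table, an outbox, and a one-row-per-table watermark.
    conn.executescript("""
        CREATE TABLE source (WatermarkId INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT);
        CREATE TABLE outbox (id INTEGER PRIMARY KEY, body TEXT NOT NULL);
        CREATE TABLE watermark (source_table TEXT PRIMARY KEY, last_id INTEGER NOT NULL);
        INSERT INTO watermark VALUES ('source', 0);
    """)


def copy_batch_to_outbox(conn, batch_size=1000):
    """Copy new source rows into the outbox and advance the watermark together."""
    with conn:  # commits on success, rolls back if any statement raises
        (last_id,) = conn.execute(
            "SELECT last_id FROM watermark WHERE source_table = 'source'"
        ).fetchone()
        rows = conn.execute(
            "SELECT WatermarkId, payload FROM source "
            "WHERE WatermarkId > ? ORDER BY WatermarkId LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            return 0
        # Reusing WatermarkId as the outbox primary key means a repeated copy of
        # the same row fails and rolls back instead of creating a duplicate.
        conn.executemany(
            "INSERT INTO outbox (id, body) VALUES (?, ?)",
            [(wid, json.dumps({"WatermarkId": wid, "payload": p})) for wid, p in rows],
        )
        conn.execute(
            "UPDATE watermark SET last_id = ? WHERE source_table = 'source'",
            (rows[-1][0],),
        )
        return len(rows)
```

Because the outbox insert and the watermark update commit together, a crash between the two cannot leave rows copied but unacknowledged, or acknowledged but never copied.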
- Python 3.10+
- SingleStoreDB
- Apache Kafka
- uv (recommended) or pip
# Clone the repository
git clone <repository-url>
cd kafka-row-sender
# Install dependencies with uv
uv sync
# Or with pip
pip install -e .

All configuration is done via environment variables with the KAFKA_ROW_SENDER_ prefix. Copy the example file and edit as needed:
cp .env.example .env

| Variable | Description |
|---|---|
| KAFKA_ROW_SENDER_DB_USER | SingleStoreDB username |
| KAFKA_ROW_SENDER_DB_PASSWORD | SingleStoreDB password |
| KAFKA_ROW_SENDER_DB_NAME | Database name |
| KAFKA_ROW_SENDER_SOURCE_TABLE | Table to monitor for new rows |
| KAFKA_ROW_SENDER_KAFKA_TOPIC | Kafka topic to publish to |
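As a rough illustration of how a service might validate these settings at startup, the sketch below assumes the five variables listed above are mandatory because they have no documented default. The `load_required` function and the returned dictionary shape are hypothetical, not the project's actual configuration code.

```python
import os

PREFIX = "KAFKA_ROW_SENDER_"
# Assumed mandatory: these variables are documented without a default value.
REQUIRED = ("DB_USER", "DB_PASSWORD", "DB_NAME", "SOURCE_TABLE", "KAFKA_TOPIC")


def load_required(environ=None):
    """Return the required settings, or raise listing every missing variable."""
    environ = os.environ if environ is None else environ
    missing = [PREFIX + name for name in REQUIRED if not environ.get(PREFIX + name)]
    if missing:
        raise RuntimeError("missing required settings: " + ", ".join(missing))
    return {name.lower(): environ[PREFIX + name] for name in REQUIRED}
```

Reporting all missing variables at once avoids a fix-one-rerun-fix-another loop when setting up a new environment.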
| Variable | Default | Description |
|---|---|---|
| KAFKA_ROW_SENDER_DB_HOST | localhost | Database host |
| KAFKA_ROW_SENDER_DB_PORT | 3306 | Database port |
| KAFKA_ROW_SENDER_DB_CONNECT_TIMEOUT | 10 | Database connection timeout (seconds) |
| KAFKA_ROW_SENDER_DB_READ_TIMEOUT | 30 | Database read timeout (seconds) |
| KAFKA_ROW_SENDER_SOURCE_WATERMARK_COLUMN | WatermarkId | Auto-increment column for tracking progress |
| KAFKA_ROW_SENDER_SOURCE_COLUMNS | (all) | Comma-separated list of columns to include |
| KAFKA_ROW_SENDER_KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka bootstrap servers |
| KAFKA_ROW_SENDER_KAFKA_KEY_COLUMN | (none) | Column to use as Kafka partition key |
| KAFKA_ROW_SENDER_KAFKA_ACKS | all | Kafka acknowledgment level |
| KAFKA_ROW_SENDER_KAFKA_ENABLE_IDEMPOTENCE | true | Enable Kafka idempotent producer |
| KAFKA_ROW_SENDER_OUTBOX_WRITER_POLL_INTERVAL | 1.0 | Seconds between source table polls |
| KAFKA_ROW_SENDER_OUTBOX_WRITER_BATCH_SIZE | 1000 | Max rows to process per batch |
| KAFKA_ROW_SENDER_KAFKA_SENDER_POLL_INTERVAL | 0.5 | Seconds between outbox polls |
| KAFKA_ROW_SENDER_KAFKA_SENDER_BATCH_SIZE | 500 | Max messages to send per batch |
| KAFKA_ROW_SENDER_KAFKA_RETRY_BACKOFF_BASE | 1.0 | Initial retry backoff (seconds) |
| KAFKA_ROW_SENDER_KAFKA_RETRY_BACKOFF_MAX | 60.0 | Maximum retry backoff (seconds) |
| KAFKA_ROW_SENDER_KAFKA_MAX_RETRIES | 100 | Max retries before message is discarded |
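To give a feel for how the two retry backoff settings interact, here is a small sketch of a capped exponential backoff. The doubling factor is an assumption: the table only documents an initial and a maximum delay, so the growth curve used by kafka-row-sender may differ. The `retry_backoff` function is hypothetical.

```python
def retry_backoff(attempt: int, base: float = 1.0, maximum: float = 60.0) -> float:
    """Delay in seconds before retry number `attempt` (0-based), doubling each time."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    # Cap the exponent so very large attempt counts stay cheap to compute.
    return min(maximum, base * 2 ** min(attempt, 32))


delays = [retry_backoff(n) for n in range(8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Under this assumed schedule the delay reaches the 60-second cap on the seventh retry, so most of the default 100 retries would be spent waiting at the maximum interval.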
Before running kafka-row-sender, create the required infrastructure tables using the built-in init command:
# Set the required environment variables
export KAFKA_ROW_SENDER_DB_USER=<user>
export KAFKA_ROW_SENDER_DB_PASSWORD=<password>
export KAFKA_ROW_SENDER_DB_NAME=<database>
# Initialize the infrastructure tables
kafka-row-sender init

This creates two tables:
- kafka_row_sender_watermark: Tracks the last processed row for each source table
- kafka_row_sender_outbox: Stores rows pending delivery to Kafka
If the tables already exist, the command prints a message and exits without error.
Your source table must have an auto-increment column (default: WatermarkId) that kafka-row-sender uses to track which rows have been processed.
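A watermark-based poll boils down to "give me rows whose watermark column is greater than the last value I saw, in order". The sketch below builds such a query from the documented settings. The `build_poll_query` function is hypothetical, the backtick quoting and `%s` placeholders assume a MySQL-style DB-API driver, and the real query used by kafka-row-sender may look different.

```python
def build_poll_query(table, watermark_column="WatermarkId", columns=None):
    """Build a parameterized SELECT for rows past the last seen watermark."""

    def quote(identifier):
        # MySQL-style identifier quoting; embedded backticks are doubled.
        return "`" + identifier.replace("`", "``") + "`"

    if columns:
        names = [c.strip() for c in columns.split(",") if c.strip()]
        # The watermark column is always selected so progress can be recorded.
        if watermark_column not in names:
            names.insert(0, watermark_column)
        selected = ", ".join(quote(n) for n in names)
    else:
        selected = "*"
    wm = quote(watermark_column)
    return (
        f"SELECT {selected} FROM {quote(table)} "
        f"WHERE {wm} > %s ORDER BY {wm} LIMIT %s"
    )
```

The two placeholders would be bound to the stored watermark value and the batch size when the query is executed.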
kafka-row-sender provides the following subcommands:
Run the kafka-row-sender service. This is the default when no subcommand is specified.
# Run with uv
uv run kafka-row-sender
# Or explicitly use the 'run' subcommand
uv run kafka-row-sender run
# Or if installed with pip
kafka-row-sender

The service runs continuously until stopped with SIGTERM or SIGINT (Ctrl+C). It handles graceful shutdown, ensuring in-flight messages are delivered before exiting.
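One common way to implement this kind of shutdown in Python is to have the signal handler set a flag that worker loops check between batches, so a batch that has started always finishes. The sketch below shows that idea only; the names `stop_requested`, `request_stop`, and `run_worker` are hypothetical and not taken from the kafka-row-sender code.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum, frame):
    # Only record the request; the worker loop decides when it is safe to exit.
    stop_requested.set()


def install_signal_handlers():
    # Must be called from the main thread.
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, request_stop)


def run_worker(process_batch, poll_interval=0.5):
    """Run batches until a stop is requested; never interrupts a batch midway."""
    batches = 0
    while not stop_requested.is_set():
        process_batch()
        batches += 1
        # Sleeps for poll_interval, but wakes immediately once a stop is requested.
        stop_requested.wait(poll_interval)
    return batches
```

Waiting on the event instead of calling `time.sleep` keeps shutdown latency low even with long poll intervals.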
Initialize the database infrastructure tables. Only needs to be run once per database.
kafka-row-sender init

Display the SQL statements used to initialize the database tables without executing them. Useful for manual review or execution in a different context.
# Display SQL with default table names
kafka-row-sender init-sql
# Customize table names
kafka-row-sender init-sql --watermark-table my_watermark --outbox-table my_outbox

Options:
- --watermark-table: Name for the watermark table (default: kafka_row_sender_watermark)
- --outbox-table: Name for the outbox table (default: kafka_row_sender_outbox)
Display help information.
kafka-row-sender help

Start the required services using Docker Compose:
docker compose up -d

This starts:
- SingleStoreDB: localhost:3306 (user: root, password: test_password)
- SingleStore Studio: http://localhost:8081
- Kafka: localhost:9092
- Kafka UI: http://localhost:8080
Wait for services to be healthy:
docker compose ps

Run the full integration test:
uv run python scripts/test_e2e.py

This test:
- Creates a test database with sample tables
- Inserts test rows into the source table
- Runs kafka-row-sender briefly
- Verifies all messages arrived in Kafka exactly once
- Cleans up
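The "exactly once" verification step can be thought of as comparing the IDs inserted into the source table against the IDs consumed from the topic. The sketch below is a hypothetical helper showing that comparison; it is not the code in scripts/test_e2e.py.

```python
from collections import Counter


def check_exactly_once(expected_ids, received_ids):
    """Return (missing, duplicated, unexpected) ID lists; all empty means success."""
    counts = Counter(received_ids)
    expected = set(expected_ids)
    received = set(counts)
    missing = sorted(expected - received)        # inserted but never delivered
    duplicated = sorted(i for i, n in counts.items() if n > 1)
    unexpected = sorted(received - expected)     # delivered but never inserted
    return missing, duplicated, unexpected
```

Returning the three lists separately makes a failing run easier to diagnose than a single pass/fail flag.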
Test failure recovery behavior:
uv run python scripts/test_kafka_recovery.py

docker compose down

uv sync --group dev

# Lint
uv run ruff check src/
# Format
uv run ruff format src/
# Type check
uv run mypy src/

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Source Table   │────▶│  Outbox Table   │────▶│      Kafka      │
│  (your data)    │     │  (kafka_row_    │     │                 │
│                 │     │  sender_outbox) │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘
         │                       │                       │
         │                       │                       │
         ▼                       ▼                       │
┌─────────────────┐     ┌─────────────────┐              │
│    Watermark    │◀────│  OutboxWriter   │              │
│      Table      │     │     Worker      │              │
└─────────────────┘     └─────────────────┘              │
                                                         │
                        ┌─────────────────┐              │
                        │ KafkaRowSender  │──────────────┘
                        │     Worker      │
                        └─────────────────┘
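The right-hand side of the diagram (outbox to Kafka) can be sketched as a loop that deletes an outbox entry only after its send succeeds. To stay self-contained, the sketch uses a plain dictionary as the outbox and an arbitrary callable as the producer; `drain_outbox` and its parameters are hypothetical and do not reflect the actual KafkaRowSender worker or any Kafka client API.

```python
def drain_outbox(outbox, send, batch_size=500):
    """Send up to batch_size entries in ID order; delete each only after success."""
    delivered = 0
    for entry_id in sorted(outbox)[:batch_size]:
        try:
            send(outbox[entry_id])
        except ConnectionError:
            # Stop here so this entry and later ones are retried in order next poll.
            break
        del outbox[entry_id]
        delivered += 1
    return delivered
```

Because an entry is removed only after `send` returns, a broker outage leaves undelivered entries in the outbox for a later pass rather than dropping them.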
Licensed under the Apache License, Version 2.0. See LICENSE for details.