A powerful and easy-to-use command-line interface for Apache Kafka operations.
- Features
- Installation
- Quick Start
- Commands
- Authentication
- Message Formats
- Environment Variables
- Common Use Cases
- Troubleshooting
- License
- ✅ Consumer & Producer - Full support for consuming and producing messages
- ✅ Multiple Authentication Methods - Plain, SCRAM-SHA-256/512, AWS IAM, OAuth Bearer
- ✅ Flexible Message Formats - JSON, JavaScript, raw text, or custom formatters
- ✅ Consumer Groups - Full consumer group support with offset management
- ✅ Time-based Consumption - Read messages from specific timestamps
- ✅ SSL/TLS Support - Secure connections to Kafka clusters
- ✅ Topic Management - Create, delete, and inspect topics
- ✅ Headers Support - Read and write message headers
- ✅ GZIP Compression - Automatic compression support
- ✅ TypeScript - Full TypeScript support
npm install -g kafka-console
npm install kafka-console
npx kafka-console [command]
kafka-console list --brokers localhost:9092
kafka-console consume my-topic --brokers localhost:9092
echo '{"message": "Hello Kafka!"}' | kafka-console produce my-topic --brokers localhost:9092
kafka-console consume <topic> [options]
Option | Description | Default |
---|---|---|
-g, --group <group> | Consumer group name | kafka-console-consumer-{timestamp} |
-f, --from <from> | Start position (timestamp/ISO date/0 for beginning) | latest |
-c, --count <count> | Number of messages to read | unlimited |
-s, --skip <skip> | Number of messages to skip | 0 |
-o, --output <file> | Write output to file | stdout |
-d, --data-format <format> | Message format (json/js/raw/custom) | json |
-p, --pretty | Pretty print JSON output | false |
Consume from beginning and pretty print:
kafka-console consume my-topic --from 0 --pretty
Consume last 10 messages:
kafka-console consume my-topic --count 10
Consume from specific timestamp:
kafka-console consume my-topic --from "2024-01-01T00:00:00Z"
Consume with specific consumer group:
kafka-console consume my-topic --group my-consumer-group
Save output to file:
kafka-console consume my-topic --output messages.json
Extract specific fields with jq:
kafka-console consume my-topic | jq '.value.userId'
kafka-console produce <topic> [options]
Option | Description | Default |
---|---|---|
-i, --input <file> | Read input from file | stdin |
-d, --data-format <format> | Message format (json/js/raw/custom) | json |
-h, --header <header> | Add message header (format: key:value) | none |
-w, --wait <ms> | Wait time between messages | 0 |
Produce single message:
echo '{"user": "john", "action": "login"}' | kafka-console produce my-topic
Produce from file:
kafka-console produce my-topic --input messages.json
Produce with headers:
echo '{"data": "test"}' | kafka-console produce my-topic --header "source:api" --header "version:1.0"
Produce multiple messages from JSON array:
cat users.json | jq -c '.[]' | kafka-console produce my-topic
Produce with key (for partitioning):
echo '{"key": "user123", "value": {"name": "John"}}' | kafka-console produce my-topic
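Because the producer treats a {"key": ..., "value": ...} envelope as a keyed message, records can be keyed in bulk before piping them in. A minimal sketch (the helper name is ours, not part of kafka-console):

```javascript
// Hypothetical helper: turn an array of records into newline-delimited
// {key, value} messages, keyed by a chosen field, so records sharing a key
// land on the same partition.
const toKeyedNdjson = (records, keyField) =>
  records
    .map((rec) => JSON.stringify({ key: String(rec[keyField]), value: rec }))
    .join('\n');

// Print keyed messages ready to pipe into `kafka-console produce`
console.log(toKeyedNdjson([{ id: 'u1', name: 'John' }], 'id'));
// → {"key":"u1","value":{"id":"u1","name":"John"}}
```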
Create a topic:
kafka-console topic:create my-new-topic
Delete a topic:
kafka-console topic:delete old-topic
Show current offsets for a topic:
kafka-console topic:offsets my-topic
Show offsets for a topic at a specific timestamp:
kafka-console topic:offsets my-topic "2024-01-01T00:00:00Z"
List topics:
kafka-console list
kafka-console list --all
Show cluster metadata:
kafka-console metadata
Show configuration for a topic:
kafka-console config --resource topic --resourceName my-topic
Connect over SSL/TLS:
kafka-console consume my-topic \
--brokers broker1:9093,broker2:9093 \
--ssl
SASL/PLAIN authentication:
kafka-console consume my-topic \
--brokers broker:9093 \
--ssl \
--mechanism plain \
--username myuser \
--password mypassword
SASL/SCRAM-SHA-256 authentication:
kafka-console consume my-topic \
--brokers broker:9093 \
--ssl \
--mechanism scram-sha-256 \
--username myuser \
--password mypassword
AWS IAM authentication:
kafka-console consume my-topic \
--brokers broker:9093 \
--ssl \
--mechanism aws \
--access-key-id AKIAXXXXXXXX \
--secret-access-key XXXXXXXXXX \
--session-token XXXXXXXXXX
OAuth Bearer authentication:
kafka-console consume my-topic \
--brokers broker:9093 \
--ssl \
--mechanism oauthbearer \
--oauth-bearer "eyJhbGciOiJIUzI1NiIs..."
Messages are parsed as JSON:
echo '{"name": "Alice", "age": 30}' | kafka-console produce my-topic
Messages are sent as plain text:
echo "Plain text message" | kafka-console produce my-topic --data-format raw
Messages can contain JavaScript exports:
echo 'module.exports = { timestamp: Date.now() }' | kafka-console produce my-topic --data-format js
Create a custom formatter module:
// formatter/custom.js
module.exports = {
encode: (value) => Buffer.from(JSON.stringify(value)),
decode: (buffer) => JSON.parse(buffer.toString())
};
Use the custom formatter:
kafka-console consume my-topic --data-format ./formatter/custom.js
Set environment variables to avoid repeating common options:
export KAFKA_BROKERS=broker1:9092,broker2:9092
export KAFKA_USERNAME=myuser
export KAFKA_PASSWORD=mypassword
export KAFKA_MECHANISM=plain
export KAFKA_TIMEOUT=30000
All supported environment variables:
- KAFKA_BROKERS - Comma-separated list of brokers
- KAFKA_TIMEOUT - Operation timeout in milliseconds
- KAFKA_MECHANISM - SASL mechanism
- KAFKA_USERNAME - SASL username
- KAFKA_PASSWORD - SASL password
- KAFKA_AUTH_ID - AWS authorization identity
- KAFKA_ACCESS_KEY_ID - AWS access key ID
- KAFKA_SECRET_ACCESS_KEY - AWS secret access key
- KAFKA_SESSION_TOKEN - AWS session token
- KAFKA_OAUTH_BEARER - OAuth bearer token
Monitor a topic with a dedicated consumer group:
kafka-console consume logs --group monitor-group --pretty
Consume events since yesterday:
kafka-console consume events --from "$(date -d yesterday --iso-8601)"
Replicate a topic:
kafka-console consume source-topic | kafka-console produce destination-topic
Filter error events with jq:
kafka-console consume all-events | jq 'select(.value.type == "ERROR")'
Count messages in a topic:
kafka-console consume my-topic --from 0 | wc -l
Sample the first 100 messages of a large topic:
kafka-console consume large-topic --count 100 --pretty
Inspect message headers:
kafka-console consume my-topic | jq '.headers'
Problem: Cannot connect to Kafka broker
Error: KafkaJSConnectionError: Connection timeout
Solution:
- Verify broker addresses are correct
- Check network connectivity:
telnet broker-host 9092
- Ensure security groups/firewalls allow connection
- For Docker: use host network or proper port mapping
Problem: Authentication failed
Error: KafkaJSProtocolError: SASL authentication failed
Solution:
- Verify credentials are correct
- Check SASL mechanism matches broker configuration
- Ensure SSL is enabled if required (--ssl)
Problem: Not receiving messages
Solution:
- Check consumer group offset:
kafka-console topic:offsets my-topic --group my-group
- Reset to the beginning with --from 0
- Use a new consumer group name
Problem: JSON parsing errors
SyntaxError: Unexpected token...
Solution:
- Verify the message format matches the specified data-format
- Use --data-format raw for non-JSON messages
- Check for malformed JSON with:
jq . < input.json
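For larger input files, a few lines of Node can report every malformed line at once instead of stopping at the first error. A sketch (the helper name is illustrative, not part of kafka-console):

```javascript
// Hypothetical helper: validate newline-delimited JSON before piping it to
// `kafka-console produce`; returns one error message per malformed line,
// or an empty array when every line parses.
const validateNdjson = (text) =>
  text
    .split('\n')
    .map((line, i) => ({ line, i }))
    .filter(({ line }) => line.trim() !== '')
    .flatMap(({ line, i }) => {
      try {
        JSON.parse(line);
        return [];
      } catch (err) {
        return [`line ${i + 1}: ${err.message}`];
      }
    });

console.log(validateNdjson('{"a": 1}\n{oops}'));
```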
Problem: Slow message consumption
Solution:
- Increase batch size in the consumer configuration
- Use multiple consumer instances with the same group
- Check network latency to brokers
Problem: SSL handshake failed
Solution:
- Ensure the --ssl flag is used
- Verify the broker SSL port (usually 9093)
- Check certificate validity
License

The MIT License

Copyright (c) 2024 Ivan Zakharchanka