Skip to content
This repository was archived by the owner on Nov 3, 2024. It is now read-only.

Commit 9e0dfbc

Browse files
committed
feat: optimise kafka writing
1 parent 6d38c1a commit 9e0dfbc

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

internal/kafka/writer.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"go.uber.org/zap"
1010
"google.golang.org/protobuf/proto"
1111
"message-handler/internal/config"
12+
"time"
1213
)
1314

1415
const writeTopic = "message-handler"
@@ -24,12 +25,13 @@ type kafkaNotifier struct {
2425

2526
func NewKafkaNotifier(cfg *config.KafkaConfig, logger *zap.SugaredLogger) Notifier {
2627
w := &kafka.Writer{
27-
Addr: kafka.TCP(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
28-
Topic: writeTopic,
29-
Balancer: &kafka.LeastBytes{},
30-
Async: true,
31-
BatchSize: 1,
32-
ErrorLogger: kafka.LoggerFunc(logger.Errorw),
28+
Addr: kafka.TCP(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)),
29+
Topic: writeTopic,
30+
Balancer: &kafka.LeastBytes{},
31+
Async: true,
32+
BatchSize: 100,
33+
BatchTimeout: 100 * time.Millisecond,
34+
ErrorLogger: kafka.LoggerFunc(logger.Errorw),
3335
}
3436

3537
return &kafkaNotifier{w: w}

0 commit comments

Comments
 (0)