diff --git a/kq/pusher.go b/kq/pusher.go index 4fe5607..a12727e 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -27,10 +27,13 @@ type ( func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { producer := &kafka.Writer{ - Addr: kafka.TCP(addrs...), - Topic: topic, + Addr: kafka.TCP(addrs...), + Topic: topic, + //todo move the follwoing to config kpusherConfig? Balancer: &kafka.LeastBytes{}, Compression: kafka.Snappy, + //if this is not set, the writer will not create a nonexistent topic + AllowAutoTopicCreation: true, } pusher := &Pusher{ producer: producer,