diff --git a/consumer_integration_test.go b/consumer_integration_test.go index 1c6283a..458dede 100644 --- a/consumer_integration_test.go +++ b/consumer_integration_test.go @@ -128,13 +128,14 @@ func waitForMessage(consumer kafka.Consumer, timeout time.Duration) (*skafka.Mes func sendTestMessages(t *testing.T, cfg kafka.Config, from int, to int) { is := is.New(t) writer := skafka.Writer{ - Addr: skafka.TCP(cfg.Servers...), - Topic: cfg.Topic, - BatchSize: 1, - BatchTimeout: 10 * time.Millisecond, - WriteTimeout: cfg.DeliveryTimeout, - RequiredAcks: cfg.Acks, - MaxAttempts: 2, + Addr: skafka.TCP(cfg.Servers...), + Topic: cfg.Topic, + BatchSize: 1, + BatchTimeout: 10 * time.Millisecond, + WriteTimeout: cfg.DeliveryTimeout, + RequiredAcks: cfg.Acks, + MaxAttempts: 2, + AllowAutoTopicCreation: true, } defer writer.Close() diff --git a/go.mod b/go.mod index 823127a..7673c0c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/matryer/is v1.4.0 - github.com/segmentio/kafka-go v0.4.30 + github.com/segmentio/kafka-go v0.4.31 ) require ( diff --git a/go.sum b/go.sum index 301877b..56dd804 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= -github.com/segmentio/kafka-go v0.4.30 h1:jIHLImr9J3qycgwHR+cw1x9eLLLYNntpuYPBPjsOc3A= -github.com/segmentio/kafka-go v0.4.30/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU= +github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ= +github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/producer.go b/producer.go index 2df4e24..561b15f 100644 --- a/producer.go +++ b/producer.go @@ -56,12 +56,13 @@ func NewProducer(cfg Config) (Producer, error) { func (p *segmentProducer) newWriter(cfg Config) error { p.writer = &kafka.Writer{ - Addr: kafka.TCP(cfg.Servers...), - Topic: cfg.Topic, - BatchSize: 1, - WriteTimeout: cfg.DeliveryTimeout, - RequiredAcks: cfg.Acks, - MaxAttempts: 3, + Addr: kafka.TCP(cfg.Servers...), + Topic: cfg.Topic, + BatchSize: 1, + WriteTimeout: cfg.DeliveryTimeout, + RequiredAcks: cfg.Acks, + MaxAttempts: 3, + AllowAutoTopicCreation: true, } err := p.configureSecurity(cfg) if err != nil {