Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/matryer/is v1.4.0
github.com/segmentio/kafka-go v0.4.30
github.com/segmentio/kafka-go v0.4.31
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/segmentio/kafka-go v0.4.30 h1:jIHLImr9J3qycgwHR+cw1x9eLLLYNntpuYPBPjsOc3A=
github.com/segmentio/kafka-go v0.4.30/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ=
github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
13 changes: 7 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ func NewProducer(cfg Config) (Producer, error) {

func (p *segmentProducer) newWriter(cfg Config) error {
p.writer = &kafka.Writer{
Addr: kafka.TCP(cfg.Servers...),
Topic: cfg.Topic,
BatchSize: 1,
WriteTimeout: cfg.DeliveryTimeout,
RequiredAcks: cfg.Acks,
MaxAttempts: 3,
Addr: kafka.TCP(cfg.Servers...),
Topic: cfg.Topic,
BatchSize: 1,
WriteTimeout: cfg.DeliveryTimeout,
RequiredAcks: cfg.Acks,
MaxAttempts: 3,
AllowAutoTopicCreation: true,
}
err := p.configureSecurity(cfg)
if err != nil {
Expand Down