@@ -113,6 +113,18 @@ type Writer struct {
113113 // The default is to try at most 10 times.
114114 MaxAttempts int
115115
116+ // WriteBackoffMin optionally sets the smallest amount of time the writer waits before
117+ // it attempts to write a batch of messages
118+ //
119+ // Default: 100ms
120+ WriteBackoffMin time.Duration
121+
122+ // WriteBackoffMax optionally sets the maximum amount of time the writer waits before
123+ // it attempts to write a batch of messages
124+ //
125+ // Default: 1s
126+ WriteBackoffMax time.Duration
127+
116128 // Limit on how many messages will be buffered before being sent to a
117129 // partition.
118130 //
@@ -360,13 +372,15 @@ type WriterStats struct {
360372 BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
361373 BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
362374
363- MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
364- MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
365- BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
366- ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
367- WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
368- RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
369- Async bool `metric:"kafka.writer.async" type:"gauge"`
375+ MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
376+ WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
377+ WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"`
378+ MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
379+ BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
380+ ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
381+ WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
382+ RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
383+ Async bool `metric:"kafka.writer.async" type:"gauge"`
370384
371385 Topic string `tag:"topic"`
372386
@@ -759,6 +773,20 @@ func (w *Writer) maxAttempts() int {
759773 return 10
760774}
761775
776+ func (w * Writer ) writeBackoffMin () time.Duration {
777+ if w .WriteBackoffMin > 0 {
778+ return w .WriteBackoffMin
779+ }
780+ return 100 * time .Millisecond
781+ }
782+
783+ func (w * Writer ) writeBackoffMax () time.Duration {
784+ if w .WriteBackoffMax > 0 {
785+ return w .WriteBackoffMax
786+ }
787+ return 1 * time .Second
788+ }
789+
762790func (w * Writer ) batchSize () int {
763791 if w .BatchSize > 0 {
764792 return w .BatchSize
@@ -829,26 +857,28 @@ func (w *Writer) stats() *writerStats {
829857func (w * Writer ) Stats () WriterStats {
830858 stats := w .stats ()
831859 return WriterStats {
832- Dials : stats .dials .snapshot (),
833- Writes : stats .writes .snapshot (),
834- Messages : stats .messages .snapshot (),
835- Bytes : stats .bytes .snapshot (),
836- Errors : stats .errors .snapshot (),
837- DialTime : stats .dialTime .snapshotDuration (),
838- BatchTime : stats .batchTime .snapshotDuration (),
839- WriteTime : stats .writeTime .snapshotDuration (),
840- WaitTime : stats .waitTime .snapshotDuration (),
841- Retries : stats .retries .snapshot (),
842- BatchSize : stats .batchSize .snapshot (),
843- BatchBytes : stats .batchSizeBytes .snapshot (),
844- MaxAttempts : int64 (w .MaxAttempts ),
845- MaxBatchSize : int64 (w .BatchSize ),
846- BatchTimeout : w .BatchTimeout ,
847- ReadTimeout : w .ReadTimeout ,
848- WriteTimeout : w .WriteTimeout ,
849- RequiredAcks : int64 (w .RequiredAcks ),
850- Async : w .Async ,
851- Topic : w .Topic ,
860+ Dials : stats .dials .snapshot (),
861+ Writes : stats .writes .snapshot (),
862+ Messages : stats .messages .snapshot (),
863+ Bytes : stats .bytes .snapshot (),
864+ Errors : stats .errors .snapshot (),
865+ DialTime : stats .dialTime .snapshotDuration (),
866+ BatchTime : stats .batchTime .snapshotDuration (),
867+ WriteTime : stats .writeTime .snapshotDuration (),
868+ WaitTime : stats .waitTime .snapshotDuration (),
869+ Retries : stats .retries .snapshot (),
870+ BatchSize : stats .batchSize .snapshot (),
871+ BatchBytes : stats .batchSizeBytes .snapshot (),
872+ MaxAttempts : int64 (w .MaxAttempts ),
873+ WriteBackoffMin : w .WriteBackoffMin ,
874+ WriteBackoffMax : w .WriteBackoffMax ,
875+ MaxBatchSize : int64 (w .BatchSize ),
876+ BatchTimeout : w .BatchTimeout ,
877+ ReadTimeout : w .ReadTimeout ,
878+ WriteTimeout : w .WriteTimeout ,
879+ RequiredAcks : int64 (w .RequiredAcks ),
880+ Async : w .Async ,
881+ Topic : w .Topic ,
852882 }
853883}
854884
@@ -1066,7 +1096,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
10661096 // guarantees to abort, but may be better to avoid long wait times
10671097 // on close.
10681098 //
1069- delay := backoff (attempt , 100 * time . Millisecond , 1 * time . Second )
1099+ delay := backoff (attempt , ptw . w . writeBackoffMin (), ptw . w . writeBackoffMax () )
10701100 ptw .w .withLogger (func (log Logger ) {
10711101 log .Printf ("backing off %s writing %d messages to %s (partition: %d)" , delay , len (batch .msgs ), key .topic , key .partition )
10721102 })
0 commit comments