Skip to content

Commit d7039d1

Browse files
author
Matt Whelan
committed
Flexible retry backoff policies
Extends the existing retry limit mechanism to allow for delays between retries. Includes implementations for fixed delays and exponential backoff.
1 parent f0f0a08 commit d7039d1

File tree

6 files changed

+323
-71
lines changed

6 files changed

+323
-71
lines changed

crdb/common.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
package crdb
1616

17-
import "context"
17+
import (
18+
"context"
19+
"time"
20+
)
1821

1922
// Tx abstracts the operations needed by ExecuteInTx so that different
2023
// frameworks (e.g. go's sql package, pgx, gorm) can be used with ExecuteInTx.
@@ -60,8 +63,10 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
6063
return err
6164
}
6265

63-
maxRetries := numRetriesFromContext(ctx)
64-
retryCount := 0
66+
// establish the retry policy
67+
retryPolicy := getRetryPolicy(ctx)
68+
// set up the retry policy state
69+
retryFunc := retryPolicy.NewRetry()
6570
for {
6671
releaseFailed := false
6772
err = fn()
@@ -86,9 +91,16 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
8691
return newTxnRestartError(rollbackErr, err)
8792
}
8893

89-
retryCount++
90-
if maxRetries > 0 && retryCount > maxRetries {
91-
return newMaxRetriesExceededError(err, maxRetries)
94+
delay, retryErr := retryFunc(err)
95+
if retryErr != nil {
96+
return retryErr
97+
}
98+
if delay > 0 {
99+
select {
100+
case <-time.After(delay):
101+
case <-ctx.Done():
102+
return ctx.Err()
103+
}
92104
}
93105
}
94106
}

crdb/retry.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crdb
16+
17+
import (
18+
"time"
19+
)
20+
21+
// RetryFunc owns the state for a transaction retry operation. Usually, this is
22+
// just the retry count. RetryFunc is not assumed to be safe for concurrent use.
//
// ExecuteInTx calls it with its current error each time a retry is being
// considered. A non-nil error result ends the retry loop and is returned to
// the caller of ExecuteInTx. Otherwise the transaction is retried, after
// waiting for the returned duration when that duration is positive (the wait
// is abandoned early if the context is done).
23+
type RetryFunc func(err error) (time.Duration, error)
24+
25+
// RetryPolicy constructs a new instance of a RetryFunc for each transaction
26+
// it is used with. Instances of RetryPolicy can likely be immutable and
27+
// should be safe for concurrent calls to NewRetry.
28+
type RetryPolicy interface {
29+
// NewRetry returns the RetryFunc that holds the retry state for a single
// transaction. ExecuteInTx calls it once, before entering its retry loop.
NewRetry() RetryFunc
30+
}
31+
32+
type LimitBackoffRetryPolicy struct {
33+
RetryLimit int
34+
Delay time.Duration
35+
}
36+
37+
func (l *LimitBackoffRetryPolicy) NewRetry() RetryFunc {
38+
tryCount := 0
39+
return func(err error) (time.Duration, error) {
40+
tryCount++
41+
if tryCount > l.RetryLimit {
42+
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
43+
}
44+
return l.Delay, nil
45+
}
46+
}
47+
48+
// ExpBackoffRetryPolicy implements RetryPolicy using an exponential backoff with optional
49+
// saturation.
50+
type ExpBackoffRetryPolicy struct {
51+
RetryLimit int
52+
BaseDelay time.Duration
53+
MaxDelay time.Duration
54+
}
55+
56+
// NewRetry implements RetryPolicy
57+
func (l *ExpBackoffRetryPolicy) NewRetry() RetryFunc {
58+
tryCount := 0
59+
return func(err error) (time.Duration, error) {
60+
tryCount++
61+
if tryCount > l.RetryLimit {
62+
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
63+
}
64+
delay := l.BaseDelay << (tryCount - 1)
65+
if l.MaxDelay > 0 && delay > l.MaxDelay {
66+
return l.MaxDelay, nil
67+
}
68+
if delay < l.BaseDelay {
69+
// We've overflowed.
70+
if l.MaxDelay > 0 {
71+
return l.MaxDelay, nil
72+
}
73+
// There's no max delay. Giving up is probably better in
74+
// practice than using a 290-year MAX_INT delay.
75+
return 0, newMaxRetriesExceededError(err, tryCount)
76+
}
77+
return delay, nil
78+
}
79+
}
80+
81+
// Vargo converts a go-retry style Delay provider into a RetryPolicy
82+
func Vargo(fn func() VargoBackoff) RetryPolicy {
83+
return &vargoAdapter{
84+
DelegateFactory: fn,
85+
}
86+
}
87+
88+
type VargoBackoff interface {
89+
Next() (next time.Duration, stop bool)
90+
}
91+
92+
// vargoAdapter adapts backoff policies in the style of sethvargo/go-retry
93+
type vargoAdapter struct {
94+
DelegateFactory func() VargoBackoff
95+
}
96+
97+
func (b *vargoAdapter) NewRetry() RetryFunc {
98+
delegate := b.DelegateFactory()
99+
return func(err error) (time.Duration, error) {
100+
d, stop := delegate.Next()
101+
if stop {
102+
return 0, err
103+
}
104+
return d, nil
105+
}
106+
}

crdb/retry_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crdb
16+
17+
import (
18+
"testing"
19+
"time"
20+
)
21+
22+
func assertDelays(t *testing.T, policy RetryPolicy, expectedDelays []time.Duration) {
23+
actualDelays := make([]time.Duration, 0, len(expectedDelays))
24+
rf := policy.NewRetry()
25+
for {
26+
delay, err := rf(nil)
27+
if err != nil {
28+
break
29+
}
30+
31+
actualDelays = append(actualDelays, delay)
32+
if len(actualDelays) > len(expectedDelays) {
33+
t.Fatalf("too many retries: expected %d", len(expectedDelays))
34+
}
35+
}
36+
if len(actualDelays) != len(expectedDelays) {
37+
t.Errorf("wrong number of retries: expected %d, got %d", len(expectedDelays), len(actualDelays))
38+
}
39+
for i, delay := range actualDelays {
40+
expected := expectedDelays[i]
41+
if delay != expected {
42+
t.Errorf("wrong delay at index %d: expected %d, got %d", i, expected, delay)
43+
}
44+
}
45+
}
46+
47+
func TestLimitBackoffRetryPolicy(t *testing.T) {
48+
policy := &LimitBackoffRetryPolicy{
49+
RetryLimit: 3,
50+
Delay: 1 * time.Second,
51+
}
52+
assertDelays(t, policy, []time.Duration{
53+
1 * time.Second,
54+
1 * time.Second,
55+
1 * time.Second,
56+
})
57+
}
58+
59+
func TestExpBackoffRetryPolicy(t *testing.T) {
60+
policy := &ExpBackoffRetryPolicy{
61+
RetryLimit: 5,
62+
BaseDelay: 1 * time.Second,
63+
MaxDelay: 5 * time.Second,
64+
}
65+
assertDelays(t, policy, []time.Duration{
66+
1 * time.Second,
67+
2 * time.Second,
68+
4 * time.Second,
69+
5 * time.Second,
70+
5 * time.Second,
71+
})
72+
}

0 commit comments

Comments
 (0)