Skip to content

Commit 979fd5c

Browse files
committed
fix(reposerver): context-aware revision lock to prevent convoy deadlock
Make Lock() in reposerver/repository/lock.go context-aware to prevent convoy deadlocks under rapid commit bursts. Previously, sync.Cond.Wait() blocked indefinitely with no way to cancel, causing goroutines for newer revisions to pile up behind the current revision. Replace sync.Cond with a sync.Mutex plus a chan struct{} broadcast channel, and use select to wait on both the broadcast channel and ctx.Done(), so that callers can cancel the wait through their context. Resolves #26866. Signed-off-by: Mario Nebl <hello@mario-nebl.de>
1 parent 8142920 commit 979fd5c

File tree

5 files changed

+129
-47
lines changed

5 files changed

+129
-47
lines changed

reposerver/repository/lock.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package repository
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"sync"
@@ -17,20 +18,22 @@ type repositoryLock struct {
1718
stateByKey map[string]*repositoryState
1819
}
1920

20-
// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true
21+
// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true.
22+
// The context allows callers to cancel waiting for the lock, preventing convoy deadlocks when
23+
// goroutines for newer revisions pile up behind the current revision.
2124
// The init callback receives `clean` parameter which indicates if repo state must be cleaned after running non-concurrent operation.
2225
// The first init always runs with `clean` set to true because we cannot be sure about initial repo state.
23-
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func(clean bool) (io.Closer, error)) (io.Closer, error) {
26+
func (r *repositoryLock) Lock(ctx context.Context, path string, revision string, allowConcurrent bool, init func(clean bool) (io.Closer, error)) (io.Closer, error) {
2427
r.lock.Lock()
2528
state, ok := r.stateByKey[path]
2629
if !ok {
27-
state = &repositoryState{cond: &sync.Cond{L: &sync.Mutex{}}}
30+
state = &repositoryState{broadcast: make(chan struct{})}
2831
r.stateByKey[path] = state
2932
}
3033
r.lock.Unlock()
3134

3235
closer := utilio.NewCloser(func() error {
33-
state.cond.L.Lock()
36+
state.mu.Lock()
3437
notify := false
3538
state.processCount--
3639
var err error
@@ -40,45 +43,54 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
4043
err = state.initCloser.Close()
4144
}
4245

43-
state.cond.L.Unlock()
4446
if notify {
45-
state.cond.Broadcast()
47+
close(state.broadcast)
48+
state.broadcast = make(chan struct{})
4649
}
50+
state.mu.Unlock()
4751
if err != nil {
4852
return fmt.Errorf("init closer failed: %w", err)
4953
}
5054
return nil
5155
})
5256

5357
for {
54-
state.cond.L.Lock()
58+
state.mu.Lock()
5559
if state.revision == "" {
5660
// no in progress operation for that repo. Go ahead.
5761
initCloser, err := init(!state.allowConcurrent)
5862
if err != nil {
59-
state.cond.L.Unlock()
63+
state.mu.Unlock()
6064
return nil, fmt.Errorf("failed to initialize repository resources: %w", err)
6165
}
6266
state.initCloser = initCloser
6367
state.revision = revision
6468
state.processCount = 1
6569
state.allowConcurrent = allowConcurrent
66-
state.cond.L.Unlock()
70+
state.mu.Unlock()
6771
return closer, nil
6872
} else if state.revision == revision && state.allowConcurrent && allowConcurrent {
6973
// same revision already processing and concurrent processing allowed. Increment process count and go ahead.
7074
state.processCount++
71-
state.cond.L.Unlock()
75+
state.mu.Unlock()
7276
return closer, nil
7377
}
74-
state.cond.Wait()
78+
ch := state.broadcast
79+
state.mu.Unlock()
80+
7581
// wait when all in-flight processes of this revision complete and try again
76-
state.cond.L.Unlock()
82+
select {
83+
case <-ch:
84+
// broadcast received, retry
85+
case <-ctx.Done():
86+
return nil, ctx.Err()
87+
}
7788
}
7889
}
7990

8091
type repositoryState struct {
81-
cond *sync.Cond
92+
mu sync.Mutex
93+
broadcast chan struct{}
8294
revision string
8395
initCloser io.Closer
8496
processCount int
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package repository
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
13+
utilio "github.com/argoproj/argo-cd/v3/util/io"
14+
)
15+
16+
// TestLock_WaiterForDifferentRevision_CannotBeUnblocked checks that a Lock call
// waiting behind a different revision returns once its context deadline expires.
//
// Revision "1" holds the lock on "myRepo" for the whole duration of the second
// call, so the second call is expected to come back with
// context.DeadlineExceeded rather than with a closer.
//
// NOTE(review): the "CannotBeUnblocked" suffix reads as the opposite of what is
// asserted below; presumably it names the pre-fix symptom. Consider renaming.
func TestLock_WaiterForDifferentRevision_CannotBeUnblocked(t *testing.T) {
17+
lock := NewRepositoryLock()
18+
// init never fails and hands back a no-op closer, so only the locking logic is exercised.
init := func(_ bool) (io.Closer, error) {
19+
return utilio.NopCloser, nil
20+
}
21+
22+
// Acquire lock for revision "1"
23+
closer1, err := lock.Lock(context.Background(), "myRepo", "1", true, init)
24+
require.NoError(t, err)
25+
26+
// Try to acquire lock for revision "2" with a short timeout
27+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
28+
defer cancel()
29+
30+
// closer1 is still open at this point, so revision "2" has to wait until ctx expires.
_, err = lock.Lock(ctx, "myRepo", "2", true, init)
31+
assert.ErrorIs(t, err, context.DeadlineExceeded)
32+
33+
// Release revision "1" only after the waiter has already returned.
utilio.Close(closer1)
34+
}
35+
36+
// TestLock_ConvoyFormsUnderSequentialRevisions checks that many Lock calls queued
// behind a held revision all return when their context deadlines expire.
//
// Revision "A" holds the lock on "myRepo" until the end of the test. Ten
// goroutines request revision "B", each with its own 100ms timeout, and every
// one of them is expected to return context.DeadlineExceeded.
func TestLock_ConvoyFormsUnderSequentialRevisions(t *testing.T) {
37+
lock := NewRepositoryLock()
38+
// init never fails and hands back a no-op closer, so only the locking logic is exercised.
init := func(_ bool) (io.Closer, error) {
39+
return utilio.NopCloser, nil
40+
}
41+
42+
// Acquire lock for revision "A" — simulates a long-running operation
43+
closerA, err := lock.Lock(context.Background(), "myRepo", "A", true, init)
44+
require.NoError(t, err)
45+
46+
// Spawn 10 goroutines all waiting for revision "B" with short deadlines
47+
const n = 10
48+
var wg sync.WaitGroup
49+
// One result slot per goroutine: each goroutine writes only errs[idx].
errs := make([]error, n)
50+
51+
for i := range n {
52+
wg.Add(1)
53+
go func(idx int) {
54+
defer wg.Done()
55+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
56+
defer cancel()
57+
// closerA is not closed until after wg.Wait(), so revision "B" has to wait until ctx expires.
_, errs[idx] = lock.Lock(ctx, "myRepo", "B", true, init)
58+
}(i)
59+
}
60+
61+
// errs is read only after every goroutine has finished writing its slot.
wg.Wait()
62+
63+
// All goroutines should have exited via context cancellation
64+
for i, err := range errs {
65+
assert.ErrorIs(t, err, context.DeadlineExceeded, "goroutine %d should have been cancelled", i)
66+
}
67+
68+
// Release revision "A" only after all waiters have returned.
utilio.Close(closerA)
69+
}

reposerver/repository/lock_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package repository
22

33
import (
4+
"context"
45
"errors"
56
"io"
67
"testing"
@@ -38,15 +39,15 @@ func TestLock_SameRevision(t *testing.T) {
3839
initializedTimes := 0
3940
init := numberOfInits(&initializedTimes)
4041
closer1, done := lockQuickly(func() (io.Closer, error) {
41-
return lock.Lock("myRepo", "1", true, init)
42+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
4243
})
4344

4445
if !assert.True(t, done) {
4546
return
4647
}
4748

4849
closer2, done := lockQuickly(func() (io.Closer, error) {
49-
return lock.Lock("myRepo", "1", true, init)
50+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
5051
})
5152

5253
if !assert.True(t, done) {
@@ -66,15 +67,15 @@ func TestLock_DifferentRevisions(t *testing.T) {
6667
init := numberOfInits(&initializedTimes)
6768

6869
closer1, done := lockQuickly(func() (io.Closer, error) {
69-
return lock.Lock("myRepo", "1", true, init)
70+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
7071
})
7172

7273
if !assert.True(t, done) {
7374
return
7475
}
7576

7677
_, done = lockQuickly(func() (io.Closer, error) {
77-
return lock.Lock("myRepo", "2", true, init)
78+
return lock.Lock(context.Background(), "myRepo", "2", true, init)
7879
})
7980

8081
if !assert.False(t, done) {
@@ -84,7 +85,7 @@ func TestLock_DifferentRevisions(t *testing.T) {
8485
utilio.Close(closer1)
8586

8687
_, done = lockQuickly(func() (io.Closer, error) {
87-
return lock.Lock("myRepo", "2", true, init)
88+
return lock.Lock(context.Background(), "myRepo", "2", true, init)
8889
})
8990

9091
if !assert.True(t, done) {
@@ -98,15 +99,15 @@ func TestLock_NoConcurrentWithSameRevision(t *testing.T) {
9899
init := numberOfInits(&initializedTimes)
99100

100101
closer1, done := lockQuickly(func() (io.Closer, error) {
101-
return lock.Lock("myRepo", "1", false, init)
102+
return lock.Lock(context.Background(), "myRepo", "1", false, init)
102103
})
103104

104105
if !assert.True(t, done) {
105106
return
106107
}
107108

108109
_, done = lockQuickly(func() (io.Closer, error) {
109-
return lock.Lock("myRepo", "1", false, init)
110+
return lock.Lock(context.Background(), "myRepo", "1", false, init)
110111
})
111112

112113
if !assert.False(t, done) {
@@ -120,7 +121,7 @@ func TestLock_FailedInitialization(t *testing.T) {
120121
lock := NewRepositoryLock()
121122

122123
closer1, done := lockQuickly(func() (io.Closer, error) {
123-
return lock.Lock("myRepo", "1", true, func(_ bool) (io.Closer, error) {
124+
return lock.Lock(context.Background(), "myRepo", "1", true, func(_ bool) (io.Closer, error) {
124125
return utilio.NopCloser, errors.New("failed")
125126
})
126127
})
@@ -132,7 +133,7 @@ func TestLock_FailedInitialization(t *testing.T) {
132133
assert.Nil(t, closer1)
133134

134135
closer2, done := lockQuickly(func() (io.Closer, error) {
135-
return lock.Lock("myRepo", "1", true, func(_ bool) (io.Closer, error) {
136+
return lock.Lock(context.Background(), "myRepo", "1", true, func(_ bool) (io.Closer, error) {
136137
return utilio.NopCloser, nil
137138
})
138139
})
@@ -149,15 +150,15 @@ func TestLock_SameRevisionFirstNotConcurrent(t *testing.T) {
149150
initializedTimes := 0
150151
init := numberOfInits(&initializedTimes)
151152
closer1, done := lockQuickly(func() (io.Closer, error) {
152-
return lock.Lock("myRepo", "1", false, init)
153+
return lock.Lock(context.Background(), "myRepo", "1", false, init)
153154
})
154155

155156
if !assert.True(t, done) {
156157
return
157158
}
158159

159160
_, done = lockQuickly(func() (io.Closer, error) {
160-
return lock.Lock("myRepo", "1", true, init)
161+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
161162
})
162163

163164
if !assert.False(t, done) {
@@ -177,7 +178,7 @@ func TestLock_CleanForNonConcurrent(t *testing.T) {
177178
return utilio.NopCloser, nil
178179
}
179180
closer, done := lockQuickly(func() (io.Closer, error) {
180-
return lock.Lock("myRepo", "1", true, init)
181+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
181182
})
182183

183184
assert.True(t, done)
@@ -186,7 +187,7 @@ func TestLock_CleanForNonConcurrent(t *testing.T) {
186187
utilio.Close(closer)
187188

188189
closer, done = lockQuickly(func() (io.Closer, error) {
189-
return lock.Lock("myRepo", "1", true, init)
190+
return lock.Lock(context.Background(), "myRepo", "1", true, init)
190191
})
191192

192193
assert.True(t, done)

0 commit comments

Comments
 (0)