Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions reposerver/repository/lock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package repository

import (
"context"
"fmt"
"io"
"sync"
Expand All @@ -17,20 +18,25 @@ type repositoryLock struct {
stateByKey map[string]*repositoryState
}

// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true
// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true.
// The context allows callers to cancel waiting for the lock, preventing convoy deadlocks when
// goroutines for newer revisions pile up behind the current revision.
// The init callback receives `clean` parameter which indicates if repo state must be cleaned after running non-concurrent operation.
// The first init always runs with `clean` set to true because we cannot be sure about initial repo state.
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func(clean bool) (io.Closer, error)) (io.Closer, error) {
func (r *repositoryLock) Lock(ctx context.Context, path string, revision string, allowConcurrent bool, init func(clean bool) (io.Closer, error)) (io.Closer, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
r.lock.Lock()
state, ok := r.stateByKey[path]
if !ok {
state = &repositoryState{cond: &sync.Cond{L: &sync.Mutex{}}}
state = &repositoryState{broadcast: make(chan struct{})}
r.stateByKey[path] = state
}
r.lock.Unlock()

closer := utilio.NewCloser(func() error {
state.cond.L.Lock()
state.mu.Lock()
notify := false
state.processCount--
var err error
Expand All @@ -40,45 +46,54 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
err = state.initCloser.Close()
}

state.cond.L.Unlock()
if notify {
state.cond.Broadcast()
close(state.broadcast)
state.broadcast = make(chan struct{})
}
state.mu.Unlock()
Comment on lines 49 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing the code as shown below to avoid a possible race condition? After `state.mu` is unlocked, another goroutine could acquire the lock and read the old broadcast channel reference just as it is being closed. The timing window is small, but it exists.

if notify {
    close(state.broadcast)
    state.mu.Unlock()
    // Create new channel while others are processing
    state.mu.Lock()
    state.broadcast = make(chan struct{})
    state.mu.Unlock()
} else {
    state.mu.Unlock()
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ppapapetrou76

After unlocking state.mu, another goroutine could acquire the lock again and read the old broadcast channel reference just as it's being closed

In the PR code, all reads and updates of the `state.broadcast` field
happen between `Lock` and `Unlock` calls, so how could another goroutine read an obsolete reference?

if err != nil {
return fmt.Errorf("init closer failed: %w", err)
}
return nil
})

for {
state.cond.L.Lock()
state.mu.Lock()
if state.revision == "" {
// no in progress operation for that repo. Go ahead.
initCloser, err := init(!state.allowConcurrent)
if err != nil {
state.cond.L.Unlock()
state.mu.Unlock()
return nil, fmt.Errorf("failed to initialize repository resources: %w", err)
}
state.initCloser = initCloser
state.revision = revision
state.processCount = 1
state.allowConcurrent = allowConcurrent
state.cond.L.Unlock()
state.mu.Unlock()
return closer, nil
} else if state.revision == revision && state.allowConcurrent && allowConcurrent {
// same revision already processing and concurrent processing allowed. Increment process count and go ahead.
state.processCount++
state.cond.L.Unlock()
state.mu.Unlock()
return closer, nil
}
state.cond.Wait()
ch := state.broadcast
state.mu.Unlock()

// wait when all in-flight processes of this revision complete and try again
state.cond.L.Unlock()
select {
case <-ch:
// broadcast received, retry
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

type repositoryState struct {
cond *sync.Cond
mu sync.Mutex
broadcast chan struct{}
revision string
initCloser io.Closer
processCount int
Expand Down
84 changes: 71 additions & 13 deletions reposerver/repository/lock_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package repository

import (
"context"
"errors"
"io"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

utilio "github.com/argoproj/argo-cd/v3/util/io"
)
Expand Down Expand Up @@ -38,15 +41,15 @@ func TestLock_SameRevision(t *testing.T) {
initializedTimes := 0
init := numberOfInits(&initializedTimes)
closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

if !assert.True(t, done) {
return
}

closer2, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

if !assert.True(t, done) {
Expand All @@ -66,15 +69,15 @@ func TestLock_DifferentRevisions(t *testing.T) {
init := numberOfInits(&initializedTimes)

closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

if !assert.True(t, done) {
return
}

_, done = lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "2", true, init)
return lock.Lock(context.Background(), "myRepo", "2", true, init)
})

if !assert.False(t, done) {
Expand All @@ -84,7 +87,7 @@ func TestLock_DifferentRevisions(t *testing.T) {
utilio.Close(closer1)

_, done = lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "2", true, init)
return lock.Lock(context.Background(), "myRepo", "2", true, init)
})

if !assert.True(t, done) {
Expand All @@ -98,15 +101,15 @@ func TestLock_NoConcurrentWithSameRevision(t *testing.T) {
init := numberOfInits(&initializedTimes)

closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", false, init)
return lock.Lock(context.Background(), "myRepo", "1", false, init)
})

if !assert.True(t, done) {
return
}

_, done = lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", false, init)
return lock.Lock(context.Background(), "myRepo", "1", false, init)
})

if !assert.False(t, done) {
Expand All @@ -120,7 +123,7 @@ func TestLock_FailedInitialization(t *testing.T) {
lock := NewRepositoryLock()

closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func(_ bool) (io.Closer, error) {
return lock.Lock(context.Background(), "myRepo", "1", true, func(_ bool) (io.Closer, error) {
return utilio.NopCloser, errors.New("failed")
})
})
Expand All @@ -132,7 +135,7 @@ func TestLock_FailedInitialization(t *testing.T) {
assert.Nil(t, closer1)

closer2, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func(_ bool) (io.Closer, error) {
return lock.Lock(context.Background(), "myRepo", "1", true, func(_ bool) (io.Closer, error) {
return utilio.NopCloser, nil
})
})
Expand All @@ -144,20 +147,75 @@ func TestLock_FailedInitialization(t *testing.T) {
utilio.Close(closer2)
}

// TestLock_WaiterForDifferentRevision_CannotBeUnblocked verifies that a caller
// waiting on a different revision is released with context.DeadlineExceeded
// once its context deadline fires, instead of blocking forever behind the
// current lock holder.
func TestLock_WaiterForDifferentRevision_CannotBeUnblocked(t *testing.T) {
	repoLock := NewRepositoryLock()
	noopInit := func(_ bool) (io.Closer, error) { return utilio.NopCloser, nil }

	// Hold the lock for revision "1" so that a waiter for "2" must queue.
	held, err := repoLock.Lock(context.Background(), "myRepo", "1", true, noopInit)
	require.NoError(t, err)
	defer utilio.Close(held)

	// A waiter for revision "2" must bail out via its own short deadline.
	waitCtx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	_, err = repoLock.Lock(waitCtx, "myRepo", "2", true, noopInit)
	require.ErrorIs(t, err, context.DeadlineExceeded)
}

// TestLock_ConvoyFormsUnderSequentialRevisions verifies that when many
// goroutines queue behind a long-running operation for another revision,
// every one of them is released by its own context deadline rather than
// piling up in an unbounded convoy.
func TestLock_ConvoyFormsUnderSequentialRevisions(t *testing.T) {
	repoLock := NewRepositoryLock()
	noopInit := func(_ bool) (io.Closer, error) { return utilio.NopCloser, nil }

	// Simulate a long-running operation by holding the lock for revision "A".
	holder, err := repoLock.Lock(context.Background(), "myRepo", "A", true, noopInit)
	require.NoError(t, err)

	// Launch many waiters for revision "B", each with a short deadline.
	const waiters = 100
	results := make([]error, waiters)
	var pending sync.WaitGroup
	pending.Add(waiters)
	for i := 0; i < waiters; i++ {
		go func(slot int) {
			defer pending.Done()
			waitCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			defer cancel()
			_, results[slot] = repoLock.Lock(waitCtx, "myRepo", "B", true, noopInit)
		}(i)
	}
	pending.Wait()

	// Every waiter must have exited through context cancellation.
	for slot, waitErr := range results {
		require.ErrorIs(t, waitErr, context.DeadlineExceeded, "goroutine %d should have been cancelled", slot)
	}

	utilio.Close(holder)
}

func TestLock_SameRevisionFirstNotConcurrent(t *testing.T) {
lock := NewRepositoryLock()
initializedTimes := 0
init := numberOfInits(&initializedTimes)
closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", false, init)
return lock.Lock(context.Background(), "myRepo", "1", false, init)
})

if !assert.True(t, done) {
return
}

_, done = lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

if !assert.False(t, done) {
Expand All @@ -177,7 +235,7 @@ func TestLock_CleanForNonConcurrent(t *testing.T) {
return utilio.NopCloser, nil
}
closer, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

assert.True(t, done)
Expand All @@ -186,7 +244,7 @@ func TestLock_CleanForNonConcurrent(t *testing.T) {
utilio.Close(closer)

closer, done = lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, init)
return lock.Lock(context.Background(), "myRepo", "1", true, init)
})

assert.True(t, done)
Expand Down
Loading
Loading