|
6 | 6 | "fmt" |
7 | 7 | "net/url" |
8 | 8 | "sync" |
| 9 | + "sync/atomic" |
9 | 10 | "testing" |
10 | 11 |
|
11 | 12 | "github.com/gobuffalo/pop/v6" |
@@ -225,54 +226,44 @@ func (ts *IndexWorkerTestSuite) TestOutOfBandIndexRemoval() { |
225 | 226 | assert.True(ts.T(), found, "The removed index should have been recreated") |
226 | 227 | } |
227 | 228 |
|
228 | | -// Test concurrent access - only one worker should create indexes |
| 229 | +// Test concurrent access - workers coordinate via advisory lock |
229 | 230 | func (ts *IndexWorkerTestSuite) TestConcurrentWorkers() { |
230 | 231 | ctx := context.Background() |
231 | 232 |
|
232 | | - // Number of concurrent workers |
233 | | - numWorkers := 3 |
| 233 | + numWorkers := 5 |
234 | 234 | var wg sync.WaitGroup |
235 | 235 | wg.Add(numWorkers) |
236 | 236 |
|
237 | | - // Track which workers actually created indexes |
238 | | - results := make(chan error, numWorkers) |
| 237 | + var successCount, lockSkipCount, errorCount int32 |
239 | 238 |
|
240 | 239 | for i := 0; i < numWorkers; i++ { |
241 | | - go func(workerID int) { |
| 240 | + go func() { |
242 | 241 | defer wg.Done() |
243 | 242 |
|
244 | 243 | // Each worker needs its own logger to avoid race conditions |
245 | 244 | logger := logrus.NewEntry(logrus.New()) |
246 | 245 | logger.Logger.SetLevel(logrus.DebugLevel) |
247 | 246 |
|
248 | | - // CreateIndexes returns nil on success or ErrAdvisoryLockAlreadyAcquired |
249 | 247 | err := CreateIndexes(ctx, ts.config, logger) |
250 | | - results <- err |
251 | | - }(i) |
| 248 | + switch { |
| 249 | + case err == nil: |
| 250 | + atomic.AddInt32(&successCount, 1) |
| 251 | + case errors.Is(err, ErrAdvisoryLockAlreadyAcquired): |
| 252 | + atomic.AddInt32(&lockSkipCount, 1) |
| 253 | + default: |
| 254 | + atomic.AddInt32(&errorCount, 1) |
| 255 | + ts.T().Errorf("Unexpected error from CreateIndexes: %v", err) |
| 256 | + } |
| 257 | + }() |
252 | 258 | } |
253 | 259 |
|
254 | | - // Wait for all workers to complete |
255 | 260 | wg.Wait() |
256 | | - close(results) |
257 | | - |
258 | | - // Count how many workers acquired the lock |
259 | | - lockCount := 0 |
260 | | - lockSkipCount := 0 |
261 | | - for err := range results { |
262 | | - if err == nil { |
263 | | - lockCount++ |
264 | | - } else if errors.Is(err, ErrAdvisoryLockAlreadyAcquired) { |
265 | | - lockSkipCount++ |
266 | | - } else { |
267 | | - ts.T().Errorf("Unexpected error from CreateIndexes: %v", err) |
268 | | - } |
269 | | - } |
270 | 261 |
|
271 | | - // Only one worker should have acquired the lock and created indexes |
272 | | - assert.Equal(ts.T(), 1, lockCount, "Only one worker should acquire the lock and create indexes") |
273 | | - assert.Equal(ts.T(), numWorkers-1, lockSkipCount, "Other workers should skip due to lock") |
| 262 | + assert.GreaterOrEqual(ts.T(), successCount, int32(1), "At least one worker should succeed") |
| 263 | + assert.Equal(ts.T(), int32(0), errorCount, "No unexpected errors should occur") |
| 264 | + assert.Equal(ts.T(), int32(numWorkers), successCount+lockSkipCount, "All workers should either succeed or skip due to lock") |
274 | 265 |
|
275 | | - // Verify all indexes were created successfully |
| 266 | + // Verify indexes were created correctly regardless of which worker did it |
276 | 267 | indexes := getUsersIndexes(ts.namespace) |
277 | 268 | existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes)) |
278 | 269 | require.NoError(ts.T(), err) |
|
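The test above relies on `CreateIndexes` returning `ErrAdvisoryLockAlreadyAcquired` when another worker already holds the lock. As a rough illustration of that coordination pattern (not the repository's actual implementation), here is a minimal sketch using `database/sql` and Postgres `pg_try_advisory_lock`; the lock key, helper names, and use of `database/sql` instead of pop are all assumptions.

```go
package worker

import (
	"context"
	"database/sql"
	"errors"
)

// ErrAdvisoryLockAlreadyAcquired mirrors the sentinel error the test checks
// with errors.Is; the real definition lives elsewhere in the package.
var ErrAdvisoryLockAlreadyAcquired = errors.New("advisory lock already acquired by another worker")

// indexWorkerLockKey is an arbitrary application-chosen lock key (assumption).
const indexWorkerLockKey = 8675309

// createIndexesWithLock sketches the coordination: try a non-blocking,
// session-level advisory lock, skip if another worker holds it, otherwise
// run the index creation and release the lock afterwards.
func createIndexesWithLock(ctx context.Context, db *sql.DB, createFn func(context.Context) error) error {
	var acquired bool
	// pg_try_advisory_lock returns immediately: true if this session got the
	// lock, false if another session already holds it.
	if err := db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", indexWorkerLockKey).Scan(&acquired); err != nil {
		return err
	}
	if !acquired {
		// Another worker is creating (or has just created) the indexes.
		return ErrAdvisoryLockAlreadyAcquired
	}
	// Release the session-level lock so later runs can acquire it again.
	defer db.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", indexWorkerLockKey)

	return createFn(ctx)
}
```

Because a session-level lock is released once the first worker finishes, a later worker may acquire it in turn and also return nil, which presumably explains why the test now asserts at least one success (`GreaterOrEqual`) rather than exactly one.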