Skip to content

Commit 712facd

Browse files
authored
fix: watching for renamed or removed files (#111)
1 parent 7e14e04 commit 712facd

File tree

2 files changed

+199
-59
lines changed

2 files changed

+199
-59
lines changed

pkg/command/runner.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@ var ErrNoCommandForPath = errors.New("no command for path")
2525
// - Filesystem notifications / watching.
2626
// - Concurrent command execution.
2727
type Runner struct {
28-
rule *rule.Rule
29-
watcher *fsnotify.Watcher
30-
cancelFunc context.CancelFunc
31-
path string
32-
listeners []chan<- Event
33-
mu sync.Mutex
28+
rule *rule.Rule
29+
watcher *fsnotify.Watcher
30+
watchedFiles map[string]struct{}
31+
cancelFunc context.CancelFunc
32+
path string
33+
listeners []chan<- Event
34+
mu sync.Mutex
3435
}
3536

3637
// NewRunner creates a new [Runner].
3738
func NewRunner(path string, opts ...RunnerOpt) (*Runner, error) {
3839
cr := &Runner{
39-
path: path,
40+
path: path,
41+
watchedFiles: make(map[string]struct{}),
4042
}
4143

4244
var err error
@@ -130,6 +132,15 @@ func WithRules(rs []*rule.Rule) RunnerOpt {
130132
}
131133
}
132134

135+
// isFileWatched returns true if the file matched the profile's source expression.
136+
func (cr *Runner) isFileWatched(filePath string) bool {
137+
if _, isWatched := cr.watchedFiles[filePath]; isWatched {
138+
return true
139+
}
140+
141+
return false
142+
}
143+
133144
func (cr *Runner) GetCurrentProfile() *profile.Profile {
134145
return cr.rule.GetProfile()
135146
}
@@ -244,12 +255,15 @@ func (cr *Runner) Watch() error {
244255
return fmt.Errorf("walk %q: %w", cr.path, err)
245256
}
246257

258+
cr.watchedFiles = make(map[string]struct{})
247259
if ok, matchedFiles := p.MatchFiles(cr.path, files); ok {
248260
for _, file := range matchedFiles {
249-
err := cr.watcher.Add(file)
261+
err := cr.watcher.Add(filepath.Dir(file))
250262
if err != nil {
251263
return fmt.Errorf("add path to watcher: %w", err)
252264
}
265+
266+
cr.watchedFiles[file] = struct{}{}
253267
}
254268
}
255269

@@ -278,6 +292,10 @@ func (cr *Runner) RunOnEvent() {
278292
return
279293
}
280294

295+
if !cr.isFileWatched(evt.Name) {
296+
continue
297+
}
298+
281299
// Ignore events that are not related to file content changes.
282300
if evt.Has(fsnotify.Chmod) {
283301
continue
@@ -361,7 +379,7 @@ func (cr *Runner) RunContext(ctx context.Context) Output {
361379

362380
// Cancel any currently running command.
363381
if cr.cancelFunc != nil {
364-
cr.broadcast(EventCancel{})
382+
// Note: The cancel event is broadcast by the canceled goroutine.
365383
cr.cancelFunc()
366384
}
367385

pkg/command/runner_test.go

Lines changed: 172 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -243,32 +243,167 @@ func TestCommandRunner_CancellationBehavior(t *testing.T) {
243243
})
244244
}
245245

246+
func TestCommandRunner_FileWatcher(t *testing.T) {
247+
t.Parallel()
248+
249+
tcs := map[string]struct {
250+
fileOperation func(*testing.T, string)
251+
wantEvents int
252+
}{
253+
"simple file modification": {
254+
fileOperation: func(t *testing.T, testFile string) {
255+
t.Helper()
256+
257+
require.NoError(t, os.WriteFile(testFile, []byte("test: modified"), 0o644))
258+
},
259+
wantEvents: 1,
260+
},
261+
"file removal and recreation": {
262+
fileOperation: func(t *testing.T, testFile string) {
263+
t.Helper()
264+
265+
require.NoError(t, os.Remove(testFile))
266+
time.Sleep(50 * time.Millisecond)
267+
require.NoError(t, os.WriteFile(testFile, []byte("test: \"1\""), 0o644))
268+
time.Sleep(50 * time.Millisecond)
269+
require.NoError(t, os.WriteFile(testFile, []byte("test: \"2\""), 0o644))
270+
},
271+
wantEvents: 3,
272+
},
273+
"file rename and recreation": {
274+
fileOperation: func(t *testing.T, testFile string) {
275+
t.Helper()
276+
277+
require.NoError(t, os.Rename(testFile, testFile+".bak"))
278+
time.Sleep(50 * time.Millisecond)
279+
require.NoError(t, os.Rename(testFile+".bak", testFile))
280+
time.Sleep(50 * time.Millisecond)
281+
require.NoError(t, os.WriteFile(testFile, []byte("test: \"1\""), 0o644))
282+
time.Sleep(50 * time.Millisecond)
283+
require.NoError(t, os.WriteFile(testFile, []byte("test: \"2\""), 0o644))
284+
},
285+
wantEvents: 3,
286+
},
287+
}
288+
289+
for name, tc := range tcs {
290+
t.Run(name, func(t *testing.T) {
291+
t.Parallel()
292+
293+
// Create a temporary directory for testing
294+
tempDir := t.TempDir()
295+
296+
// Create a test file to watch
297+
testFile := filepath.Join(tempDir, "test.yaml")
298+
require.NoError(t, os.WriteFile(testFile, []byte("test: initial"), 0o644))
299+
300+
p, err := profile.New("echo",
301+
profile.WithArgs("File event"),
302+
profile.WithSource(`files.filter(f, pathExt(f) == ".yaml")`),
303+
profile.WithReload(`fs.event.has(fs.WRITE, fs.CREATE, fs.REMOVE, fs.RENAME)`),
304+
)
305+
require.NoError(t, err)
306+
307+
// Create a runner with a fast command
308+
runner, err := command.NewRunner(filepath.Dir(testFile), command.WithProfile("echo", p))
309+
require.NoError(t, err)
310+
311+
// Start watching
312+
require.NoError(t, runner.Watch())
313+
defer runner.Close()
314+
315+
// Channel to collect command events
316+
results := make(chan command.Event, 10)
317+
runner.Subscribe(results)
318+
319+
// Start RunOnEvent in a goroutine
320+
go runner.RunOnEvent()
321+
322+
// Give it a moment to start watching
323+
time.Sleep(50 * time.Millisecond)
324+
325+
// Perform the file operation
326+
tc.fileOperation(t, testFile)
327+
328+
// Collect events for a reasonable duration
329+
events := collectEventsWithTimeout(results, tc.wantEvents, 5*time.Second)
330+
require.Len(t, events, tc.wantEvents, testFile)
331+
332+
// Verify we got alternating start/end events
333+
for i, event := range events {
334+
switch i % 2 {
335+
case 0: // Even indices should be start events
336+
assert.IsType(t, command.EventStart(command.TypeRun), event)
337+
case 1: // Odd indices should be end events
338+
assert.IsType(t, command.EventEnd{}, event)
339+
}
340+
}
341+
})
342+
}
343+
}
344+
246345
func TestCommandRunner_ConcurrentFileEvents(t *testing.T) {
247346
t.Parallel()
248347

348+
// Helper function for file write operations
349+
writeFileOp := func(t *testing.T, testFile string, i int) {
350+
t.Helper()
351+
352+
content := fmt.Sprintf("test: data-%d", i)
353+
require.NoError(t, os.WriteFile(testFile, []byte(content), 0o644))
354+
}
355+
356+
// Helper function for file remove and re-add operations
357+
removeAddFileOp := func(t *testing.T, testFile string, i int) {
358+
t.Helper()
359+
360+
require.NoError(t, os.Remove(testFile))
361+
362+
time.Sleep(5 * time.Millisecond) // Small delay between remove and add
363+
364+
content := fmt.Sprintf("test: recreated-%d", i)
365+
require.NoError(t, os.WriteFile(testFile, []byte(content), 0o644))
366+
}
367+
249368
tcs := map[string]struct {
250-
commandSleepTime string
251-
profileReload string
252-
numFileEvents int
253-
collectDuration time.Duration
369+
fileOp func(*testing.T, string, int)
370+
commandSleepTime string
371+
profileReload string
372+
numFileEvents int
373+
collectDuration time.Duration
374+
expectCancellation bool
254375
}{
255-
"rapid file events with slow command": {
256-
numFileEvents: 5,
257-
commandSleepTime: "0.2", // 200ms sleep
258-
collectDuration: 3 * time.Second,
259-
profileReload: `fs.event.has(fs.WRITE, fs.CREATE)`,
376+
"rapid file events with slow command should cause cancellation": {
377+
numFileEvents: 5,
378+
commandSleepTime: "0.3",
379+
collectDuration: 3 * time.Second,
380+
profileReload: `fs.event.has(fs.WRITE, fs.CREATE)`,
381+
fileOp: writeFileOp,
382+
expectCancellation: true,
260383
},
261-
"fewer file events with faster command": {
262-
numFileEvents: 3,
263-
commandSleepTime: "0.1", // 100ms sleep
264-
collectDuration: 2 * time.Second,
265-
profileReload: `pathExt(file) == ".yaml"`,
384+
"fewer file events with faster command should complete all": {
385+
numFileEvents: 2,
386+
commandSleepTime: "0.05",
387+
collectDuration: 2 * time.Second,
388+
profileReload: `pathExt(file) == ".yaml"`,
389+
fileOp: writeFileOp,
390+
expectCancellation: false,
266391
},
267392
"file events with path filtering": {
268-
numFileEvents: 4,
269-
commandSleepTime: "0.1",
270-
collectDuration: 2 * time.Second,
271-
profileReload: `pathBase(file) != "ignored.yaml"`,
393+
numFileEvents: 4,
394+
commandSleepTime: "0.1",
395+
collectDuration: 2 * time.Second,
396+
profileReload: `pathBase(file) != "ignored.yaml"`,
397+
fileOp: writeFileOp,
398+
expectCancellation: false,
399+
},
400+
"file remove and re-add events with cancellation": {
401+
numFileEvents: 4,
402+
commandSleepTime: "0.2",
403+
collectDuration: 3 * time.Second,
404+
profileReload: `fs.event.has(fs.WRITE, fs.CREATE, fs.REMOVE)`,
405+
fileOp: removeAddFileOp,
406+
expectCancellation: true,
272407
},
273408
}
274409

@@ -308,11 +443,10 @@ func TestCommandRunner_ConcurrentFileEvents(t *testing.T) {
308443
// Give it a moment to start watching
309444
time.Sleep(50 * time.Millisecond)
310445

311-
// Trigger multiple rapid file events by writing to the file quickly
446+
// Trigger multiple rapid file events using the specified operation
312447
for i := range tc.numFileEvents {
313-
content := fmt.Sprintf("test: data-%d", i)
314-
require.NoError(t, os.WriteFile(testFile, []byte(content), 0o644))
315-
time.Sleep(10 * time.Millisecond) // Small delay between writes
448+
tc.fileOp(t, testFile, i)
449+
time.Sleep(10 * time.Millisecond)
316450
}
317451

318452
// Collect all events for a specified duration
@@ -349,43 +483,31 @@ func TestCommandRunner_ConcurrentFileEvents(t *testing.T) {
349483
// Wait for collection to complete
350484
<-collectionDone
351485

352-
// 1. We should get at least one successful command completion
353-
assert.GreaterOrEqual(t, len(outputs), 1,
354-
"should get at least one command result")
486+
require.GreaterOrEqual(t, len(outputs), 1, "should get at least one completed command")
487+
require.GreaterOrEqual(t, startEvents, 1, "should get at least one start event")
355488

356-
// 2. We should see some start events
357-
assert.GreaterOrEqual(t, startEvents, 1,
358-
"should get at least one start event")
489+
assert.LessOrEqual(t, len(outputs), startEvents,
490+
"completed commands (%d) should not exceed started commands (%d)",
491+
len(outputs), startEvents)
359492

360-
// 3. If we have multiple outputs, we should see some cancellations
361-
if len(outputs) > 1 {
493+
if tc.expectCancellation {
494+
// With multiple rapid file events and slow commands, some should be canceled
362495
assert.GreaterOrEqual(t, cancelEvents, 1,
363-
"should see some cancellations when multiple commands run")
496+
"should see cancellations with rapid events and slow commands")
497+
assert.Greater(t, startEvents, len(outputs),
498+
"should have more starts (%d) than completions (%d) due to cancellations",
499+
startEvents, len(outputs))
364500
}
365501

366-
// 4. The final result should not be a cancellation error
367-
if len(outputs) > 0 {
368-
lastOutput := outputs[len(outputs)-1]
369-
if lastOutput.Error != nil {
370-
assert.NotContains(t, lastOutput.Error.Error(), "context canceled",
371-
"final command should not be canceled")
372-
}
502+
lastOutput := outputs[len(outputs)-1]
503+
if lastOutput.Error != nil {
504+
assert.NotContains(t, lastOutput.Error.Error(), "context canceled",
505+
"final completed command should not be canceled")
373506
}
374507

375-
// 5. We shouldn't have more completed commands than we have start events
376-
// (basic sanity check)
377-
assert.LessOrEqual(t, len(outputs), startEvents,
378-
"completed commands should not exceed started commands")
379-
380508
// Log the results for debugging
381-
t.Logf("Events: %d starts, %d ends, %d cancels from %d file events",
509+
t.Logf("Events: %d starts, %d ends, %d cancels from %d file operations",
382510
startEvents, len(outputs), cancelEvents, tc.numFileEvents)
383-
384-
// Additional logging to help understand platform differences
385-
if startEvents > tc.numFileEvents*2 {
386-
t.Logf("Note: File system generated %d start events for %d file writes (%.1fx multiplier)",
387-
startEvents, tc.numFileEvents, float64(startEvents)/float64(tc.numFileEvents))
388-
}
389511
})
390512
}
391513
}

0 commit comments

Comments
 (0)