Skip to content

Commit fd9764d

Browse files
committed
feature: add stop loop watch function for with one or with batch event.
1 parent 3120428 commit fd9764d

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

src/storage/stream/loop/loop_watch.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc,
147147
ctx := context.Background()
148148

149149
select {
150+
case <-opts.StopNotifier:
151+
cancel()
152+
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
153+
return
154+
150155
// wait for another retry
151156
case <-retrySignal:
152157
// wait for a well and then do the retry work.
@@ -266,6 +271,11 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
266271
// ticks, but no events received, loop next round to get events.
267272
continue
268273
}
274+
275+
case <-opts.StopNotifier:
276+
ticker.Stop()
277+
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
278+
return
269279
}
270280

271281
// break the for loop to handle event for now.
@@ -336,6 +346,11 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context,
336346
blog.Warnf("%s job, received cancel loop watch %s signal, exit loop, exit loop", opts.Name,
337347
opts.WatchOpt.Collection)
338348
return
349+
350+
case <-opts.StopNotifier:
351+
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
352+
return
353+
339354
default:
340355
}
341356

src/storage/stream/types/types.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,16 @@ type LoopOptions struct {
350350
WatchOpt *WatchOptions
351351
TokenHandler TokenHandler
352352
RetryOptions *RetryOptions
353+
354+
// StopNotifier is used when user need to stop loop events and release related resources.
355+
// It's a optional option. when it's not set(as is nil), then the loop will not exit forever.
356+
// Otherwise, user can use it to stop loop events.
357+
// When a user want to stop the loop, the only thing that a user need to do is to just
358+
// **close** this stop notifier channel.
359+
// Attention:
360+
// Close this notifier channel is the only way to stop loop correctly.
361+
// Do not send data to this channel.
362+
StopNotifier <-chan struct{}
353363
}
354364

355365
type LoopOneOptions struct {
@@ -393,6 +403,11 @@ func (lo *LoopOneOptions) Validate() error {
393403
}
394404
}
395405

406+
if lo.LoopOptions.StopNotifier == nil {
407+
// if not set, then set never stop loop as default
408+
lo.LoopOptions.StopNotifier = make(<-chan struct{})
409+
}
410+
396411
return nil
397412
}
398413

@@ -449,6 +464,11 @@ func (lo *LoopBatchOptions) Validate() error {
449464
}
450465
}
451466

467+
if lo.LoopOptions.StopNotifier == nil {
468+
// if not set, then set never stop loop as default
469+
lo.LoopOptions.StopNotifier = make(<-chan struct{})
470+
}
471+
452472
return nil
453473
}
454474

0 commit comments

Comments
 (0)