Skip to content

Commit 4edf6d1

Browse files
committed
Reset slice progress after a potentially successful write
1 parent 16112ef commit 4edf6d1

File tree

8 files changed

+54
-33
lines changed

8 files changed

+54
-33
lines changed

service/history/queuev2/queue_base.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ func (q *queueBase) insertSingleTask(task task.Task) bool {
278278
return q.virtualQueueManager.InsertSingleTaskToRootQueue(task)
279279
}
280280

281-
func (q *queueBase) removeScheduledTasksAfter(t time.Time) {
282-
q.virtualQueueManager.RemoveScheduledTasksAfter(t)
281+
func (q *queueBase) removeScheduledTasksAfter(key persistence.HistoryTaskKey) {
282+
q.virtualQueueManager.RemoveScheduledTasksAfter(key)
283283
}
284284

285285
func (q *queueBase) updateQueueState(ctx context.Context) {

service/history/queuev2/queue_scheduled.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT
175175
}
176176

177177
if !nextReadTime.IsZero() {
178-
q.base.removeScheduledTasksAfter(nextReadTime)
178+
q.base.removeScheduledTasksAfter(persistence.NewHistoryTaskKey(nextReadTime, 0))
179179
q.notify(nextReadTime)
180180
}
181181

service/history/queuev2/virtual_queue.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type (
7070
// InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice..
7171
InsertSingleTask(task task.Task) bool
7272
// RemoveScheduledTasksAfter removes the scheduled tasks after the given time
73-
RemoveScheduledTasksAfter(time.Time)
73+
RemoveScheduledTasksAfter(persistence.HistoryTaskKey)
7474
}
7575

7676
VirtualQueueOptions struct {
@@ -452,31 +452,22 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool {
452452
return true
453453
}
454454

455-
func (q *virtualQueueImpl) RemoveScheduledTasksAfter(t time.Time) {
455+
func (q *virtualQueueImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) {
456456
q.Lock()
457457
defer q.Unlock()
458458

459459
for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
460-
s := e.Value.(VirtualSlice)
461-
r := s.GetState().Range
462-
463-
if t.Before(r.InclusiveMinTaskKey.GetScheduledTime()) {
464-
continue
465-
}
466-
467-
s.CancelTasks(func(task task.Task) bool {
468-
taskTime := task.GetTaskKey().GetScheduledTime()
469-
return taskTime.After(t) || taskTime.Equal(t)
470-
})
471-
472-
q.monitor.SetSlicePendingTaskCount(s, s.GetPendingTaskCount())
460+
slice := e.Value.(VirtualSlice)
461+
slice.CancelTasksAfter(key)
462+
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
473463
}
474464
}
475465

476466
func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error {
477467
if persistence.IsTaskCorrupted(task) {
478468
q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
479469
q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
470+
480471
task.Ack()
481472
return nil
482473
}

service/history/queuev2/virtual_queue_manager.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/uber/cadence/common/log"
3535
"github.com/uber/cadence/common/log/tag"
3636
"github.com/uber/cadence/common/metrics"
37+
"github.com/uber/cadence/common/persistence"
3738
"github.com/uber/cadence/common/quotas"
3839
"github.com/uber/cadence/service/history/task"
3940
)
@@ -58,7 +59,7 @@ type (
5859
AddNewVirtualSliceToRootQueue(VirtualSlice)
5960
// Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice.
6061
InsertSingleTaskToRootQueue(task.Task) bool
61-
RemoveScheduledTasksAfter(time.Time)
62+
RemoveScheduledTasksAfter(persistence.HistoryTaskKey)
6263
}
6364

6465
virtualQueueManagerImpl struct {
@@ -232,13 +233,14 @@ func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool
232233
return false
233234
}
234235

235-
func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(t time.Time) {
236+
func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) {
236237
m.Lock()
237238
defer m.Unlock()
238239
for _, vq := range m.virtualQueues {
239-
vq.RemoveScheduledTasksAfter(t)
240+
vq.RemoveScheduledTasksAfter(key)
240241
}
241242
}
243+
242244
func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) {
243245
now := m.timeSource.Now()
244246
newVirtualSliceState := s.GetState()

service/history/queuev2/virtual_queue_manager_mock.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/virtual_queue_mock.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/virtual_slice.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
GetPendingTaskCount() int
4444
Clear()
4545
PendingTaskStats() PendingTaskStats
46-
CancelTasks(predicate func(task.Task) bool)
46+
CancelTasksAfter(key persistence.HistoryTaskKey)
4747

4848
TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool)
4949
TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool)
@@ -197,13 +197,40 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState {
197197
return s.state
198198
}
199199

200-
func (s *virtualSliceImpl) CancelTasks(predicate func(task.Task) bool) {
200+
func (s *virtualSliceImpl) CancelTasksAfter(key persistence.HistoryTaskKey) {
201201
taskMap := s.pendingTaskTracker.GetTasks()
202202
for _, task := range taskMap {
203-
if predicate(task) {
203+
if task.GetTaskKey().Compare(key) >= 0 {
204204
task.Cancel()
205205
}
206206
}
207+
208+
if len(s.progress) == 0 {
209+
s.progress = []*GetTaskProgress{
210+
{
211+
Range: Range{
212+
InclusiveMinTaskKey: key,
213+
ExclusiveMaxTaskKey: s.state.Range.ExclusiveMaxTaskKey,
214+
},
215+
NextPageToken: nil,
216+
NextTaskKey: key,
217+
},
218+
}
219+
return
220+
}
221+
222+
for i, progress := range s.progress {
223+
if progress.NextTaskKey.Compare(key) > 0 {
224+
s.progress[i] = &GetTaskProgress{
225+
Range: Range{
226+
InclusiveMinTaskKey: key,
227+
ExclusiveMaxTaskKey: progress.Range.ExclusiveMaxTaskKey,
228+
},
229+
NextPageToken: nil,
230+
NextTaskKey: key,
231+
}
232+
}
233+
}
207234
}
208235

209236
func (s *virtualSliceImpl) TrySplitByTaskKey(taskKey persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) {

service/history/queuev2/virtual_slice_mock.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)