Skip to content

Commit ee04d5c

Browse files
author
memory-overflow
committed
feat(weaver): 修改回掉和轮询的并发逻辑
1 parent f038c9e commit ee04d5c

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

task_scheduler.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,17 @@ func (s *TaskScheduler) Close() {
164164
s.cancel()
165165
}
166166

167-
func (s *TaskScheduler) checkProcessed(taskId string) bool {
167+
func (s *TaskScheduler) checkProcessed(t *Task) bool {
168168
if !s.enableProcessedCheck {
169169
return true
170170
}
171+
if t.TaskStatus == TASK_STATUS_RUNNING {
172+
return true
173+
}
171174
s.lock.Lock()
172175
defer s.lock.Unlock()
173176

174-
if _, ok := s.processedTask[taskId]; !ok {
177+
if _, ok := s.processedTask[t.TaskId]; !ok {
175178
if s.count >= s.bufflen {
176179
// 满了, 清除一个头部数据
177180
delete(s.processedTask, s.taskProcessedTime[s.head].taskId)
@@ -181,10 +184,10 @@ func (s *TaskScheduler) checkProcessed(taskId string) bool {
181184
s.head = 0
182185
}
183186
}
184-
s.processedTask[taskId] = true
187+
s.processedTask[t.TaskId] = true
185188
s.taskProcessedTime[s.tail] = processTime{
186189
t: time.Now(),
187-
taskId: taskId,
190+
taskId: t.TaskId,
188191
}
189192
s.tail++
190193
if s.tail >= s.bufflen {
@@ -345,7 +348,7 @@ func (s *TaskScheduler) updateTaskStatus() {
345348
func (s *TaskScheduler) updateCallbackTask() {
346349
for t := range s.config.CallbackReceiver.GetCallbackChannel(s.ctx) {
347350
// 可能是轮询已经处理过,或者重复回调
348-
if !s.checkProcessed(t.TaskId) {
351+
if !s.checkProcessed(&t) {
349352
continue
350353
}
351354
s.wg.Add(1)
@@ -404,7 +407,7 @@ func (s *TaskScheduler) updateOnce(ctx context.Context) {
404407
defer s.wg.Done()
405408
if st.TaskStatus == TASK_STATUS_FAILED {
406409
// 已经回调处理过
407-
if !s.checkProcessed(task.TaskId) {
410+
if !s.checkProcessed(&task) {
408411
return
409412
}
410413
// 失败可以重试
@@ -429,7 +432,7 @@ func (s *TaskScheduler) updateOnce(ctx context.Context) {
429432
}
430433
} else if st.TaskStatus == TASK_STATUS_SUCCESS {
431434
// 已经回调处理过
432-
if !s.checkProcessed(task.TaskId) {
435+
if !s.checkProcessed(&task) {
433436
return
434437
}
435438
s.export(ctx, &task)

0 commit comments

Comments
 (0)