Skip to content

Commit 13fc65e

Browse files
Merge pull request #2 from memory-overflow/dev/demo-video-cut
Dev/demo video cut
2 parents 7a438e3 + 83afb7e commit 13fc65e

File tree

16 files changed

+739
-323
lines changed

16 files changed

+739
-323
lines changed

container/memory_container/queue_container.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (q *queueContainer) ToFailedStatus(ctx context.Context, task *lighttasksche
137137
}
138138
log.Printf("failed task %s, reason %v", task.TaskId, reason)
139139
task.TaskStatus = lighttaskscheduler.TASK_STATUS_FAILED
140-
return
140+
return task, nil
141141
}
142142

143143
// ToExportStatus 转移到数据导出状态

example/actuator/example_actuator.go

Lines changed: 0 additions & 119 deletions
This file was deleted.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package addexample
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"math/rand"
9+
"sync"
10+
"time"
11+
12+
framework "github.com/memory-overflow/light-task-scheduler"
13+
)
14+
15+
// AddActuator 加法执行器,同步任务异步化的示例
16+
type AddActuator struct {
17+
runningTask sync.Map // taskId -> framework.AsyncTaskStatus 映射
18+
resultMap sync.Map // taskId -> int32 映射
19+
}
20+
21+
// Init 任务在被调度前的初始化工作
22+
func (a *AddActuator) Init(ctx context.Context, task *framework.Task) (
23+
newTask *framework.Task, err error) {
24+
// 无初始化工作
25+
return task, nil
26+
}
27+
28+
func (add *AddActuator) work(taskId string, a, b int32) {
29+
time.Sleep(time.Duration(rand.Intn(4000))*time.Millisecond + 2*time.Second) // 25% 概率超时
30+
if _, ok := add.runningTask.Load(taskId); !ok {
31+
// 任务可能因为超时被删除,不处理
32+
return
33+
}
34+
newStatus := framework.AsyncTaskStatus{
35+
TaskStatus: framework.TASK_STATUS_SUCCESS,
36+
Progress: 100.0,
37+
}
38+
add.resultMap.Store(taskId, a+b)
39+
add.runningTask.Store(taskId, newStatus)
40+
}
41+
42+
// Start 执行任务
43+
func (a *AddActuator) Start(ctx context.Context, ftask *framework.Task) (
44+
newTask *framework.Task, ignoreErr bool, err error) {
45+
if _, ok := a.runningTask.Load(ftask.TaskId); ok {
46+
// 任务已经在执行中,不能重复执行
47+
return ftask, false, fmt.Errorf("task %s is running", ftask.TaskId)
48+
}
49+
task, ok := ftask.TaskItem.(AddTask)
50+
if !ok {
51+
return ftask, false, fmt.Errorf("TaskItem not be set to AddTask")
52+
}
53+
ftask.TaskStatus = framework.TASK_STATUS_RUNNING
54+
ftask.TaskStartTime = time.Now()
55+
a.runningTask.Store(ftask.TaskId, framework.AsyncTaskStatus{
56+
TaskStatus: framework.TASK_STATUS_RUNNING,
57+
Progress: 0.0,
58+
})
59+
go a.work(ftask.TaskId, task.A, task.B)
60+
log.Printf("start success, taskid: %s\n", ftask.TaskId)
61+
return ftask, false, nil
62+
}
63+
64+
// ExportOutput 导出任务输出,自行处理任务结果
65+
func (a *AddActuator) ExportOutput(ctx context.Context, ftask *framework.Task) error {
66+
task, ok := ftask.TaskItem.(AddTask)
67+
if !ok {
68+
return fmt.Errorf("TaskItem not be set to AddTask")
69+
}
70+
res, ok := a.resultMap.Load(ftask.TaskId)
71+
if !ok {
72+
return fmt.Errorf("not found result for task %s", ftask.TaskId)
73+
}
74+
log.Printf("finished task %s: %d + %d = %d \n", ftask.TaskId, task.A, task.B, res)
75+
a.resultMap.Delete(ftask.TaskId) // delete result after export
76+
return nil
77+
}
78+
79+
// Stop 停止任务
80+
func (a *AddActuator) Stop(ctx context.Context, ftask *framework.Task) error {
81+
// 同步任务无法真正暂停,只能删除状态
82+
a.runningTask.Delete(ftask.TaskId)
83+
a.resultMap.Delete(ftask.TaskId)
84+
return nil
85+
}
86+
87+
// GetAsyncTaskStatus 批量获取任务状态
88+
func (a *AddActuator) GetAsyncTaskStatus(ctx context.Context, ftasks []framework.Task) (
89+
status []framework.AsyncTaskStatus, err error) {
90+
for _, ftask := range ftasks {
91+
fstatus, ok := a.runningTask.Load(ftask.TaskId)
92+
if !ok {
93+
status = append(status, framework.AsyncTaskStatus{
94+
TaskStatus: framework.TASK_STATUS_FAILED,
95+
FailedReason: errors.New("同步任务未找到"),
96+
Progress: float32(0.0),
97+
})
98+
} else {
99+
if fstatus.(framework.AsyncTaskStatus).TaskStatus != framework.TASK_STATUS_RUNNING {
100+
a.runningTask.Delete(ftask.TaskId) // delete task status after query if task finished
101+
}
102+
status = append(status, fstatus.(framework.AsyncTaskStatus))
103+
}
104+
}
105+
return status, nil
106+
}
107+
108+
// GetOutput ...
109+
func (e *AddActuator) GetOutput(ctx context.Context, task *framework.Task) (
110+
data interface{}, err error) {
111+
return nil, nil
112+
}
113+
114+
// GetOutput ...
115+
func (e *AddActuator) Delete(ctx context.Context, task *framework.Task) (err error) {
116+
return nil
117+
}

example/add_example/add/add_task.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package addexample
2+
3+
import "time"
4+
5+
// AddTask add 任务结构
6+
type AddTask struct {
7+
StartTime time.Time
8+
A, B int32
9+
}

example/main.go renamed to example/add_example/main.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,36 @@ import (
1010

1111
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
1212
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
13-
"github.com/memory-overflow/light-task-scheduler/example/actuator"
14-
service "github.com/memory-overflow/light-task-scheduler/example/add_service"
15-
"github.com/memory-overflow/light-task-scheduler/example/task"
13+
14+
addexample "github.com/memory-overflow/light-task-scheduler/example/add_example/add"
1615
)
1716

1817
func main() {
19-
go service.StartServer()
2018
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
21-
actuator := actuator.MakeExampleActuator()
19+
actuator := &addexample.AddActuator{}
2220
sch := lighttaskscheduler.MakeNewScheduler(
2321
context.Background(),
2422
container, actuator,
2523
lighttaskscheduler.Config{
2624
TaskLimit: 5,
27-
ScanInterval: 50 * time.Millisecond})
25+
ScanInterval: 50 * time.Millisecond,
26+
TaskTimeout: 5 * time.Second, // 5s 超时
27+
},
28+
)
2829

2930
var c chan os.Signal
3031
r := rand.New(rand.NewSource(time.Now().UnixNano()))
31-
for i := 0; i < 1000; i++ {
32+
for i := 0; i < 10; i++ {
3233
select {
3334
case <-c:
3435
return
3536
default:
3637
sch.AddTask(context.Background(),
3738
lighttaskscheduler.Task{
3839
TaskId: strconv.Itoa(i),
39-
TaskItem: task.ExampleTask{
40-
TaskId: uint32(i),
41-
A: r.Int31() % 1000,
42-
B: r.Int31() % 1000,
40+
TaskItem: addexample.AddTask{
41+
A: r.Int31() % 1000,
42+
B: r.Int31() % 1000,
4343
},
4444
})
4545
}

0 commit comments

Comments
 (0)