Skip to content

Commit 23367fe

Browse files
author
willzhen
committed
Update readme
1 parent 9462a2b commit 23367fe

File tree

6 files changed

+113
-66
lines changed

6 files changed

+113
-66
lines changed

example/videocut_example/main.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,35 +13,41 @@ import (
1313
)
1414

1515
func main() {
16+
inputVideo := os.Args[1]
1617
go videocut.StartServer() // start video cut microservice
1718

19+
// 构建队列容器,队列长度 10000
1820
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
21+
// 构建裁剪任务执行器
1922
actuator := videocut.MakeVideoCutActuator()
2023
sch := lighttaskscheduler.MakeNewScheduler(
2124
context.Background(),
2225
container, actuator,
2326
lighttaskscheduler.Config{
24-
TaskLimit: 2,
27+
TaskLimit: 2, // 2 并发
2528
ScanInterval: 50 * time.Millisecond,
26-
TaskTimeout: 20 * time.Second, // 20s 超时
29+
TaskTimeout: 20 * time.Second, // 20s 超时时间
2730
},
2831
)
2932

33+
// 添加任务,把视频裁前 100s 剪成 10s 一个的视频
3034
var c chan os.Signal
31-
for i := 100; i < 200; i += 10 {
35+
for i := 0; i < 100; i += 10 {
3236
select {
3337
case <-c:
3438
return
3539
default:
36-
sch.AddTask(context.Background(),
40+
if err := sch.AddTask(context.Background(),
3741
lighttaskscheduler.Task{
38-
TaskId: strconv.Itoa(i),
42+
TaskId: strconv.Itoa(i), // 这里的任务 ID 是为了调度框架方便标识唯一任务的ID, 和微服务的任务ID不同,是映射关系
3943
TaskItem: videocut.VideoCutTask{
40-
InputVideo: "/data/workspace/ai-media/media-ai-ppl/video/benpaobaxiongdi_S2EP12_20150703.mp4",
41-
CutStartTime: 10,
42-
CutEndTime: float32(i),
44+
InputVideo: inputVideo,
45+
CutStartTime: float32(i),
46+
CutEndTime: float32(i + 10),
4347
},
44-
})
48+
}); err != nil {
49+
log.Printf("add task TaskId %s failed: %v\n", strconv.Itoa(i), err)
50+
}
4551
}
4652
}
4753

example/videocut_example/video_cut/video_cut_actuator.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
framework "github.com/memory-overflow/light-task-scheduler"
1414
)
1515

16-
// VideoCutActuator 示例执行器
16+
// VideoCutActuator 视频裁剪执行器
1717
type VideoCutActuator struct {
1818
EndPoint string
1919
}
@@ -39,11 +39,10 @@ func (v *VideoCutActuator) Init(ctx context.Context, ftask *framework.Task) (
3939
if task.CutStartTime > task.CutEndTime {
4040
return ftask, fmt.Errorf("error: CutStartTime after CutEndTime")
4141
}
42-
log.Println("hhhhh")
4342
return ftask, nil
4443
}
4544

46-
// Run 执行任务
45+
// Start 执行任务
4746
func (e *VideoCutActuator) Start(ctx context.Context, ftask *framework.Task) (
4847
newTask *framework.Task, ignoreErr bool, err error) {
4948
task, ok := ftask.TaskItem.(VideoCutTask)

example/videocut_example/video_cut/video_cut_service.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func videoCut(w http.ResponseWriter, r *http.Request) {
107107
// 添加
108108
ctx, cancel := context.WithCancel(context.Background())
109109
status := taskStatus{
110-
cancel: cancel, // 支持 stop cancel 掉任务
110+
cancel: cancel, // 支持 stop cancel 任务
111111
status: TASK_STATUS_RUNNING,
112112
}
113113
runningTaskMap.Store(taskId, status)
@@ -157,6 +157,11 @@ func status(w http.ResponseWriter, r *http.Request) {
157157
w.Write(bs)
158158
}
159159

160+
// GetOutputVideoRequest ...
161+
type GetOutputVideoRequest struct {
162+
TaskId string
163+
}
164+
160165
// GetOutputVideoResponse ...
161166
type GetOutputVideoResponse struct {
162167
TaskId string
@@ -166,7 +171,7 @@ type GetOutputVideoResponse struct {
166171

167172
func getOutputVideo(w http.ResponseWriter, r *http.Request) {
168173
input, _ := ioutil.ReadAll(r.Body)
169-
var req StatusRequest
174+
var req GetOutputVideoRequest
170175
json.Unmarshal(input, &req)
171176
out, ok := outMap.Load(req.TaskId)
172177
var rsp GetOutputVideoResponse

readme.md

Lines changed: 82 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
- [轻量级任务调度框架](#轻量级任务调度框架)
2-
- [框架的设计思想和背景](#框架的设计思想和背景)
3-
- [任务系统的整体设计](#任务系统的整体设计)
4-
- [任务调度框架](#任务调度框架)
5-
- [任务容器分类](#任务容器分类)
6-
- [1. 同步任务和异步任务](#1-同步任务和异步任务)
7-
- [2. 可持久任务和不可持久化任务](#2-可持久任务和不可持久化任务)
8-
- [Usage](#usage)
9-
- [使用内存容器实现 a+b 任务调度](#使用内存容器实现-ab-任务调度)
10-
- [实现 a+b 任务执行器](#实现-ab-任务执行器)
11-
- [实现 a+b 任务容器](#实现-ab-任务容器)
12-
- [实现调度](#实现调度)
2+
- [框架的设计思想和背景](#框架的设计思想和背景)
3+
- [任务系统的整体设计](#任务系统的整体设计)
4+
- [任务调度框架](#任务调度框架)
5+
- [任务容器分类](#任务容器分类)
6+
- [1. 同步任务和异步任务](#1-同步任务和异步任务)
7+
- [2. 可持久任务和不可持久化任务](#2-可持久任务和不可持久化任务)
8+
- [Usage](#usage)
9+
- [使用内存容器实现视频裁剪异步任务调度](#使用内存容器实现视频裁剪异步任务调度)
10+
- [定义视频裁剪任务](#定义视频裁剪任务)
11+
- [实现视频裁剪任务执行器](#实现视频裁剪任务执行器)
12+
- [实现视频裁剪任务容器](#实现视频裁剪任务容器)
13+
- [实现调度](#实现调度)
1314

1415
# 轻量级任务调度框架
1516

@@ -38,7 +39,7 @@
3839
2. 任务执行器——真正的执行任务的逻辑。
3940
3. 任务调度器——对任务容器和任务执行器进行一些逻辑操作和逻辑配合,完成整体的任务调度流程。
4041

41-
其中,容器和执行器和业务相关、可以用一系列的接口(interface)来抽象,开发者根据自己的业务实现接口。任务调度流程比较估计,可以由框架实现。
42+
其中,容器和执行器和业务相关、可以用一系列的接口(interface)来抽象,开发者根据自己的业务实现接口。任务调度流程比较固定,可以由框架实现。
4243

4344
![image](https://user-images.githubusercontent.com/15645203/210739259-86ef6480-097f-4189-98ac-3fe670dbe40b.png)
4445

@@ -86,51 +87,83 @@
8687

8788
## Usage
8889

89-
### 使用内存容器实现 a+b 任务调度
90+
### 使用内存容器实现视频裁剪异步任务调度
9091

91-
有一个计算 a+b 的服务,由于该 a+b 是一种新的高维空间的计算规则,计算非常耗时耗资源,所以该服务设计成为异步的。该服务主要有三个接口
92-
1. /add, 输入 a, b,返回 taskId。
93-
2. /status,输入 taskId, 返回该任务的状态,是否已经完成,或者计算失败。
94-
3. /result,输入 taskId,如果任务已经完成,返回计算结果。
92+
首先实现一个异步裁剪的微服务,一共四个接口,需要先安装`ffmpeg`命令
93+
1. /VideoCut, 输入原视频, 裁剪开始时间,结束时间,返回 taskId。
94+
2. /Status,输入 taskId, 返回该任务的状态,是否已经完成,或者失败。
95+
3. /GetOutputVideo, 如果任务已经完成,输入 TaskId,返回已经完成的视频路径结果。
96+
4. /Stop, 如果任务执行时间过长,可以支持停止。
9597

96-
服务代码参考 [add_service.go](https://github.com/memory-overflow/light-task-scheduler/blob/main/example/add_service/add_service.go)
98+
服务代码参考 [video_cut_service.go](https://github.com/memory-overflow/light-task-scheduler/blob/dev/demo-video-cut/example/videocut_example/video_cut/video_cut_service.go)
9799

98-
现在我们通过本任务调度框架实现一个 a+b 任务调度系统,可以控制任务并发数,并且按照队列依次调度。
100+
现在我们通过本任务调度框架实现一个对裁剪任务进行调度系统,可以控制任务并发数,和任务超时时间。并且按照队列依次调度。
99101

100-
#### 实现 a+b 任务执行器
101-
首先,需要实现一个 a+b 任务的执行器,执行器实际上就是调用 a+b 服务的接口。执行器的实现参考[example_actuator.go](https://github.com/memory-overflow/light-task-scheduler/blob/main/example/actuator/example_actuator.go)
102+
#### 定义视频裁剪任务
103+
```go
104+
// VideoCutTask 视频裁剪任务结构
105+
type VideoCutTask struct {
106+
TaskId string
107+
CutStartTime, CutEndTime float32
108+
InputVideo string
109+
}
110+
```
102111

103-
#### 实现 a+b 任务容器
104-
这里,我们直接使用队列来作为任务容器,所以可以直接用框架预置的 queueContainer 作为任务容器,无需单独实现。
112+
#### 实现视频裁剪任务执行器
113+
实现一个视频裁剪任务的执行器,执行器实际上就是调用视频裁剪微服务的 API 接口。执行器的实现参考[video_cut_actuator.go](https://github.com/memory-overflow/light-task-scheduler/blob/dev/demo-video-cut/example/videocut_example/video_cut/video_cut_actuator.go), 这里对任务结果只是输出到 stdout 展示,不做后续更多处理了。
114+
115+
#### 实现视频裁剪任务容器
116+
这里,我们简单的直接使用队列来作为任务容器,所以可以直接用框架预置的 queueContainer 作为任务容器,无需单独实现。
105117

106118
#### 实现调度
107-
参考代码[main.go](https://github.com/memory-overflow/light-task-scheduler/blob/main/example/main.go)
119+
参考代码[main.go](https://github.com/memory-overflow/light-task-scheduler/blob/dev/demo-video-cut/example/videocut_example/main.go)`go run main.go xxx.mp4` 可以执行 demo(需要安装 ffmpeg 命令)。
108120

109121
```go
110-
// 构建任务容器,队列长度 10000
111-
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
112-
// 构建任务执行器
113-
actuator := actuator.MakeExampleActuator()
114-
// 构建调度器,自动开启调度
115-
sch := lighttaskscheduler.MakeNewScheduler(
116-
context.Background(),
117-
container, actuator,
118-
lighttaskscheduler.Config{
119-
TaskLimit: 5, // 任务并发限制
120-
ScanInterval: 100*time.Millisecond, // 系统扫描轮询周期,内存容器可以快速扫描,入股是 db 容器要注意配置合理的扫描间隔,防止对 db 造成比较大压力。
121-
})
122-
123-
// 添加任务
124-
for i := 0; i < 1000; i++ {
125-
sch.AddTask(context.Background(),
126-
lighttaskscheduler.Task{
127-
TaskId: strconv.Itoa(i), // 每个任务都需要绑定一个唯一 id
128-
TaskItem: task.ExampleTask{
129-
TaskId: uint32(i),
130-
A: r.Int31() % 1000,
131-
B: r.Int31() % 1000,
132-
},
133-
})
122+
func main() {
123+
inputVideo := os.Args[1]
124+
go videocut.StartServer() // start video cut microservice
125+
126+
// 构建队列容器,队列长度 10000
127+
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
128+
// 构建裁剪任务执行器
129+
actuator := videocut.MakeVideoCutActuator()
130+
sch := lighttaskscheduler.MakeNewScheduler(
131+
context.Background(),
132+
container, actuator,
133+
lighttaskscheduler.Config{
134+
TaskLimit: 2, // 2 并发
135+
ScanInterval: 50 * time.Millisecond,
136+
TaskTimeout: 20 * time.Second, // 20s 超时时间
137+
},
138+
)
139+
140+
// 添加任务,把视频裁前 100s 剪成 10s 一个的视频
141+
var c chan os.Signal
142+
for i := 0; i < 100; i += 10 {
143+
select {
144+
case <-c:
145+
return
146+
default:
147+
if err := sch.AddTask(context.Background(),
148+
lighttaskscheduler.Task{
149+
// 这里的任务 ID 是为了调度框架方便标识唯一任务的ID,
150+
// 和微服务的任务ID不同,是映射关系
151+
TaskId: strconv.Itoa(i),
152+
TaskItem: videocut.VideoCutTask{
153+
InputVideo: inputVideo,
154+
CutStartTime: float32(i),
155+
CutEndTime: float32(i + 10),
156+
},
157+
}); err != nil {
158+
log.Printf("add task TaskId %s failed: %v\n", strconv.Itoa(i), err)
159+
}
160+
}
161+
}
162+
163+
for range c {
164+
log.Println("stop Scheduling")
165+
sch.Close()
166+
return
167+
}
134168
}
135-
136169
```

task_actuator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import "context"
55
// TaskActuator 任务执行器接口
66
type TaskActuator interface {
77

8-
// Init 任务在被调度前的初始化工作
8+
// Init 任务在被加入调度系统前的初始化工作
99
Init(ctx context.Context, task *Task) (newTask *Task, err error)
1010

1111
// Start 开始执行任务,不要阻塞该方法,如果是同步任务,在单独的携程执行,执行器在内存中维护任务状态,转成异步任务,
@@ -16,7 +16,7 @@ type TaskActuator interface {
1616
// ExportOutput 导出任务输出,自行处理任务结果
1717
ExportOutput(ctx context.Context, task *Task) error
1818

19-
// GetOutput 获取任务数据
19+
// GetOutput 获取任务数据,调度框架不会调用该接口,提供给用户自由选择是否实现
2020
GetOutput(ctx context.Context, task *Task) (data interface{}, err error)
2121

2222
// Stop 停止任务

task_scheduler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ func MakeNewScheduler(
4747
// AddTask 添加一个任务,需要把任务转换成 lighttaskscheduler.Task 的通用形式
4848
// 注意一定要配置一个唯一的任务 id 标识
4949
func (s *TaskScheduler) AddTask(ctx context.Context, task Task) error {
50-
return s.Container.AddTask(ctx, task)
50+
newTask, err := s.Actuator.Init(ctx, &task) // 初始化任务
51+
if err != nil {
52+
return fmt.Errorf("task init failed: %v", err)
53+
}
54+
return s.Container.AddTask(ctx, *newTask)
5155
}
5256

5357
// Close 停止调度

0 commit comments

Comments
 (0)