@@ -152,11 +152,13 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
152
152
方案:
153
153
- 采用回调的方式,避免深度递归。
154
154
- 使用 activeRunQueue 记录待运行检查的节点(可能可以运行),并控制并发数量。
155
- - 每次添加新节点,以及节点运行结束后,均会执行一次 processNextNode 方法。 processNextNode 方法,如果没触发跳出条件,则必定会取一个 activeRunQueue 继续检查处理。
155
+ - 每次添加新节点,以及节点运行结束后,均会执行一次 processActiveNode 方法。 processActiveNode 方法,如果没触发跳出条件,则必定会取一个 activeRunQueue 继续检查处理。
156
156
- checkNodeCanRun 会检查该节点状态
157
157
- 没满足运行条件:跳出函数
158
158
- 运行:执行节点逻辑,并返回结果,将 target node 加入到 activeRunQueue 中,等待队列处理。
159
159
- 跳过:执行跳过逻辑,并将其后续的 target node 也进行一次检查。
160
+ 特殊情况:
161
+ - 触发交互节点后,需要跳过所有 skip 节点,避免后续执行了 skipNode。
160
162
*/
161
163
class WorkflowQueue {
162
164
// Workflow variables
@@ -176,6 +178,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
176
178
177
179
// Queue variables
178
180
private activeRunQueue = new Set < string > ( ) ;
181
+ private skipNodeQueue : { node : RuntimeNodeItemType ; skippedNodeIdList : Set < string > } [ ] = [ ] ;
179
182
private runningNodeCount = 0 ;
180
183
private maxConcurrency : number ;
181
184
private resolve : ( e : WorkflowQueue ) => void ;
@@ -198,13 +201,17 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
198
201
}
199
202
this . activeRunQueue . add ( nodeId ) ;
200
203
201
- this . processNextNode ( ) ;
204
+ this . processActiveNode ( ) ;
202
205
}
203
206
// Process next active node
204
- private processNextNode ( ) {
207
+ private processActiveNode ( ) {
205
208
// Finish
206
209
if ( this . activeRunQueue . size === 0 && this . runningNodeCount === 0 ) {
207
- this . resolve ( this ) ;
210
+ if ( this . skipNodeQueue . length > 0 && ! this . nodeInteractiveResponse ) {
211
+ this . processSkipNodes ( ) ;
212
+ } else {
213
+ this . resolve ( this ) ;
214
+ }
208
215
return ;
209
216
}
210
217
@@ -224,12 +231,26 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
224
231
225
232
this . checkNodeCanRun ( node ) . finally ( ( ) => {
226
233
this . runningNodeCount -- ;
227
- this . processNextNode ( ) ;
234
+ this . processActiveNode ( ) ;
228
235
} ) ;
229
236
}
230
237
// 兜底,除非极端情况,否则不可能触发
231
238
else {
232
- this . processNextNode ( ) ;
239
+ this . processActiveNode ( ) ;
240
+ }
241
+ }
242
+
243
+ private addSkipNode ( node : RuntimeNodeItemType , skippedNodeIdList : Set < string > ) {
244
+ this . skipNodeQueue . push ( { node, skippedNodeIdList } ) ;
245
+ }
246
+ private processSkipNodes ( ) {
247
+ const skipItem = this . skipNodeQueue . shift ( ) ;
248
+ if ( skipItem ) {
249
+ this . checkNodeCanRun ( skipItem . node , skipItem . skippedNodeIdList ) . finally ( ( ) => {
250
+ this . processActiveNode ( ) ;
251
+ } ) ;
252
+ } else {
253
+ this . processActiveNode ( ) ;
233
254
}
234
255
}
235
256
@@ -695,9 +716,9 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
695
716
nodeRunResult . result
696
717
) ;
697
718
698
- await Promise . all (
699
- nextStepSkipNodes . map ( ( node ) => this . checkNodeCanRun ( node , skippedNodeIdList ) )
700
- ) ;
719
+ nextStepSkipNodes . forEach ( ( node ) => {
720
+ this . addSkipNode ( node , skippedNodeIdList ) ;
721
+ } ) ;
701
722
702
723
// Run next nodes
703
724
nextStepActiveNodes . forEach ( ( node ) => {
0 commit comments