Skip to content

Commit bc85fa8

Browse files
committed
Add then to the Java DSL
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent 3b36c05 commit bc85fa8

File tree

4 files changed

+238
-1
lines changed

4 files changed

+238
-1
lines changed

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

18+
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
1819
import io.serverlessworkflow.api.types.func.JavaContextFunction;
1920
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
2021
import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder;
@@ -62,6 +63,36 @@ public SELF when(String jqExpr) {
6263
return self();
6364
}
6465

66+
/**
67+
* Queue a {@code then(taskName)} to be applied on the concrete builder. Directs the workflow
68+
* engine to jump to the named task after this one completes.
69+
*
70+
* @param taskName the name of the next task to execute
71+
* @return this step for further chaining
72+
* @see <a
73+
* href="https://github.com/serverlessworkflow/specification/blob/main/dsl-reference.md#task">DSL
74+
* Reference - Task</a>
75+
*/
76+
public SELF then(String taskName) {
77+
postConfigurers.add(b -> ((TaskBaseBuilder<?>) b).then(taskName));
78+
return self();
79+
}
80+
81+
/**
82+
* Queue a {@code then(directive)} to be applied on the concrete builder. Directs the workflow
83+
* engine to apply the given flow directive after this task completes.
84+
*
85+
* @param directive the flow directive (e.g., {@link FlowDirectiveEnum#END})
86+
* @return this step for further chaining
87+
* @see <a
88+
* href="https://github.com/serverlessworkflow/specification/blob/main/dsl-reference.md#task">DSL
89+
* Reference - Task</a>
90+
*/
91+
public SELF then(FlowDirectiveEnum directive) {
92+
postConfigurers.add(b -> ((TaskBaseBuilder<?>) b).then(directive));
93+
return self();
94+
}
95+
6596
// ---------------------------------------------------------------------------
6697
// FuncTaskTransformations passthroughs: EXPORT (fn/context/filter + JQ)
6798
// ---------------------------------------------------------------------------

experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package io.serverlessworkflow.fluent.func;
1717

1818
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.call;
19+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume;
1920
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emit;
2021
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.event;
2122
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
2223
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.get;
2324
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http;
2425
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen;
2526
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne;
26-
import static io.serverlessworkflow.fluent.spec.dsl.DSL.auth;
2727
import static io.serverlessworkflow.fluent.spec.dsl.DSL.use;
2828
import static org.junit.jupiter.api.Assertions.assertEquals;
2929
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -432,4 +432,95 @@ void call_with_preconfigured_http_spec() {
432432
.getUse());
433433
assertEquals(Map.of("foo", "bar"), http.getWith().getBody());
434434
}
435+
436+
@Test
437+
@DisplayName("function(name, fn).then(taskName) sets FlowDirective string on the task")
438+
void function_step_then_task_name_sets_flow_directive() {
439+
Workflow wf =
440+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
441+
.tasks(
442+
function("myfunction", String::trim, String.class).then("otherTask"),
443+
function("otherTask", String::strip, String.class))
444+
.build();
445+
446+
List<TaskItem> items = wf.getDo();
447+
assertEquals(2, items.size());
448+
449+
Task t = items.get(0).getTask();
450+
assertNotNull(t.getCallTask(), "CallTask expected");
451+
452+
CallJava callJava = (CallJava) t.getCallTask().get();
453+
assertNotNull(callJava.getThen(), "then() should be set on the task");
454+
assertEquals("otherTask", callJava.getThen().getString(), "then() should point to 'otherTask'");
455+
}
456+
457+
@Test
458+
@DisplayName("function(name, fn).then(FlowDirectiveEnum.END) sets END directive on the task")
459+
void function_step_then_flow_directive_enum_sets_end() {
460+
Workflow wf =
461+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
462+
.tasks(function("myfunction", String::trim, String.class).then(FlowDirectiveEnum.END))
463+
.build();
464+
465+
List<TaskItem> items = wf.getDo();
466+
assertEquals(1, items.size());
467+
468+
Task t = items.get(0).getTask();
469+
assertNotNull(t.getCallTask(), "CallTask expected");
470+
471+
CallJava callJava = (CallJava) t.getCallTask().get();
472+
assertNotNull(callJava.getThen(), "then() should be set on the task");
473+
assertEquals(
474+
FlowDirectiveEnum.END,
475+
callJava.getThen().getFlowDirectiveEnum(),
476+
"then() should be FlowDirectiveEnum.END");
477+
}
478+
479+
@Test
480+
@DisplayName(
481+
"consume(name, Consumer, Class).then(taskName) sets FlowDirective string on the task")
482+
void consume_step_then_task_name_sets_flow_directive() {
483+
Workflow wf =
484+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
485+
.tasks(
486+
consume("sendNewsletter", (String s) -> {}, String.class).then("otherTask"),
487+
function("nextTask", String::strip, String.class),
488+
function("otherTask", String::strip, String.class))
489+
.build();
490+
491+
List<TaskItem> items = wf.getDo();
492+
assertEquals(3, items.size());
493+
494+
Task t = items.get(0).getTask();
495+
assertNotNull(t.getCallTask(), "CallTask expected for consume step");
496+
497+
CallJava callJava = (CallJava) t.getCallTask().get();
498+
assertNotNull(callJava.getThen(), "then() should be set on the consume task");
499+
assertEquals("otherTask", callJava.getThen().getString(), "then() should point to 'otherTask'");
500+
}
501+
502+
@Test
503+
@DisplayName(
504+
"consume(name, Consumer, Class).then(FlowDirectiveEnum.END) sets END directive on the task")
505+
void consume_step_then_flow_directive_enum_sets_end() {
506+
Workflow wf =
507+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
508+
.tasks(
509+
consume("sendNewsletter", (String s) -> {}, String.class)
510+
.then(FlowDirectiveEnum.END))
511+
.build();
512+
513+
List<TaskItem> items = wf.getDo();
514+
assertEquals(1, items.size());
515+
516+
Task t = items.get(0).getTask();
517+
assertNotNull(t.getCallTask(), "CallTask expected for consume step");
518+
519+
CallJava callJava = (CallJava) t.getCallTask().get();
520+
assertNotNull(callJava.getThen(), "then() should be set on the consume task");
521+
assertEquals(
522+
FlowDirectiveEnum.END,
523+
callJava.getThen().getFlowDirectiveEnum(),
524+
"then() should be FlowDirectiveEnum.END");
525+
}
435526
}

impl/test/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@
9494
<artifactId>grpc-netty</artifactId>
9595
<scope>test</scope>
9696
</dependency>
97+
<dependency>
98+
<groupId>io.serverlessworkflow</groupId>
99+
<artifactId>serverlessworkflow-experimental-fluent-func</artifactId>
100+
<scope>test</scope>
101+
<version>${project.version}</version>
102+
</dependency>
103+
<dependency>
104+
<groupId>io.serverlessworkflow</groupId>
105+
<artifactId>serverlessworkflow-experimental-lambda</artifactId>
106+
<scope>test</scope>
107+
<version>${project.version}</version>
108+
</dependency>
97109
</dependencies>
98110
<build>
99111
<resources>
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.test;
17+
18+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume;
19+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
20+
21+
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
22+
import io.serverlessworkflow.api.types.Workflow;
23+
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
24+
import io.serverlessworkflow.impl.WorkflowApplication;
25+
import io.serverlessworkflow.impl.WorkflowDefinition;
26+
import io.serverlessworkflow.impl.WorkflowModel;
27+
import java.util.Arrays;
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
public class WorkflowThenTest {
34+
35+
private static final Logger log = LoggerFactory.getLogger(WorkflowThenTest.class);
36+
@Test
37+
void consume_then_skips_next_task_and_jumps_to_target() {
38+
39+
Workflow wf =
40+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
41+
.tasks(
42+
consume(
43+
"sendNewsletter",
44+
(Object input) -> log.info("Consuming: {}", input),
45+
Object.class)
46+
.then("otherTask"),
47+
function("nextTask", v -> "nextTask: " + v, String.class),
48+
function("otherTask", v -> "otherTask: " + v, String.class))
49+
.build();
50+
51+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
52+
WorkflowDefinition def = app.workflowDefinition(wf);
53+
WorkflowModel model = def.instance("hello newsletter").start().join();
54+
55+
String output = model.asText().orElseThrow();
56+
Assertions.assertEquals("otherTask: hello newsletter", output);
57+
}
58+
}
59+
60+
@Test
61+
void function_then_skips_next_task_and_jumps_to_target() {
62+
63+
Workflow wf =
64+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
65+
.tasks(
66+
function("arrayFromString", input -> input.split(","), String.class)
67+
.then("otherTask"),
68+
function("nextTask", arr -> "nextTask: " + Arrays.toString(arr), String[].class),
69+
function("otherTask", arr -> "otherTask: " + Arrays.toString(arr), String[].class))
70+
.build();
71+
72+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
73+
WorkflowDefinition def = app.workflowDefinition(wf);
74+
String output = def.instance("hello,from,cncf").start().join().asText().orElseThrow();
75+
76+
Assertions.assertEquals("otherTask: [hello, from, cncf]", output);
77+
}
78+
}
79+
80+
@Test
81+
void function_then_end_directive_stops_workflow_execution() {
82+
83+
Workflow wf =
84+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
85+
.tasks(
86+
function("uppercase", String::toUpperCase, String.class)
87+
.then(FlowDirectiveEnum.END),
88+
function("lowercase", String::toLowerCase, String.class))
89+
.build();
90+
91+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
92+
WorkflowDefinition def = app.workflowDefinition(wf);
93+
String output =
94+
def.instance("Hello Alice, Hello Bob, Hello Everyone!")
95+
.start()
96+
.join()
97+
.asText()
98+
.orElseThrow();
99+
100+
Assertions.assertEquals("HELLO ALICE, HELLO BOB, HELLO EVERYONE!", output);
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)