Skip to content

Commit a0e0f8b

Browse files
committed
chore: Create new example
Signed-off-by: Javier Aliaga <[email protected]>
1 parent e5b6093 commit a0e0f8b

File tree

4 files changed

+184
-0
lines changed

4 files changed

+184
-0
lines changed

.github/workflows/validate_examples.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ jobs:
175175
"socket",
176176
"workflow",
177177
"workflow-parallel",
178+
"workflow-taskexecutionid"
178179
]
179180
steps:
180181
- name: Check out code onto GOPATH
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Dapr Parallel Workflow Example with go-sdk
2+
3+
## Step
4+
5+
### Prepare
6+
7+
- Dapr installed
8+
9+
### Run Workflow
10+
11+
<!-- STEP
12+
name: Run Workflow
13+
output_match_mode: substring
14+
expected_stdout_lines:
15+
- '== APP == Workflow(s) and activities registered.'
16+
- 'work item listener started'
17+
- '== APP == RetryN 1'
18+
- '== APP == RetryN 2'
19+
- '== APP == RetryN 3'
20+
- '== APP == RetryN 4'
21+
- '== APP == RetryN 1'
22+
- '== APP == RetryN 2'
23+
- '== APP == RetryN 3'
24+
- '== APP == RetryN 4'
25+
- '== APP == workflow status: COMPLETED'
26+
- '== APP == workflow terminated'
27+
- '== APP == workflow purged'
28+
29+
background: true
30+
sleep: 30
31+
timeout_seconds: 60
32+
-->
33+
34+
```bash
35+
dapr run --app-id workflow-taskexecutionid \
36+
--dapr-grpc-port 50001 \
37+
--log-level debug \
38+
--resources-path ./config \
39+
-- go run ./main.go
40+
```
41+
42+
<!-- END_STEP -->
43+
44+
## Result
45+
46+
```
47+
- '== APP == Workflow(s) and activities registered.'
48+
- 'work item listener started'
49+
- '== APP == RetryN 1'
50+
- '== APP == RetryN 2'
51+
- '== APP == RetryN 3'
52+
- '== APP == RetryN 4'
53+
- '== APP == RetryN 1'
54+
- '== APP == RetryN 2'
55+
- '== APP == RetryN 3'
56+
- '== APP == RetryN 4'
57+
- '== APP == workflow status: COMPLETED'
58+
- '== APP == workflow terminated'
59+
- '== APP == workflow purged'
60+
```
61+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: wf-store
5+
spec:
6+
type: state.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
13+
- name: actorStateStore
14+
value: "true"
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/dapr/go-sdk/workflow"
12+
)
13+
14+
func main() {
15+
w, err := workflow.NewWorker()
16+
if err != nil {
17+
log.Fatalf("failed to initialise worker: %v", err)
18+
}
19+
20+
if err := w.RegisterWorkflow(TaskExecutionIdWorkflow); err != nil {
21+
log.Fatalf("failed to register workflow: %v", err)
22+
}
23+
if err := w.RegisterActivity(RetryN); err != nil {
24+
log.Fatalf("failed to register activity: %v", err)
25+
}
26+
fmt.Println("Workflow(s) and activities registered.")
27+
28+
if err := w.Start(); err != nil {
29+
log.Fatalf("failed to start worker")
30+
}
31+
32+
wfClient, err := workflow.NewClient()
33+
if err != nil {
34+
log.Fatalf("failed to initialise client: %v", err)
35+
}
36+
ctx := context.Background()
37+
id, err := wfClient.ScheduleNewWorkflow(ctx, "TaskExecutionIdWorkflow", workflow.WithInput(5))
38+
if err != nil {
39+
log.Fatalf("failed to schedule a new workflow: %v", err)
40+
}
41+
42+
metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id)
43+
if err != nil {
44+
log.Fatalf("failed to get workflow: %v", err)
45+
}
46+
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
47+
48+
err = wfClient.TerminateWorkflow(ctx, id)
49+
if err != nil {
50+
log.Fatalf("failed to terminate workflow: %v", err)
51+
}
52+
fmt.Println("workflow terminated")
53+
54+
err = wfClient.PurgeWorkflow(ctx, id)
55+
if err != nil {
56+
log.Fatalf("failed to purge workflow: %v", err)
57+
}
58+
fmt.Println("workflow purged")
59+
}
60+
61+
var eMap = sync.Map{}
62+
63+
func TaskExecutionIdWorkflow(ctx *workflow.WorkflowContext) (any, error) {
64+
var retries int
65+
if err := ctx.GetInput(&retries); err != nil {
66+
return 0, err
67+
}
68+
69+
var workBatch []int
70+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
71+
MaxAttempts: retries,
72+
InitialRetryInterval: 100 * time.Millisecond,
73+
BackoffCoefficient: 2,
74+
MaxRetryInterval: 1 * time.Second,
75+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
76+
return 0, err
77+
}
78+
79+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
80+
MaxAttempts: retries,
81+
InitialRetryInterval: 100 * time.Millisecond,
82+
BackoffCoefficient: 2,
83+
MaxRetryInterval: 1 * time.Second,
84+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
85+
return 0, err
86+
}
87+
88+
return 0, nil
89+
}
90+
91+
func RetryN(ctx workflow.ActivityContext) (any, error) {
92+
taskExecutionID := ctx.GetTaskExecutionID()
93+
counter, _ := eMap.LoadOrStore(taskExecutionID, &atomic.Int32{})
94+
var retries int32
95+
if err := ctx.GetInput(&retries); err != nil {
96+
return 0, err
97+
}
98+
99+
counter.(*atomic.Int32).Add(1)
100+
fmt.Println("RetryN ", counter.(*atomic.Int32).Load())
101+
102+
if counter.(*atomic.Int32).Load() < retries-1 {
103+
return nil, fmt.Errorf("failed")
104+
}
105+
106+
return nil, nil
107+
108+
}

0 commit comments

Comments
 (0)