Skip to content

Commit 41731b7

Browse files
committed
chore: Create new example
Signed-off-by: Javier Aliaga <[email protected]>
1 parent b5283c5 commit 41731b7

File tree

5 files changed

+189
-6
lines changed

5 files changed

+189
-6
lines changed

examples/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ require (
1919
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2020
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2121
github.com/dapr/dapr v1.15.5 // indirect
22-
github.com/dapr/durabletask-go v0.6.5 // indirect
23-
github.com/dapr/kit v0.15.2 // indirect
22+
github.com/dapr/durabletask-go v0.7.3-0.20250705135742-20fd7583dc03 // indirect
23+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 // indirect
2424
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2525
github.com/go-chi/chi/v5 v5.1.0 // indirect
2626
github.com/go-logr/logr v1.4.2 // indirect

examples/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
99
github.com/dapr/dapr v1.15.5 h1:bkCmcQQfaQ5C49P3l0elCzDr4/Oja5kitM3jStY+2RY=
1010
github.com/dapr/dapr v1.15.5/go.mod h1:wwopO8AD9CZOgVj4bsdXNmeQujMo0v3MLAqeaX+gb00=
11-
github.com/dapr/durabletask-go v0.6.5 h1:aWcxMfYudojpgRjJRdUr7yyZ7rGcvLtWXUuA4cGHBR0=
12-
github.com/dapr/durabletask-go v0.6.5/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA=
13-
github.com/dapr/kit v0.15.2 h1:5H9IhKScU/SpE2Hxvr5vUlmYN1e2MJN15RoT8/KSziU=
14-
github.com/dapr/kit v0.15.2/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw=
11+
github.com/dapr/durabletask-go v0.7.3-0.20250705135742-20fd7583dc03 h1:/Sl6sJE3jdPh96KfGShshIF0xZlAjN36Buq9mcNFV7Q=
12+
github.com/dapr/durabletask-go v0.7.3-0.20250705135742-20fd7583dc03/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
13+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 h1:I1Uoy3fn906AZZdG8+n8fHitgY7Wn9c+smz4WQdOy1Q=
14+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
1515
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1616
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1717
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Dapr Parallel Workflow Example with go-sdk
2+
3+
## Step
4+
5+
### Prepare
6+
7+
- Dapr installed
8+
9+
### Run Workflow
10+
11+
<!-- STEP
12+
name: Run Workflow
13+
output_match_mode: substring
14+
expected_stdout_lines:
15+
- '== APP == Workflow(s) and activities registered.'
16+
- 'work item listener started'
17+
- '== APP == RetryN 1'
18+
- '== APP == RetryN 2'
19+
- '== APP == RetryN 3'
20+
- '== APP == RetryN 4'
21+
- '== APP == RetryN 1'
22+
- '== APP == RetryN 2'
23+
- '== APP == RetryN 3'
24+
- '== APP == RetryN 4'
25+
- '== APP == workflow status: COMPLETED'
26+
- '== APP == workflow terminated'
27+
- '== APP == workflow purged'
28+
29+
background: true
30+
sleep: 30
31+
timeout_seconds: 60
32+
-->
33+
34+
```bash
35+
dapr run --app-id workflow-taskexecutionid \
36+
--dapr-grpc-port 50001 \
37+
--log-level debug \
38+
--resources-path ./config \
39+
-- go run ./main.go
40+
```
41+
42+
<!-- END_STEP -->
43+
44+
## Result
45+
46+
```
47+
- '== APP == Workflow(s) and activities registered.'
48+
- 'work item listener started'
49+
- '== APP == RetryN 1'
50+
- '== APP == RetryN 2'
51+
- '== APP == RetryN 3'
52+
- '== APP == RetryN 4'
53+
- '== APP == RetryN 1'
54+
- '== APP == RetryN 2'
55+
- '== APP == RetryN 3'
56+
- '== APP == RetryN 4'
57+
- '== APP == workflow status: COMPLETED'
58+
- '== APP == workflow terminated'
59+
- '== APP == workflow purged'
60+
```
61+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: wf-store
5+
spec:
6+
type: state.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
13+
- name: actorStateStore
14+
value: "true"
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/dapr/go-sdk/workflow"
12+
)
13+
14+
func main() {
15+
w, err := workflow.NewWorker()
16+
if err != nil {
17+
log.Fatalf("failed to initialise worker: %v", err)
18+
}
19+
20+
if err := w.RegisterWorkflow(TaskExecutionIdWorkflow); err != nil {
21+
log.Fatalf("failed to register workflow: %v", err)
22+
}
23+
if err := w.RegisterActivity(RetryN); err != nil {
24+
log.Fatalf("failed to register activity: %v", err)
25+
}
26+
fmt.Println("Workflow(s) and activities registered.")
27+
28+
if err := w.Start(); err != nil {
29+
log.Fatalf("failed to start worker")
30+
}
31+
32+
wfClient, err := workflow.NewClient()
33+
if err != nil {
34+
log.Fatalf("failed to initialise client: %v", err)
35+
}
36+
ctx := context.Background()
37+
id, err := wfClient.ScheduleNewWorkflow(ctx, "TaskExecutionIdWorkflow", workflow.WithInput(5))
38+
if err != nil {
39+
log.Fatalf("failed to schedule a new workflow: %v", err)
40+
}
41+
42+
metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id)
43+
if err != nil {
44+
log.Fatalf("failed to get workflow: %v", err)
45+
}
46+
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
47+
48+
err = wfClient.TerminateWorkflow(ctx, id)
49+
if err != nil {
50+
log.Fatalf("failed to terminate workflow: %v", err)
51+
}
52+
fmt.Println("workflow terminated")
53+
54+
err = wfClient.PurgeWorkflow(ctx, id)
55+
if err != nil {
56+
log.Fatalf("failed to purge workflow: %v", err)
57+
}
58+
fmt.Println("workflow purged")
59+
}
60+
61+
var eMap = sync.Map{}
62+
63+
func TaskExecutionIdWorkflow(ctx *workflow.WorkflowContext) (any, error) {
64+
var retries int
65+
if err := ctx.GetInput(&retries); err != nil {
66+
return 0, err
67+
}
68+
69+
var workBatch []int
70+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
71+
MaxAttempts: retries,
72+
InitialRetryInterval: 100 * time.Millisecond,
73+
BackoffCoefficient: 2,
74+
MaxRetryInterval: 1 * time.Second,
75+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
76+
return 0, err
77+
}
78+
79+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
80+
MaxAttempts: retries,
81+
InitialRetryInterval: 100 * time.Millisecond,
82+
BackoffCoefficient: 2,
83+
MaxRetryInterval: 1 * time.Second,
84+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
85+
return 0, err
86+
}
87+
88+
return 0, nil
89+
}
90+
91+
func RetryN(ctx workflow.ActivityContext) (any, error) {
92+
taskExecutionID := ctx.GetTaskExecutionID()
93+
counter, _ := eMap.LoadOrStore(taskExecutionID, &atomic.Int32{})
94+
var retries int32
95+
if err := ctx.GetInput(&retries); err != nil {
96+
return 0, err
97+
}
98+
99+
counter.(*atomic.Int32).Add(1)
100+
fmt.Println("RetryN ", counter.(*atomic.Int32).Load())
101+
102+
if counter.(*atomic.Int32).Load() < retries-1 {
103+
return nil, fmt.Errorf("failed")
104+
}
105+
106+
return nil, nil
107+
108+
}

0 commit comments

Comments
 (0)