Skip to content

Commit ef228e2

Browse files
committed
chore: Add idempotency example
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 9b177cf commit ef228e2

File tree

6 files changed

+213
-3
lines changed

6 files changed

+213
-3
lines changed

examples/src/main/java/io/dapr/examples/workflows/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ Those examples contain the following workflow patterns:
5353
4. [External Event Pattern](#external-event-pattern)
5454
5. [Child-workflow Pattern](#child-workflow-pattern)
5555
6. [Compensation Pattern](#compensation-pattern)
56+
6. [Suspend/resume Pattern](#suspendresume-pattern)
57+
7. [Idempotency Pattern](#idempotency-pattern)
5658

5759
### Chaining Pattern
5860
In the chaining pattern, a sequence of activities executes in a specific order.
@@ -707,4 +709,6 @@ The client log:
707709
```text
708710
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
709711
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
710-
```
712+
```
713+
714+
### Idempotency Pattern
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.workflows.idempotency;
15+
16+
import io.dapr.workflows.client.DaprWorkflowClient;
17+
import io.dapr.workflows.client.WorkflowInstanceStatus;
18+
import io.dapr.workflows.client.WorkflowRuntimeStatus;
19+
20+
import java.util.ArrayList;
21+
import java.util.concurrent.TimeoutException;
22+
23+
public class IdempotencyClient {
24+
/**
25+
* The main method to start the client.
26+
*
27+
* @param args Input arguments (unused).
28+
* @throws InterruptedException If program has been interrupted.
29+
*/
30+
public static void main(String[] args) {
31+
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
32+
String instanceId = client.scheduleNewWorkflow(IdempotentWorkflow.class);
33+
System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId);
34+
WorkflowInstanceStatus workflowInstanceStatus =
35+
client.waitForInstanceCompletion(instanceId, null, true);
36+
37+
if (workflowInstanceStatus == null) {
38+
System.out.printf("workflow instance with ID: %s not found%n", instanceId);
39+
return;
40+
}
41+
42+
if (workflowInstanceStatus.getRuntimeStatus() != WorkflowRuntimeStatus.COMPLETED) {
43+
System.out.printf("workflow instance with ID: %s failed", instanceId);
44+
return;
45+
}
46+
47+
var result = workflowInstanceStatus.readOutputAs(ArrayList.class);
48+
System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result.toString());
49+
50+
} catch (TimeoutException | InterruptedException e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.workflows.idempotency;
15+
16+
import io.dapr.workflows.runtime.WorkflowRuntime;
17+
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
18+
19+
public class IdempotencyWorker {
20+
/**
21+
* The main method of this app.
22+
*
23+
* @param args The port the app will listen on.
24+
* @throws Exception An Exception.
25+
*/
26+
public static void main(String[] args) throws Exception {
27+
// Register the Workflow with the builder.
28+
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(IdempotentWorkflow.class);
29+
builder.registerActivity(IdempotentActivity.class);
30+
31+
// Build and then start the workflow runtime pulling and executing tasks
32+
WorkflowRuntime runtime = builder.build();
33+
System.out.println("Start workflow runtime");
34+
runtime.start();
35+
}
36+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.workflows.idempotency;
15+
16+
import io.dapr.workflows.WorkflowActivity;
17+
import io.dapr.workflows.WorkflowActivityContext;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
25+
public class IdempotentActivity implements WorkflowActivity {
26+
27+
Logger logger = LoggerFactory.getLogger(IdempotentActivity.class);
28+
29+
@Override
30+
public Object run(WorkflowActivityContext ctx) {
31+
32+
logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName());
33+
var limit = ctx.getInput(Integer.class);
34+
35+
var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0));
36+
if (counter.get() != limit) {
37+
logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get());
38+
IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet()));
39+
40+
throw new IllegalStateException("Task execution key not found");
41+
}
42+
43+
return counter.get();
44+
}
45+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.workflows.idempotency;
15+
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.Map;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
23+
import io.dapr.workflows.Workflow;
24+
import io.dapr.workflows.WorkflowStub;
25+
import io.dapr.workflows.WorkflowTaskOptions;
26+
import io.dapr.workflows.WorkflowTaskRetryPolicy;
27+
28+
29+
public class IdempotentWorkflow implements Workflow {
30+
31+
32+
private static Map<String, AtomicInteger> keyStore;
33+
34+
35+
public static Map<String, AtomicInteger> getKeyStore() {
36+
if (keyStore == null) {
37+
synchronized (IdempotentWorkflow.class) {
38+
if (keyStore == null) {
39+
keyStore = new ConcurrentHashMap<>();
40+
}
41+
}
42+
}
43+
return keyStore;
44+
}
45+
46+
47+
@Override
48+
public WorkflowStub create() {
49+
return ctx -> {
50+
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
51+
52+
var result = new ArrayList<Integer>();
53+
54+
WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
55+
.setMaxNumberOfAttempts(10)
56+
.setFirstRetryInterval(Duration.ofSeconds(1))
57+
.setMaxRetryInterval(Duration.ofSeconds(10))
58+
.setBackoffCoefficient(2.0)
59+
.setRetryTimeout(Duration.ofSeconds(10))
60+
.build());
61+
62+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 3, options, Integer.class).await());
63+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 2, options, Integer.class).await());
64+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 1, options, Integer.class).await());
65+
66+
result.forEach(r -> ctx.getLogger().info("Result: " + r));
67+
68+
ctx.complete(result);
69+
};
70+
}
71+
}

sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212
*/
1313
package io.dapr.it.testcontainers;
1414

15-
import java.util.HashMap;
1615
import java.util.Map;
16+
import java.util.concurrent.ConcurrentHashMap;
1717

1818
public class KeyStore {
1919

20-
private final Map<String, Boolean> keyStore = new HashMap<>();
20+
private final Map<String, Boolean> keyStore = new ConcurrentHashMap<>();
2121

2222
private static KeyStore instance;
2323

0 commit comments

Comments
 (0)