Skip to content

Commit 39010b3

Browse files
committed
chore: Update examples readme
Signed-off-by: Javier Aliaga <[email protected]>
1 parent b471799 commit 39010b3

File tree

4 files changed

+149
-11
lines changed

4 files changed

+149
-11
lines changed

daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,34 +249,33 @@ Exiting DemoWorkflowClient.
249249

250250
## Advanced features
251251

252-
### Task Execution Keys
252+
### Task Execution Ids
253253

254-
Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for:
254+
Task execution ids are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for:
255+
256+
**Idempotency**: Ensuring activities are not executed multiple times for the same task
255257

256-
1. **Idempotency**: Ensuring activities are not executed multiple times for the same task
257-
2. **State Management**: Tracking the state of activity execution
258-
3. **Error Handling**: Managing retries and failures in a controlled manner
259258

260259
Here's an example of how to use task execution keys in your workflow activities:
261260

262261
```java
263-
public class TaskExecutionKeyActivity implements WorkflowActivity {
262+
public class TaskExecutionIdActivity implements WorkflowActivity {
264263
@Override
265264
public Object run(WorkflowActivityContext ctx) {
266265
// Get the task execution key for this activity
267-
String taskExecutionKey = ctx.getTaskExecutionKey();
266+
String taskExecutionId = ctx.getTaskExecutionId();
268267

269268
// Use the key to implement idempotency or state management
270269
// For example, check if this task has already been executed
271-
if (isTaskAlreadyExecuted(taskExecutionKey)) {
272-
return getPreviousResult(taskExecutionKey);
270+
if (isTaskAlreadyExecuted(taskExecutionId)) {
271+
return getPreviousResult(taskExecutionId);
273272
}
274273

275274
// Execute the activity logic
276275
Object result = executeActivityLogic();
277276

278277
// Store the result with the task execution key
279-
storeResult(taskExecutionKey, result);
278+
storeResult(taskExecutionId, result);
280279

281280
return result;
282281
}

examples/src/main/java/io/dapr/examples/workflows/README.md

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,4 +711,141 @@ Started a new external-event model workflow with instance ID: 23410d96-1afe-4698
711711
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
712712
```
713713

714-
### Idempotency Pattern
714+
### Idempotency Pattern
715+
716+
The idempotency pattern ensures that activities can be safely retried without causing unintended side effects. This pattern is crucial when dealing with potentially unreliable external services or when network failures might cause activities to be retried. In this example, we demonstrate how to use task execution IDs to implement idempotent activities that return consistent results across retries.
717+
718+
The `IdempotentWorkflow` class defines the workflow with retry policies and calls activities with specific limits. The workflow uses a shared key store to track task execution attempts. See the code snippet below:
719+
```java
720+
public class IdempotentWorkflow implements Workflow {
721+
722+
private static Map<String, AtomicInteger> keyStore;
723+
724+
public static Map<String, AtomicInteger> getKeyStore() {
725+
if (keyStore == null) {
726+
synchronized (IdempotentWorkflow.class) {
727+
if (keyStore == null) {
728+
keyStore = new ConcurrentHashMap<>();
729+
}
730+
}
731+
}
732+
return keyStore;
733+
}
734+
735+
@Override
736+
public WorkflowStub create() {
737+
return ctx -> {
738+
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
739+
740+
var result = new ArrayList<Integer>();
741+
742+
WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
743+
.setMaxNumberOfAttempts(10)
744+
.setFirstRetryInterval(Duration.ofSeconds(1))
745+
.setMaxRetryInterval(Duration.ofSeconds(10))
746+
.setBackoffCoefficient(2.0)
747+
.setRetryTimeout(Duration.ofSeconds(10))
748+
.build());
749+
750+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 3, options, Integer.class).await());
751+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 2, options, Integer.class).await());
752+
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 1, options, Integer.class).await());
753+
754+
result.forEach(r -> ctx.getLogger().info("Result: " + r));
755+
756+
ctx.complete(result);
757+
};
758+
}
759+
}
760+
```
761+
762+
The `IdempotentActivity` class implements the idempotency logic using task execution IDs. Each task execution has a unique ID that remains consistent across retries, allowing the activity to track its state and ensure idempotent behavior. See the code snippet below:
763+
```java
764+
public class IdempotentActivity implements WorkflowActivity {
765+
766+
Logger logger = LoggerFactory.getLogger(IdempotentActivity.class);
767+
768+
@Override
769+
public Object run(WorkflowActivityContext ctx) {
770+
771+
logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName());
772+
var limit = ctx.getInput(Integer.class);
773+
774+
var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0));
775+
if (counter.get() != limit) {
776+
logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get());
777+
IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet()));
778+
779+
throw new IllegalStateException("Task execution key not found");
780+
}
781+
782+
return counter.get();
783+
}
784+
}
785+
```
786+
787+
<!-- STEP
788+
name: Run Idempotency Pattern workflow
789+
match_order: none
790+
output_match_mode: substring
791+
expected_stdout_lines:
792+
- 'Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow'
793+
- 'Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity'
794+
- 'Task execution key'
795+
- 'incrementing counter'
796+
- 'Result: 3'
797+
- 'Result: 2'
798+
- 'Result: 1'
799+
background: true
800+
sleep: 60
801+
timeout_seconds: 60
802+
-->
803+
804+
Execute the following script in order to run IdempotencyWorker:
805+
```sh
806+
dapr run --app-id idempotencyworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyWorker
807+
```
808+
809+
Once running, execute the following script in order to run IdempotencyClient:
810+
```sh
811+
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyClient
812+
```
813+
<!-- END_STEP -->
814+
815+
The worker logs will show how the activity handles retries idempotently:
816+
```text
817+
== APP == 2023-11-07 15:30:22,145 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow
818+
== APP == 2023-11-07 15:30:22,189 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
819+
== APP == 2023-11-07 15:30:22,192 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 0
820+
== APP == 2023-11-07 15:30:22,198 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
821+
== APP == 2023-11-07 15:30:22,199 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 1
822+
== APP == 2023-11-07 15:30:22,205 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
823+
== APP == 2023-11-07 15:30:22,206 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 2
824+
== APP == 2023-11-07 15:30:22,212 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
825+
== APP == 2023-11-07 15:30:22,226 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 3
826+
== APP == 2023-11-07 15:30:22,230 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
827+
== APP == 2023-11-07 15:30:22,231 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 0
828+
== APP == 2023-11-07 15:30:22,236 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
829+
== APP == 2023-11-07 15:30:22,237 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 1
830+
== APP == 2023-11-07 15:30:22,242 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
831+
== APP == 2023-11-07 15:30:22,255 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 2
832+
== APP == 2023-11-07 15:30:22,259 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
833+
== APP == 2023-11-07 15:30:22,262 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_003] with limit 1, incrementing counter 0
834+
== APP == 2023-11-07 15:30:22,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
835+
== APP == 2023-11-07 15:30:22,280 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 1
836+
```
837+
838+
The client logs:
839+
```text
840+
Started a new chaining model workflow with instance ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g
841+
workflow instance with ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g completed with result: [3, 2, 1]
842+
```
843+
844+
Key Points:
845+
1. **Task Execution ID**: Each activity call has a unique task execution ID that remains consistent across retries
846+
2. **Retry Policy**: The workflow defines a retry policy with exponential backoff and maximum attempts
847+
3. **State Tracking**: The activity uses a shared key store to track the execution state for each task execution ID
848+
4. **Controlled Failure**: The activity intentionally fails until it reaches the specified limit, demonstrating retry behavior
849+
5. **Idempotent Result**: Once the limit is reached, subsequent retries with the same task execution ID return the same result
850+
851+
This pattern is essential for building resilient workflows that can handle transient failures without causing duplicate operations or inconsistent state.

sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.dapr.it.testcontainers;
1515

16+
import io.dapr.it.testcontainers.workflows.TestWorkflowPayload;
1617
import io.dapr.workflows.WorkflowActivity;
1718
import io.dapr.workflows.WorkflowActivityContext;
1819

sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.dapr.it.testcontainers;
1515

1616
import io.dapr.durabletask.Task;
17+
import io.dapr.it.testcontainers.workflows.TestWorkflowPayload;
1718
import io.dapr.workflows.Workflow;
1819
import io.dapr.workflows.WorkflowStub;
1920
import io.dapr.workflows.WorkflowTaskOptions;

0 commit comments

Comments
 (0)