From 83a58d44582ca70b852df19afa673112aa049ee7 Mon Sep 17 00:00:00 2001 From: Mason Date: Fri, 13 Jun 2025 09:53:22 -0400 Subject: [PATCH 1/3] Add taskName and input to retry handler Signed-off-by: Mason --- .../workflows/WorkflowTaskRetryContext.java | 38 +++++++++++++++---- .../runtime/DefaultWorkflowContext.java | 18 ++++++--- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java index 15fa3fd50..6164c2406 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryContext.java @@ -14,16 +14,17 @@ package io.dapr.workflows; import io.dapr.workflows.client.WorkflowFailureDetails; -import io.dapr.workflows.runtime.DefaultWorkflowContext; import java.time.Duration; public class WorkflowTaskRetryContext { - private final DefaultWorkflowContext workflowContext; + private final WorkflowContext workflowContext; private final int lastAttemptNumber; private final WorkflowFailureDetails lastFailure; private final Duration totalRetryTime; + private final String taskName; + private final Object input; /** * Constructor for WorkflowTaskRetryContext. @@ -32,28 +33,34 @@ public class WorkflowTaskRetryContext { * @param lastAttemptNumber The number of the previous attempt * @param lastFailure The failure details from the most recent failure * @param totalRetryTime The amount of time spent retrying + * @param taskName The name of the task + * @param input The input of the task */ public WorkflowTaskRetryContext( - DefaultWorkflowContext workflowContext, + WorkflowContext workflowContext, int lastAttemptNumber, WorkflowFailureDetails lastFailure, - Duration totalRetryTime) { + Duration totalRetryTime, + String taskName, + Object input) { this.workflowContext = workflowContext; this.lastAttemptNumber = lastAttemptNumber; this.lastFailure = lastFailure; this.totalRetryTime = totalRetryTime; + this.taskName = taskName; + this.input = input; } /** * Gets the context of the current workflow. * *

The workflow context can be used in retry handlers to schedule timers (via the - * {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be - * used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method. + * {@link WorkflowContext#createTimer} methods) for implementing delays between retries. It can also be + * used to implement time-based retry logic by using the {@link WorkflowContext#getCurrentInstant} method. * * @return the context of the parent workflow */ - public DefaultWorkflowContext getWorkflowContext() { + public WorkflowContext getWorkflowContext() { return this.workflowContext; } @@ -85,4 +92,21 @@ public Duration getTotalRetryTime() { return this.totalRetryTime; } + /** + * Gets the name of the task. + * + * @return the name of the task + */ + public String getTaskName() { + return taskName; + } + + /** + * Gets the input of the task. + * + * @return the task's input + */ + public Object getInput() { + return input; + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index d11e1fe77..7e46ab8f9 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -164,7 +164,7 @@ public boolean isReplaying() { * {@inheritDoc} */ public Task callActivity(String name, Object input, WorkflowTaskOptions options, Class returnType) { - TaskOptions taskOptions = toTaskOptions(options); + TaskOptions taskOptions = toTaskOptions(options, name, input); return this.innerContext.callActivity(name, input, taskOptions, returnType); } @@ -208,7 +208,7 @@ public T getInput(Class targetType) { @Override public Task callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, @Nullable WorkflowTaskOptions options, Class returnType) { - TaskOptions taskOptions = toTaskOptions(options); + TaskOptions taskOptions = toTaskOptions(options, name, input); return this.innerContext.callSubOrchestrator(name, input, instanceID, taskOptions, returnType); } @@ -237,13 +237,13 @@ public UUID newUuid() { return this.innerContext.newUUID(); } - private TaskOptions toTaskOptions(WorkflowTaskOptions options) { + private TaskOptions toTaskOptions(WorkflowTaskOptions options, String taskName, Object input) { if (options == null) { return null; } RetryPolicy retryPolicy = toRetryPolicy(options.getRetryPolicy()); - RetryHandler retryHandler = toRetryHandler(options.getRetryHandler()); + RetryHandler retryHandler = toRetryHandler(options.getRetryHandler(), taskName, input); return new TaskOptions(retryPolicy, retryHandler); } @@ -276,9 +276,13 @@ private RetryPolicy toRetryPolicy(WorkflowTaskRetryPolicy workflowTaskRetryPolic * Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}. * * @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted + * @param taskName The name of the task + * @param input The input object passed to the task * @return A {@link RetryHandler} */ - private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) { + private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler, + String taskName, + Object input) { if (workflowTaskRetryHandler == null) { return null; } @@ -288,7 +292,9 @@ private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHa this, retryContext.getLastAttemptNumber(), new DefaultWorkflowFailureDetails(retryContext.getLastFailure()), - retryContext.getTotalRetryTime() + retryContext.getTotalRetryTime(), + taskName, + input ); return workflowTaskRetryHandler.handle(workflowRetryContext); From fcb50864c7561e8e4d9b49588ed741e9a6335ce6 Mon Sep 17 00:00:00 2001 From: Mason Date: Mon, 16 Jun 2025 12:36:12 -0400 Subject: [PATCH 2/3] Add retry handler example Signed-off-by: Mason --- .../retryhandler/DemoRetryHandler.java | 48 +++++++++++++++++++ .../retryhandler/DemoRetryHandlerClient.java | 47 ++++++++++++++++++ .../retryhandler/DemoRetryHandlerWorker.java | 36 ++++++++++++++ .../retryhandler/DemoRetryWorkflow.java | 43 +++++++++++++++++ .../retryhandler/FailureActivity.java | 44 +++++++++++++++++ 5 files changed, 218 insertions(+) create mode 100644 examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandler.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerClient.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerWorker.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryWorkflow.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/retryhandler/FailureActivity.java diff --git a/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandler.java b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandler.java new file mode 100644 index 000000000..7c4277049 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.retryhandler; + +import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowTaskRetryContext; +import io.dapr.workflows.WorkflowTaskRetryHandler; +import org.slf4j.Logger; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +public class DemoRetryHandler implements WorkflowTaskRetryHandler { + + @Override + public boolean handle(WorkflowTaskRetryContext retryContext) { + WorkflowContext workflowContext = retryContext.getWorkflowContext(); + Logger logger = retryContext.getWorkflowContext().getLogger(); + Object input = retryContext.getInput(); + String taskName = retryContext.getTaskName(); + + if(taskName.equalsIgnoreCase(FailureActivity.class.getName())) { + logger.info("FailureActivity Input: {}", input); + Instant timestampInput = (Instant) input; + // Add a second to ensure, it is 100% passed the time to success + Instant timeToSuccess = timestampInput.plusSeconds(FailureActivity.TIME_TO_SUCCESS + 1); + long timeToWait = timestampInput.until(timeToSuccess, TimeUnit.SECONDS.toChronoUnit()); + + logger.info("Waiting {} seconds before retrying.", timeToWait); + workflowContext.createTimer(Duration.ofSeconds(timeToWait)).await(); + logger.info("Send request to FailureActivity"); + } + + return true; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerClient.java b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerClient.java new file mode 100644 index 000000000..e233cc537 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerClient.java @@ -0,0 +1,47 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.retryhandler; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeoutException; + +public class DemoRetryHandlerClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoRetryWorkflow.class); + System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId); + + // Block until the orchestration completes. Then print the final status, which includes the output. + WorkflowInstanceStatus workflowInstanceStatus = client.waitForInstanceCompletion( + instanceId, + Duration.ofSeconds(30), + true); + + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, + workflowInstanceStatus.readOutputAs(Instant.class)); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerWorker.java b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerWorker.java new file mode 100644 index 000000000..3a0f215fa --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryHandlerWorker.java @@ -0,0 +1,36 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.retryhandler; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoRetryHandlerWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoRetryWorkflow.class); + builder.registerActivity(FailureActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryWorkflow.java new file mode 100644 index 000000000..f6629634f --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/DemoRetryWorkflow.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.retryhandler; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryHandler; +import org.slf4j.Logger; + +import java.time.Instant; + +public class DemoRetryWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return context -> { + Logger logger = context.getLogger(); + logger.info("Starting RetryWorkflow: {}", context.getName()); + + WorkflowTaskRetryHandler retryHandler = new DemoRetryHandler(); + WorkflowTaskOptions taskOptions = new WorkflowTaskOptions(retryHandler); + + logger.info("RetryWorkflow is calling Activity: {}", FailureActivity.class.getName()); + Instant currentTime = context.getCurrentInstant(); + Instant result = context.callActivity(FailureActivity.class.getName(), currentTime, taskOptions, Instant.class).await(); + + logger.info("RetryWorkflow finished with: {}", result); + context.complete(result); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/retryhandler/FailureActivity.java b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/FailureActivity.java new file mode 100644 index 000000000..e5ac60b76 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/retryhandler/FailureActivity.java @@ -0,0 +1,44 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.retryhandler; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; + +public class FailureActivity implements WorkflowActivity { + + private static final Logger LOGGER = LoggerFactory.getLogger(FailureActivity.class); + public static final long TIME_TO_SUCCESS = 10; + + @Override + public Object run(WorkflowActivityContext ctx) { + LOGGER.info("Starting Activity: {}", ctx.getName()); + + Instant timestamp = ctx.getInput(Instant.class); + + LOGGER.info("Input timestamp: {}", timestamp); + if(timestamp.plusSeconds(TIME_TO_SUCCESS).isBefore(Instant.now())) { + LOGGER.info("Completing Activity: {}", ctx.getName()); + return Instant.now(); + } + + LOGGER.info("Throwing exception for Activity: {}", ctx.getName()); + + throw new RuntimeException("Failure!"); + } +} From 23039f18c093d8aa61792e3e3efc9c2b227389b2 Mon Sep 17 00:00:00 2001 From: Mason Date: Mon, 16 Jun 2025 14:22:52 -0400 Subject: [PATCH 3/3] Update example README file Signed-off-by: Mason --- .../java/io/dapr/examples/workflows/README.md | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index b90726080..dc36ae4dc 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -53,6 +53,8 @@ Those examples contain the following workflow patterns: 4. [External Event Pattern](#external-event-pattern) 5. [Child-workflow Pattern](#child-workflow-pattern) 6. [Compensation Pattern](#compensation-pattern) +7. [Suspend/Resume Pattern](#suspendresume-pattern) +8. [RetryHandler](#retryhandler) ### Chaining Pattern In the chaining pattern, a sequence of activities executes in a specific order. @@ -707,4 +709,94 @@ The client log: ```text Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. +``` + +### RetryHandler + +When an activity or child workflow fails, Dapr supports auto retry mechanisms such as a `WorkflowTaskRetryHandler` and +`WorkflowTaskRetryPolicy`. An example of `WorkflowTaskRetryPolicy` in use can be found in the child workflow example. + +A `WorkflowTaskRetryHandler` allows you to have complete control over whether an activity or child workflow retries or fails. +This is done by implemented the handle method within this interface. + +The example RetryHandler below allows for unlimited retries. If a task of type `FailureActivity` fails, it pulls out the +input passed to the activity, an `Instant` in this case, and then uses that to calculate a backoff time. +```java +public class DemoRetryHandler implements WorkflowTaskRetryHandler { + + @Override + public boolean handle(WorkflowTaskRetryContext retryContext) { + WorkflowContext workflowContext = retryContext.getWorkflowContext(); + Logger logger = retryContext.getWorkflowContext().getLogger(); + Object input = retryContext.getInput(); + String taskName = retryContext.getTaskName(); + + if(taskName.equalsIgnoreCase(FailureActivity.class.getName())) { + logger.info("FailureActivity Input: {}", input); + Instant timestampInput = (Instant) input; + // Add a second to ensure, it is 100% passed the time to success + Instant timeToSuccess = timestampInput.plusSeconds(FailureActivity.TIME_TO_SUCCESS + 1); + long timeToWait = timestampInput.until(timeToSuccess, TimeUnit.SECONDS.toChronoUnit()); + + logger.info("Waiting {} seconds before retrying.", timeToWait); + workflowContext.createTimer(Duration.ofSeconds(timeToWait)).await(); + logger.info("Send request to FailureActivity"); + } + + return true; + } +} +``` + +Start the workflow and client using the following commands: + + +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.retryhandler.DemoRetryHandlerWorker +``` + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.retryhandler.DemoRetryHandlerClient +``` + + + +The worker logs: +```text +== APP == 2025-06-16 14:13:42,821 {HH:mm:ss.SSS} [pool-2-thread-1] INFO io.dapr.workflows.WorkflowContext - Starting RetryWorkflow: io.dapr.examples.workflows.retryhandler.DemoRetryWorkflow +== APP == 2025-06-16 14:13:42,821 {HH:mm:ss.SSS} [pool-2-thread-1] INFO io.dapr.workflows.WorkflowContext - RetryWorkflow is calling Activity: io.dapr.examples.workflows.retryhandler.FailureActivity +== APP == 2025-06-16 14:13:42,851 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Starting Activity: io.dapr.examples.workflows.retryhandler.FailureActivity +== APP == 2025-06-16 14:13:42,861 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Input timestamp: 2025-06-16T18:13:42.820Z +== APP == 2025-06-16 14:13:42,861 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Throwing exception for Activity: io.dapr.examples.workflows.retryhandler.FailureActivity +== APP == 2025-06-16 14:13:42,901 {HH:mm:ss.SSS} [pool-2-thread-1] INFO io.dapr.workflows.WorkflowContext - FailureActivity Input: 2025-06-16T18:13:42.820Z +== APP == 2025-06-16 14:13:42,901 {HH:mm:ss.SSS} [pool-2-thread-1] INFO io.dapr.workflows.WorkflowContext - Waiting 11 seconds before retrying. +== APP == Jun 16, 2025 2:13:52 PM io.dapr.durabletask.TaskOrchestrationExecutor$ContextImplTask$RetriableTask shouldRetry +== APP == INFO: shouldRetryBasedOnHandler: true +== APP == 2025-06-16 14:13:53,052 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Starting Activity: io.dapr.examples.workflows.retryhandler.FailureActivity +== APP == 2025-06-16 14:13:53,052 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Input timestamp: 2025-06-16T18:13:42.820Z +== APP == 2025-06-16 14:13:53,052 {HH:mm:ss.SSS} [pool-2-thread-1] INFO i.d.e.w.retryhandler.FailureActivity - Completing Activity: io.dapr.examples.workflows.retryhandler.FailureActivity +== APP == Jun 16, 2025 2:13:53 PM io.dapr.durabletask.TaskOrchestrationExecutor$ContextImplTask$RetriableTask shouldRetry +== APP == INFO: shouldRetryBasedOnHandler: true +``` + +The client log: +```text +Started a new external-event workflow with instance ID: 9f3c70b6-329d-4715-95ed-6ec9bc55ca39 +workflow instance with ID: 9f3c70b6-329d-4715-95ed-6ec9bc55ca39 completed with result: 2025-06-16T18:06:24.068590500Z ``` \ No newline at end of file