diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index ca3777064b9..bbafbf252f0 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -227,7 +227,7 @@ func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) { if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil { return nil, err } - if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil { + if err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output); err != nil { return nil, err } @@ -475,7 +475,7 @@ func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) { if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil { return nil, err } - if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil { + if err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output); err != nil { return nil, err } @@ -666,6 +666,7 @@ def main(): if non_existent_id_error in err._message: print('Instance Successfully Purged') + sleep(10000) wfr.shutdown() @@ -685,139 +686,79 @@ if __name__ == '__main__': - `WorkflowRuntime`: Allows you to register workflows and workflow activities - `DaprWorkflowContext`: Allows you to [create workflows]({{% ref "#write-the-workflow" %}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{% ref "#write-the-workflow-activities" %}}) -- API calls. In the example below, these calls start, terminate, get status, pause, resume, raise event, and purge the workflow. - -```javascript -import { TaskHubGrpcClient } from "@microsoft/durabletask-js"; -import { WorkflowState } from "./WorkflowState"; -import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index"; -import { TWorkflow } from "../../types/workflow/Workflow.type"; -import { getFunctionName } from "../internal"; -import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption"; - -/** DaprWorkflowClient class defines client operations for managing workflow instances. */ - -export default class DaprWorkflowClient { - private readonly _innerClient: TaskHubGrpcClient; - - /** Initialize a new instance of the DaprWorkflowClient. - */ - constructor(options: Partial = {}) { - const grpcEndpoint = generateEndpoint(options); - options.daprApiToken = getDaprApiToken(options); - this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options); - } +- API calls. The following example is a simple project consuming the workflow APIs: - private buildInnerClient(hostAddress: string, options: Partial): TaskHubGrpcClient { - let innerOptions = options?.grpcOptions; - if (options.daprApiToken !== undefined && options.daprApiToken !== "") { - innerOptions = { - ...innerOptions, - interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])], - }; - } - return new TaskHubGrpcClient(hostAddress, innerOptions); - } +```bash +mkdir my-wf && cd my-wf +npm init -y +npm i @dapr/dapr @microsoft/durabletask-js +npm i -D typescript ts-node @types/node +``` - /** - * Schedule a new workflow using the DurableTask client. - */ - public async scheduleNewWorkflow( - workflow: TWorkflow | string, - input?: any, - instanceId?: string, - startAt?: Date, - ): Promise { - if (typeof workflow === "string") { - return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt); - } - return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt); - } +Create the following `tsconfig.json` file: - /** - * Terminate the workflow associated with the provided instance id. - * - * @param {string} workflowInstanceId - Workflow instance id to terminate. - * @param {any} output - The optional output to set for the terminated workflow instance. - */ - public async terminateWorkflow(workflowInstanceId: string, output: any) { - await this._innerClient.terminateOrchestration(workflowInstanceId, output); - } +```json +{ + "compilerOptions": { + "target": "ES2020", + "module": "CommonJS", + "moduleResolution": "Node", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "outDir": "dist" + }, + "include": ["src"] +} +``` - /** - * Fetch workflow instance metadata from the configured durable store. - */ - public async getWorkflowState( - workflowInstanceId: string, - getInputsAndOutputs: boolean, - ): Promise { - const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs); - if (state !== undefined) { - return new WorkflowState(state); - } - } +Create the following `src/app.ts` file: - /** - * Waits for a workflow to start running - */ - public async waitForWorkflowStart( - workflowInstanceId: string, - fetchPayloads = true, - timeoutInSeconds = 60, - ): Promise { - const state = await this._innerClient.waitForOrchestrationStart( - workflowInstanceId, - fetchPayloads, - timeoutInSeconds, - ); - if (state !== undefined) { - return new WorkflowState(state); - } - } +```typescript +import { + WorkflowRuntime, + WorkflowActivityContext, + WorkflowContext, + DaprWorkflowClient, + TWorkflow +} from "@dapr/dapr"; - /** - * Waits for a workflow to complete running - */ - public async waitForWorkflowCompletion( - workflowInstanceId: string, - fetchPayloads = true, - timeoutInSeconds = 60, - ): Promise { - const state = await this._innerClient.waitForOrchestrationCompletion( - workflowInstanceId, - fetchPayloads, - timeoutInSeconds, - ); - if (state != undefined) { - return new WorkflowState(state); - } - } +const workflowClient = new DaprWorkflowClient(); +const workflowRuntime = new WorkflowRuntime(); - /** - * Sends an event notification message to an awaiting workflow instance - */ - public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) { - this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload); - } +// simple activity +const hello = async (_: WorkflowActivityContext, name: string) => `Hello ${name}!`; - /** - * Purges the workflow instance state from the workflow state store. - */ - public async purgeWorkflow(workflowInstanceId: string): Promise { - const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId); - if (purgeResult !== undefined) { - return purgeResult.deletedInstanceCount > 0; - } - return false; - } +// simple workflow: call the activity 3 times +const sequence: TWorkflow = async function* (ctx: WorkflowContext): any { + const out: string[] = []; + out.push(yield ctx.callActivity(hello, "Tokyo")); + out.push(yield ctx.callActivity(hello, "Seattle")); + out.push(yield ctx.callActivity(hello, "London")); + out.push(yield ctx.waitForExternalEvent("continue")); + return out; +}; + +async function main() { + workflowRuntime.registerWorkflow(sequence).registerActivity(hello); + await workflowRuntime.start(); + + const id = await workflowClient.scheduleNewWorkflow(sequence); + console.log("Scheduled:", id); + + workflowClient.raiseEvent(id, "continue", "Go go go!"); + + const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30); + console.log("Done:", state?.runtimeStatus, "output:", state?.serializedOutput); + + await new Promise(f => setTimeout(f, 100000)); + + await workflowRuntime.stop(); + await workflowClient.stop(); - /** - * Closes the inner DurableTask client and shutdown the GRPC channel. - */ - public async stop() { - await this._innerClient.stop(); - } } + +main().catch((e) => { console.error(e); }); ``` {{% /tab %}} @@ -960,6 +901,7 @@ import ( "errors" "fmt" "log" + "strconv" "time" "github.com/dapr/durabletask-go/workflow" @@ -1004,7 +946,7 @@ func main() { // "start". This is useful for increasing the throughput of creating // workflows. // workflow.WithStartTime(time.Now()) - instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) + instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput("1")) if err != nil { log.Fatalf("failed to start workflow: %v", err) } @@ -1058,9 +1000,7 @@ func main() { fmt.Printf("stage: %d\n", stage) - waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - _, err = wclient.WaitForWorkflowCompletion(waitCtx, instanceID) - cancel() + _, err = wclient.WaitForWorkflowCompletion(ctx, instanceID) if err != nil { log.Fatalf("failed to wait for workflow: %v", err) } @@ -1090,7 +1030,7 @@ func main() { fmt.Printf("stage: %d\n", stage) // Terminate workflow test - id, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) + id, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput("1")) if err != nil { log.Fatalf("failed to start workflow: %v", err) } @@ -1114,27 +1054,28 @@ func main() { } fmt.Println("workflow purged") + <-ctx.Done() cancel() fmt.Println("workflow worker successfully shutdown") } func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) { - var input int + var input string if err := ctx.GetInput(&input); err != nil { return nil, err } var output string - if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil { + if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil { return nil, err } - err := ctx.WaitForSingleEvent("businessEvent", time.Second*60).Await(&output) + err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output) if err != nil { return nil, err } - if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil { + if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil { return nil, err } @@ -1150,13 +1091,18 @@ func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) { return output, nil } -func BusinessActivity(ctx task.ActivityContext) (any, error) { - var input int +func BusinessActivity(ctx workflow.ActivityContext) (any, error) { + var input string if err := ctx.GetInput(&input); err != nil { return "", err } - stage += input + iinput, err := strconv.Atoi(input) + if err != nil { + return "", err + } + + stage += iinput return fmt.Sprintf("Stage: %d", stage), nil } @@ -1179,6 +1125,250 @@ Because of how replay-based workflows execute, you'll write logic that does thin {{% /alert %}} +## Testing your workflow + +After authoring your workflow, test it using the Dapr CLI: + +{{< tabpane text=true >}} + +{{% tab "Python" %}} + +#### Run the workflow application + +```bash +dapr run --app-id workflow-app python3 app.py +``` +Make sure the application is running: + +```bash +dapr list +``` + +#### Run the workflow +```bash +dapr workflow run hello_world_wf --app-id workflow-app --input 'hello world' --instance-id test-run +``` + +#### Check the workflow status +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 -o wide +``` + +#### Check completed workflows +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 --filter-status COMPLETED -o wide +``` + +#### View workflow history +```bash +dapr workflow history --app-id workflow-app test-run +``` + +{{% /tab %}} + +{{% tab "Javascript" %}} + +#### Run the workflow application + +```bash +dapr run --app-id workflow-app npx ts-node src/app.ts +``` +Make sure the application is running: + +```bash +dapr list +``` + +#### Run the workflow +```bash +dapr workflow run sequence --app-id workflow-app --input 'hello world' --instance-id test-run +``` + +#### Check the workflow status +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 -o wide +``` + +#### Raise the waiting external event +```bash +dapr workflow raise-event --app-id workflow-app test-run/businessEvent +``` + +#### Check completed workflows +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 --filter-status COMPLETED -o wide +``` + +#### View workflow history +```bash +dapr workflow history --app-id workflow-app test-run +``` + +{{% /tab %}} + +{{% tab ".NET" %}} + +#### Run the workflow application + +```bash +dapr run --app-id workflow-app dotnet run +``` +Make sure the application is running: + +```bash +dapr list +``` + +#### Run the workflow +```bash +dapr workflow run OrderProcessingWorkflow --app-id workflow-app --instance-id test-run --input '{"name": "Paperclips", "totalCost": 99.95}' +``` + +#### Check the workflow status +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 -o wide +``` + +#### Raise the waiting external event +```bash +dapr workflow raise-event --app-id workflow-app test-run/incoming-purchase-order --input '{"name": "Paperclips", "totalCost": 99.95}' +``` + +#### Check completed workflows +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 --filter-status COMPLETED -o wide +``` + +#### View workflow history +```bash +dapr workflow history --app-id workflow-app test-run +``` + +{{% /tab %}} + +{{% tab "Java" %}} + +#### Run the workflow application + +```bash +dapr run --app-id workflow-app -- java -jar target/WorkflowService-0.0.1-SNAPSHOT.jar +``` + +Make sure the application is running: + +```bash +dapr list +``` + +#### Run the workflow +```bash +dapr workflow run DemoWorkflow --app-id workflow-app --instance-id test-run --input "input data" +``` + +#### Check the workflow status +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 -o wide +``` + +#### Raise the waiting external event +```bash +dapr workflow raise-event --app-id workflow-app test-run/TestEvent --input 'TestEventPayload' +dapr workflow raise-event --app-id workflow-app test-run/event1 --input 'TestEvent 1 Payload' +dapr workflow raise-event --app-id workflow-app test-run/event2 --input 'TestEvent 2 Payload' +dapr workflow raise-event --app-id workflow-app test-run/event3 --input 'TestEvent 3 Payload' +``` + +#### Check completed workflows +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 --filter-status COMPLETED -o wide +``` + +#### View workflow history +```bash +dapr workflow history --app-id workflow-app test-run +``` + +{{% /tab %}} + +{{% tab "Go" %}} + +#### Run the workflow application +```bash +dapr run --app-id workflow-app go run main.go +``` + +Make sure the application is running: + +```bash +dapr list +``` + +#### Run the workflow +```bash +dapr workflow run BusinessWorkflow --app-id workflow-app --input '1' --instance-id test-run +``` + +#### Check the workflow status +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 -o wide +``` + +#### Raise the waiting external event +```bash +dapr workflow raise-event --app-id workflow-app test-run/businessEvent +``` + +#### Check completed workflows +```bash +dapr workflow list --app-id workflow-app --connection-string=redis://127.0.0.1:6379 --filter-status COMPLETED -o wide +``` + +#### View workflow history +```bash +dapr workflow history test-run --app-id workflow-app +``` + +{{% /tab %}} + +{{< /tabpane >}} + + +### Monitor Workflow Execution + +```bash +dapr workflow list --app-id workflow-app --filter-status RUNNING -o wide +``` + +```bash +dapr workflow list --app-id workflow-app --filter-status FAILED -o wide +``` + +```bash +dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide +``` + +### Test External Events + +```bash +# Raise an event your workflow is waiting for +dapr workflow raise-event /ApprovalReceived \ + --app-id workflow-app \ + --input '{"approved": true, "approver": "manager@company.com"}' +``` + +### Debug Failed Workflows + +```bash +# List failed workflows +dapr workflow list --app-id workflow-app --filter-status FAILED --output wide + +# Get detailed history of a failed workflow +dapr workflow history --app-id workflow-app --output json + +# Re-run the workflow after fixing issues +dapr workflow rerun --app-id workflow-app --input '' +``` + ## Next steps Now that you've authored a workflow, learn how to manage it. diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 8b5dd89f855..db41b48205a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -6,7 +6,7 @@ weight: 6000 description: Manage and run workflows --- -Now that you've [authored the workflow and its activities in your application]({{% ref howto-author-workflow.md %}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{% ref workflow_api.md %}}). +Now that you've [authored the workflow and its activities in your application]({{% ref howto-author-workflow.md %}}), you can start, terminate, and get information about the workflow using the CLI or API calls. For more information, read the [workflow API reference]({{% ref workflow_api.md %}}). {{< tabpane text=true >}} @@ -69,6 +69,296 @@ dapr scheduler import -f workflow-reminders-backup.bin - Create workflow reminders via the Workflow API. - Manage reminders (list, get, delete, backup/restore) with the dapr scheduler CLI. +## Managing Workflows with the Dapr CLI + +The Dapr CLI provides commands for managing workflow instances in both self-hosted and Kubernetes environments. + +### Prerequisites + +- Dapr CLI version 1.16.2 or later +- A running Dapr application that has registered a workflow +- For database operations: network access to your actor state store + +### Basic Workflow Operations + +#### Start a Workflow + +```bash +# Using the `orderprocessing` application, start a new workflow instance with input data +dapr workflow run OrderProcessingWorkflow \ + --app-id orderprocessing \ + --input '{"orderId": "12345", "amount": 100.50}' + +# Start with a new workflow with a specific instance ID +dapr workflow run OrderProcessingWorkflow \ + --app-id orderprocessing \ + --instance-id order-12345 \ + --input '{"orderId": "12345"}' + +# Schedule a new workflow to start at 10:00:00 AM on December 25, 2024, Coordinated Universal Time (UTC). +dapr workflow run OrderProcessingWorkflow \ + --app-id orderprocessing \ + --start-time "2024-12-25T10:00:00Z" +``` + +#### List Workflow Instances + +```bash +# List all workflows for an app +dapr workflow list --app-id orderprocessing + +# Filter by status +dapr workflow list --app-id orderprocessing --filter-status RUNNING + +# Filter by workflow name +dapr workflow list --app-id orderprocessing --filter-name OrderProcessingWorkflow + +# Filter by age (workflows started in last 24 hours) +dapr workflow list --app-id orderprocessing --filter-max-age 24h + +# Get detailed output +dapr workflow list --app-id orderprocessing --output wide +``` + +#### View Workflow History + +```bash +# Get execution history +dapr workflow history order-12345 --app-id orderprocessing + +# Get history in JSON format +dapr workflow history order-12345 --app-id orderprocessing --output json +``` + +#### Control Workflow Execution + +```bash +# Suspend a running workflow +dapr workflow suspend order-12345 \ + --app-id orderprocessing \ + --reason "Waiting for manual approval" + +# Resume a suspended workflow +dapr workflow resume order-12345 \ + --app-id orderprocessing \ + --reason "Approved by manager" + +# Terminate a workflow +dapr workflow terminate order-12345 \ + --app-id orderprocessing \ + --output '{"reason": "Cancelled by customer"}' +``` + +#### Raise External Events + +```bash +# Raise an event for a waiting workflow +dapr workflow raise-event order-12345/PaymentReceived \ + --app-id orderprocessing \ + --input '{"paymentId": "pay-67890", "amount": 100.50}' +``` + +#### Re-run Workflows + +```bash +# Re-run from the beginning +dapr workflow rerun order-12345 --app-id orderprocessing + +# Re-run from a specific event +dapr workflow rerun order-12345 \ + --app-id orderprocessing \ + --event-id 5 + +# Re-run with a new instance ID +dapr workflow rerun order-12345 \ + --app-id orderprocessing \ + --new-instance-id order-12345-retry +``` + +#### Purge Completed Workflows + +Note that purging a workflow from the CLI will also delete all associated Scheduler reminders. + +{{% alert title="Important" color="warning" %}} +It is required that a workflow client is running in the application to perform purge operations. +The workflow client connection is required in order to preserve the workflow state machine integrity and prevent corruption. +Errors like the following suggest that the workflow client is not running: +``` +failed to purge orchestration state: rpc error: code = FailedPrecondition desc = failed to purge orchestration state: failed to lookup actor: api error: code = FailedPrecondition desc = did not find address for actor +``` +{{% /alert %}} + +```bash +# Purge a specific instance +dapr workflow purge order-12345 --app-id orderprocessing + +# Purge all completed workflows older than 30 days +dapr workflow purge --app-id orderprocessing --all-older-than 720h + +# Purge all terminal workflows (use with caution!) +dapr workflow purge --app-id orderprocessing --all +``` + +### Kubernetes Operations + +All commands support the `-k` flag for Kubernetes deployments: + +```bash +# List workflows in Kubernetes +dapr workflow list \ + --kubernetes \ + --namespace production \ + --app-id orderprocessing + +# Suspend a workflow in Kubernetes +dapr workflow suspend order-12345 \ + --kubernetes \ + --namespace production \ + --app-id orderprocessing \ + --reason "Maintenance window" +``` + +### Advanced: Direct Database Access + +For advanced operations like listing and purging workflows, you can connect directly to the actor state store database. This is useful for: + +- Querying workflows across multiple app instances +- Bulk operations on workflow metadata +- Custom filtering beyond what the API provides + +#### Self-Hosted Mode + +In self-hosted mode, the CLI can automatically discover your state store configuration: + +```bash +# The CLI reads your component configuration automatically +dapr workflow list --app-id orderprocessing --connection-string=redis://127.0.0.1:6379 +``` + +To override with a specific connection string: + +```bash +# PostgreSQL +dapr workflow list \ + --app-id orderprocessing \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name actor-store + +# MySQL +dapr workflow list \ + --app-id orderprocessing \ + --connection-string "dapr:dapr@tcp(localhost:3306)/dapr?parseTime=true" \ + --table-name actor-store + +# SQL Server +dapr workflow list \ + --app-id orderprocessing \ + --connection-string "sqlserver://dapr:Pass@word1@localhost:1433?database=dapr" \ + --table-name abc + +# Redis +dapr workflow list \ + --app-id orderprocessing \ + --connection-string=redis://user:mypassword@127.0.0.1:6379 \ +``` + +#### Kubernetes Mode with Port Forwarding + +In Kubernetes, you need to establish connectivity to your database: + +**Step 1: Port forward to your database service** + +```bash +# PostgreSQL +kubectl port-forward service/postgres 5432:5432 -n production + +# MySQL +kubectl port-forward service/mysql 3306:3306 -n production + +# SQL Server +kubectl port-forward service/mssql 1433:1433 -n production + +# Redis +kubectl port-forward service/redis 6379:6379 -n production +``` + +**Step 2: Use the CLI with the connection string** + +```bash +# PostgreSQL example +dapr workflow list \ + --kubernetes \ + --namespace production \ + --app-id orderprocessing \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name workflows + +# Purge old workflows +dapr workflow purge \ + --kubernetes \ + --namespace production \ + --app-id orderprocessing \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name workflows \ + --all-older-than 2160h # 90 days +``` + +**Step 3: Stop port forwarding when done** + +```bash +# Press Ctrl+C to stop the port forward +``` + +#### Connection String Formats by Database + +**PostgreSQL / CockroachDB** +``` +host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable connect_timeout=10 +``` + +**MySQL** +``` +username:password@tcp(host:port)/database?parseTime=true&loc=UTC +``` + +**SQL Server** +``` +sqlserver://username:password@host:port?database=dbname&encrypt=false +``` + +**MongoDB** +``` +mongodb://username:password@localhost:27017/database +``` + +**Redis** +``` +redis://127.0.0.1:6379 +``` + +### Workflow Management Best Practices + +1. **Regular Cleanup**: Schedule periodic purge operations for completed workflows + ```bash + # Weekly cron job to purge workflows older than 90 days + dapr workflow purge --app-id orderprocessing --all-older-than 2160h + ``` + +2. **Monitor Running Workflows**: Use filtered lists to track long-running instances + ```bash + dapr workflow list --app-id orderprocessing --filter-status RUNNING --filter-max-age 24h + ``` + +3. **Use Instance IDs**: Assign meaningful instance IDs for easier tracking + ```bash + dapr workflow run OrderWorkflow --app-id orderprocessing --instance-id "order-$(date +%s)" + ``` + +4. **Export for Analysis**: Export workflow data for analysis + ```bash + dapr workflow list --app-id orderprocessing --output json > workflows.json + ``` + {{% /tab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md index e5d77c2c18e..47893aab80a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md @@ -189,6 +189,45 @@ This number may be larger or smaller depending on retries or concurrency. | Raise event | 3 records | | Start child workflow | 8 records | +#### Direct Database Access + +For advanced operations, you can access workflow data directly: + +```bash +# Port forward to a postgres database in Kubernetes +kubectl port-forward service/postgres 5432:5432 + +# Query workflows directly +dapr workflow list \ + --app-id myapp \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name workflows +``` + +```bash +# Port forward to redis database in Kubernetes +kubectl port-forward service/redis 6379:6379 + +# Query workflows directly +dapr workflow list \ + --app-id myapp \ + --connection-string redis://127.0.0.1:6379 \ + --table-name workflows +``` + +### Supported State Stores + +The workflow engine supports these state stores: +- PostgreSQL +- MySQL +- SQL Server +- SQLite +- Oracle Database +- CockroachDB +- MongoDB +- Redis + + ## Workflow scalability Because Dapr Workflows are internally implemented using actors, Dapr Workflows have the same scalability characteristics as actors. diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index 2d218235090..58c7df2b8d7 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -24,6 +24,62 @@ There are several different kinds of tasks that a workflow can schedule, includi - [Child workflows]({{% ref "workflow-features-concepts.md#child-workflows" %}}) for breaking larger workflows into smaller pieces - [External event waiters]({{% ref "workflow-features-concepts.md#external-events" %}}) for blocking workflows until they receive external event signals. These tasks are described in more details in their corresponding sections. +## Workflow Instance Management + +### Querying Workflow State + +You can query workflow instances using the CLI: + +```bash +# Find all running workflows +dapr workflow list --app-id myapp --filter-status RUNNING + +# Find workflows by name +dapr workflow list --app-id myapp --filter-name OrderProcessing + +# Find recent workflows (last 2 hours) +dapr workflow list --app-id myapp --filter-max-age 2h + +# Get detailed JSON output +dapr workflow list --app-id myapp --output json +``` + +### Workflow History + +View the complete execution history: + +```bash +dapr workflow history wf-12345 --app-id myapp --output json +``` + +This shows all events, activities, and state transitions. + +## External Events + +### Raising Events via CLI + +```bash +dapr workflow raise-event wf-12345/ApprovalReceived \ + --app-id myapp \ + --input '{"approved": true, "comments": "Approved by manager"}' +``` + +## Workflow Suspension and Resumption + +### Using the CLI + +```bash +# Suspend for manual intervention +dapr workflow suspend wf-12345 \ + --app-id myapp \ + --reason "Awaiting customer response" + +# Resume when ready +dapr workflow resume wf-12345 \ + --app-id myapp \ + --reason "Customer responded" +``` + ### Workflow identity Each workflow you define has a type name, and individual executions of a workflow require a unique _instance ID_. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow's instance ID is useful for debugging and also for managing workflows using the [Workflow APIs]({{% ref workflow_api.md %}}). diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index 5c79019fafd..cce02d3316b 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -114,6 +114,42 @@ Want to put workflows to the test? Walk through the following quickstart and tut Want to skip the quickstarts? Not a problem. You can try out the workflow building block directly in your application. After [Dapr is installed]({{% ref install-dapr-cli.md %}}), you can begin using workflows, starting with [how to author a workflow]({{% ref howto-author-workflow.md %}}). +## Managing Workflows + +Dapr provides comprehensive workflow management capabilities through both the HTTP API and the CLI. + +### Workflow Lifecycle Operations + +**Start Workflows** +```bash +dapr workflow run MyWorkflow --app-id myapp --input '{"key": "value"}' +``` + +**Monitor Workflows** +```bash +# List active workflows for a given application +dapr workflow list --app-id myapp --filter-status RUNNING + +# View execution history +dapr workflow history --app-id myapp +``` + +**Control Workflows** +```bash +# Suspend, resume, or terminate +dapr workflow suspend --app-id myapp +dapr workflow resume --app-id myapp +dapr workflow terminate --app-id myapp +``` + +**Maintenance Operations** +```bash +# Purge completed workflows +dapr workflow purge --app-id myapp --all-older-than 720h +``` + +See [How-To: Manage workflows]({{< ref howto-manage-workflow.md >}}) for detailed instructions. + ## Limitations - **State stores:** You can only use state stores which support workflows, as [described here]({{% ref supported-state-stores %}}). diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index a0f0e66bbd4..8158ddfdbbc 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -307,6 +307,16 @@ In addition to the challenges mentioned in [the previous pattern]({{% ref "workf Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example: +```bash +# Start the workflow +dapr workflow run DataProcessingWorkflow \ + --app-id processor \ + --input '{"items": ["item1", "item2", "item3"]}' + +# Monitor parallel execution +dapr workflow history --app-id processor --output json +``` + {{< tabpane text=true >}} {{% tab "Python" %}} diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 72473207839..35839f5c511 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -1827,7 +1827,7 @@ func main() { TotalCost: totalCost, } - id, err := wfClient.ScheduleWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload)) + id, err := wfClient.ScheduleWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload), workflow.WithInstanceID("order-"+time.Now().Format("20060102150405"))) if err != nil { log.Fatalf("failed to start workflow: %v", err) } @@ -1847,6 +1847,7 @@ func main() { fmt.Printf("workflow status: %v\n", respFetch.String()) fmt.Println("Purchase of item is complete") + select {} } func restockInventory(daprClient client.Client, inventory []InventoryItem) error { @@ -1881,6 +1882,48 @@ import ( "github.com/dapr/go-sdk/client" ) +type OrderPayload struct { + ItemName string `json:"item_name"` + TotalCost int `json:"total_cost"` + Quantity int `json:"quantity"` +} + +type OrderResult struct { + Processed bool `json:"processed"` +} + +type InventoryItem struct { + ItemName string `json:"item_name"` + PerItemCost int `json:"per_item_cost"` + Quantity int `json:"quantity"` +} + +type InventoryRequest struct { + RequestID string `json:"request_id"` + ItemName string `json:"item_name"` + Quantity int `json:"quantity"` +} + +type InventoryResult struct { + Success bool `json:"success"` + InventoryItem InventoryItem `json:"inventory_item"` +} + +type PaymentRequest struct { + RequestID string `json:"request_id"` + ItemBeingPurchased string `json:"item_being_purchased"` + Amount int `json:"amount"` + Quantity int `json:"quantity"` +} + +type ApprovalRequired struct { + Approval bool `json:"approval"` +} + +type Notification struct { + Message string `json:"message"` +} + // OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process. func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { orderID := ctx.ID() @@ -2064,6 +2107,107 @@ func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { {{< /tabpane >}} + +## Step 5: Manage Your Workflow + +Now that your workflow is running, let's learn how to manage it using the Dapr CLI. + +### View Running Workflows + +Open a separate terminal and run the following CLI commands. + +```bash +# List all workflows +dapr workflow list --app-id order-processor --connection-string=redis://127.0.0.1:6379 -o wide +``` + +You should see output like: + +``` +NAMESPACE APP ID NAME INSTANCE ID CREATED LAST UPDATE STATUS +default order-processor OrderProcessingWorkflow e4d3807c 2025-11-07T12:29:37Z 2025-11-07T12:29:52Z COMPLETED +``` + +### Check Workflow History + +View the detailed execution history of your workflow: + +```bash +dapr workflow history e4d3807c --app-id order-processor +``` + +You should see output like: + +``` +TYPE NAME EVENTID ELAPSED STATUS DETAILS +ExecutionStarted OrderProcessingWorkflow - Age:1.1m RUNNING orchestration start +OrchestratorStarted - - 13.4ms RUNNING replay cycle start +TaskScheduled NotifyActivity 0 1.3ms RUNNING activity=NotifyActivity +TaskCompleted - - 2.6ms RUNNING eventId=0 +OrchestratorStarted - - 2.6ms RUNNING replay cycle start +TaskScheduled VerifyInventoryActivity 1 637.6µs RUNNING activity=VerifyInventoryActivity +TaskCompleted - - 2.4ms RUNNING eventId=1 +OrchestratorStarted - - 1.7ms RUNNING replay cycle start +TaskScheduled ProcessPaymentActivity 2 439.3µs RUNNING activity=ProcessPaymentActivity +TaskCompleted - - 1.6ms RUNNING eventId=2 +OrchestratorStarted - - 1.5ms RUNNING replay cycle start +TaskScheduled UpdateInventoryActivity 3 311.2µs RUNNING activity=UpdateInventoryActivity +TaskCompleted - - 2.4ms RUNNING eventId=3 +OrchestratorStarted - - 2.7ms RUNNING replay cycle start +TaskScheduled NotifyActivity 4 354.1µs RUNNING activity=NotifyActivity +TaskCompleted - - 2.5ms RUNNING eventId=4 +OrchestratorStarted - - 1.6ms RUNNING replay cycle start +ExecutionCompleted - 5 517.1µs COMPLETED execDuration=38.7ms +``` + +### Interact with Your Workflow + +#### Raise an External Event + +If your workflow is waiting for an [external event]({{% ref "workflow-patterns.md#external-system-interaction" %}}), you can raise one. +It takes a single argument in the format of `/`. + +```bash +dapr workflow raise-event e4d3807c/ApprovalEvent \ + --app-id order-processor \ + --input '{"paymentId": "pay-123", "amount": 100.00}' +``` + +#### Suspend and Resume + +```bash +# Suspend a workflow +dapr workflow suspend e4d3807c \ + --app-id order-processor \ + --reason "Waiting for inventory" + +# Resume when ready +dapr workflow resume e4d3807c \ + --app-id order-processor \ + --reason "Inventory received" +``` + +### Clean Up + +After testing, purge completed workflows. + +{{% alert title="Important" color="warning" %}} +It is required that a workflow client is running in the application to perform purge operations. +The workflow client connection is required in order to preserve the workflow state machine integrity and prevent corruption. +Errors like the following suggest that the workflow client is not running: +``` +failed to purge orchestration state: rpc error: code = FailedPrecondition desc = failed to purge orchestration state: failed to lookup actor: api error: code = FailedPrecondition desc = did not find address for actor +``` +{{% /alert %}} + +```bash +# Purge a specific workflow +dapr workflow purge e4d3807c --app-id order-processor --connection-string=redis://127.0.0.1:6379 + +# Or purge all completed workflows +dapr workflow purge --app-id order-processor --connection-string=redis://127.0.0.1:6379 --all-older-than 1h +``` + ## Tell us what you think! We're continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement? @@ -2075,5 +2219,6 @@ Join the discussion in our [discord channel](https://discord.com/channels/778680 - Set up Dapr Workflow with any programming language using [HTTP instead of an SDK]({{% ref howto-manage-workflow.md %}}) - Walk through a more in-depth [.NET SDK example workflow](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - Learn more about [Workflow as a Dapr building block]({{% ref workflow-overview %}}) +``` {{< button text="Explore Dapr tutorials >>" page="getting-started/tutorials/_index.md" >}} diff --git a/daprdocs/content/en/reference/cli/dapr-workflow.md b/daprdocs/content/en/reference/cli/dapr-workflow.md new file mode 100644 index 00000000000..f490a8a7ecd --- /dev/null +++ b/daprdocs/content/en/reference/cli/dapr-workflow.md @@ -0,0 +1,217 @@ +--- +type: docs +title: "workflow CLI command" +linkTitle: "workflow" +description: "Detailed information on the workflow CLI command" +--- + +Manage Dapr workflow instances. + +## Commands + +| Command | Description | +|---------|-------------| +| dapr workflow run | Start a new workflow instance | +| dapr workflow list | List workflow instances | +| dapr workflow history | Get workflow execution history | +| dapr workflow purge | Purge workflow instances | +| dapr workflow suspend | Suspend a workflow | +| dapr workflow resume | Resume a workflow | +| dapr workflow terminate | Terminate a workflow | +| dapr workflow raise-event | Raise an external event | +| dapr workflow rerun | Re-run a workflow | + +## Flags + +``` + -a, --app-id string The app ID owner of the workflow instance + -h, --help help for workflow + -k, --kubernetes Target a Kubernetes dapr installation + -n, --namespace string Namespace to perform workflow operation on (default "default") +``` + +## Examples + +### List workflows +```bash +dapr workflow list --app-id myapp +``` + +### Start a workflow +```bash +dapr workflow run MyWorkflow --app-id myapp --input '{"key": "value"}' +``` + +### Kubernetes mode +```bash +dapr workflow list -k -n production --app-id myapp +``` + +## List workflow instances for a given application. + +## Usage + +```bash +dapr workflow list [flags] +``` + +## Flags + +| Name | Type | Description | +|------|------|-------------| +| `--app-id`, `-a` | string | (Required) The app ID owner of the workflow instances | +| `--filter-name`, `-w` | string | Filter workflows by name | +| `--filter-status`, `-s` | string | Filter by status: RUNNING, COMPLETED, FAILED, CANCELED, TERMINATED, PENDING, SUSPENDED | +| `--filter-max-age`, `-m` | string | Filter workflows started within duration or timestamp (e.g., "300ms", "1.5h", "2023-01-02T15:04:05") | +| `--output`, `-o` | string | Output format: short, wide, yaml, json (default "short") | +| `--connection-string`, `-c` | string | Connection string to the actor state store | +| `--table-name`, `-t` | string | Table or collection name used as the actor state store | +| `--kubernetes`, `-k` | bool | Target a Kubernetes Dapr installation | +| `--namespace`, `-n` | string | Kubernetes namespace (default "default") | + +## Examples + +### Basic usage +```bash +dapr workflow list --app-id myapp +``` + +### Filter by status +```bash +dapr workflow list --app-id myapp --filter-status RUNNING +``` + +### Filter by workflow name +```bash +dapr workflow list --app-id myapp --filter-name OrderProcessing +``` + +### Filter by age +```bash +# Workflows from last 24 hours +dapr workflow list --app-id myapp --filter-max-age 24h + +# Workflows after specific date +dapr workflow list --app-id myapp --filter-max-age 2024-01-01T00:00:00Z +``` + +### JSON output +```bash +dapr workflow list --app-id myapp --output json +``` + +### Kubernetes with port forwarding +```bash +# Terminal 1: Port forward to database +kubectl port-forward service/postgres 5432:5432 -n production + +# Terminal 2: List workflows with direct database access +dapr workflow list \ + --kubernetes \ + --namespace production \ + --app-id myapp \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name workflows +``` + +## Connection String Formats + +### PostgreSQL / CockroachDB +``` +host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable +``` + +### MySQL +``` +dapr:dapr@tcp(localhost:3306)/dapr?parseTime=true +``` + +### SQL Server +``` +sqlserver://dapr:Pass@word@localhost:1433?database=dapr +``` + +### MongoDB +``` +mongodb://localhost:27017/dapr +``` + +### Redis +``` +redis[s]://[[username][:password]@][host][:port][/db-number]: +``` + +## Purge workflow instances with terminal states (COMPLETED, FAILED, TERMINATED). + +## Usage + +```bash +dapr workflow purge [instance-id] [flags] +``` + +## Flags + +| Name | Type | Description | +|------|------|-------------| +| `--app-id`, `-a` | string | (Required) The app ID owner of the workflow instances | +| `--all` | bool | Purge all terminal workflow instances (use with caution) | +| `--all-older-than` | string | Purge instances older than duration or timestamp (e.g., "24h", "2023-01-02T15:04:05Z") | +| `--connection-string`, `-c` | string | Connection string to the actor state store | +| `--table-name`, `-t` | string | Table or collection name used as the actor state store | +| `--kubernetes`, `-k` | bool | Target a Kubernetes Dapr installation | +| `--namespace`, `-n` | string | Kubernetes namespace (default "default") | + +## Examples + +### Purge a specific instance +```bash +dapr workflow purge wf-12345 --app-id myapp +``` + +### Purge instances older than 30 days +```bash +dapr workflow purge --app-id myapp --all-older-than 720h +``` + +### Purge instances older than specific date +```bash +dapr workflow purge --app-id myapp --all-older-than 2023-12-01T00:00:00Z +``` + +### Purge all terminal instances (dangerous!) +```bash +dapr workflow purge --app-id myapp --all +``` + +### Kubernetes with database access +```bash +# Port forward to database +kubectl port-forward service/postgres 5432:5432 -n production + +# Purge old workflows +dapr workflow purge \ + --kubernetes \ + --namespace production \ + --app-id myapp \ + --connection-string "host=localhost user=dapr password=dapr dbname=dapr port=5432 sslmode=disable" \ + --table-name workflows \ + --all-older-than 2160h # 90 days +``` + +## Best Practices + +1. **Regular Cleanup**: Schedule periodic purge operations + ```bash + # Cron job to purge workflows older than 90 days + 0 2 * * 0 dapr workflow purge --app-id myapp --all-older-than 2160h + ``` + +2. **Test First**: Use list command to see what will be purged + ```bash + dapr workflow list --app-id myapp --filter-status COMPLETED --filter-max-age 2160h + ``` + +3. **Backup Before Bulk Purge**: Export data before using `--all` + ```bash + dapr workflow list --app-id myapp --output json > backup.json + ```