Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Azure.Core.Serialization;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -259,6 +260,7 @@ static string BuildUrl(string url, params string?[] queryValues)
{
Id = instanceId,
PurgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
RestartPostUri = BuildUrl($"{instanceUrl}/restart", commonQueryParameters),
SendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
StatusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
TerminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters),
Expand All @@ -273,6 +275,41 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response)
?? throw new InvalidOperationException("A serializer is not configured for the worker.");
}

/// <summary>
/// Restarts an existing orchestration instance with the original input.
/// </summary>
/// <param name="client">The <see cref="DurableTaskClient"/>.</param>
/// <param name="instanceId">The ID of the orchestration instance to restart.</param>
/// <param name="restartWithNewInstanceId">If true, starts with a new instance ID; otherwise, uses the same instance ID.</param>
/// <param name="cancellation">A token that signals if the operation should be canceled.</param>
/// <returns>The new instance ID.</returns>
public static async Task<string> RestartAsync(
this DurableTaskClient client,
string instanceId,
bool restartWithNewInstanceId = true,
CancellationToken cancellation = default)
{
// Get the status of the existing instance, including input
OrchestrationMetadata? status = await client.GetInstancesAsync(instanceId, getInputsAndOutputs: true, cancellation)
?? throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.");

// Deserialize the input.
string orchestratorName = status.Name;
string? input = status.SerializedInput != null
? System.Text.Json.JsonSerializer.Deserialize<string>(status.SerializedInput)
: null;

if (restartWithNewInstanceId)
{
return await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, null, cancellation);
}
else
{
var options = new StartOrchestrationOptions { InstanceId = instanceId };
return await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation);
}
}

private static string? GetBaseUrlFromRequest(HttpRequestData request)
{
// Default to the scheme from the request URL
Expand Down Expand Up @@ -319,7 +356,6 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response)
return $"{proto}://{host}";
}


private static string? GetQueryParams(DurableTaskClient client)
{
return client is FunctionsDurableTaskClient functions ? functions.QueryString : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ private static void AssertHttpManagementPayload(HttpManagementPayload payload, s
{
Assert.Equal(instanceId, payload.Id);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.PurgeHistoryDeleteUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/restart", payload.RestartPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/raiseEvent/{{eventName}}", payload.SendEventPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.StatusQueryGetUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/terminate?reason={{{{text}}}}", payload.TerminatePostUri);
Expand Down
54 changes: 54 additions & 0 deletions test/e2e/Apps/BasicDotNetIsolated/RestartOrchestration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Net;

namespace Microsoft.Azure.Durable.Tests.E2E;

public static class RestartOrchestration
{
[Function(nameof(RestartOrchestrator))]
public static string RestartOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string? input = context.GetInput<string>();
return "Hello " + input;
}

[Function("RestartOrchestration_HttpStart")]
public static async Task<HttpResponseData> HttpStartRestartOrchestration(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RestartOrchestrator), input: "World");
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}

public class RestartRequest
{
public string InstanceId { get; set; } = string.Empty;
public bool RestartWithNewInstanceId { get; set; }
}

[Function("RestartOrchestration_HttpRestart")]
public static async Task<HttpResponseData> HttpRestartOrchestration(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
var data = await req.ReadFromJsonAsync<RestartRequest>();
if (data == null)
{
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string newInstanceId = await client.RestartAsync(data.InstanceId,data.RestartWithNewInstanceId);

return await client.CreateCheckStatusResponseAsync(req, newInstanceId);
}
}
81 changes: 81 additions & 0 deletions test/e2e/Tests/Tests/RestartOrchestrationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Net;
using System.Text.Json;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;

[Collection(Constants.FunctionAppCollectionName)]
public class RestartOrchestrationTests
{
private readonly FunctionAppFixture fixture;
private readonly ITestOutputHelper output;

public RestartOrchestrationTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper)
{
this.fixture = fixture;
this.fixture.TestLogs.UseTestLogger(testOutputHelper);
this.output = testOutputHelper;
}

[Theory]
[InlineData(false)]
[InlineData(true)]
// Test behavior of restartasync of durabletaskclient.
// when restart with a instanceid and startwithnewinstanceid is false, the orchestration should be restarted with the same instance id.
// and the output should be the same as the original orchestration.
// when restart with a instanceid and startwithnewinstanceid is true, the orchestration should be restarted with a new instance id.
// and the output should be same as the original orchestration.
public async Task RestartOrchestration_CreatedTimeAndOutputChange(bool restartWithNewInstanceId)
{
// Start the orchestration
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("RestartOrchestration_HttpStart", "");
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);
string instanceId = await DurableHelpers.ParseInstanceIdAsync(response);

await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 10);
var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
string output1 = orchestrationDetails.Output;
DateTime createdTime1 = orchestrationDetails.CreatedTime;

// best practice to wait for 1 seconds before restarting orchestration to avoid race condition.
await Task.Delay(1000);

var restartPayload = new {
InstanceId = instanceId,
RestartWithNewInstanceId = restartWithNewInstanceId
};

string jsonBody = JsonSerializer.Serialize(restartPayload);

// Restart the orchestrator with the same instance id)
using HttpResponseMessage restartResponse = await HttpHelpers.InvokeHttpTriggerWithBody(
"RestartOrchestration_HttpRestart", jsonBody, "application/json");
Assert.Equal(HttpStatusCode.Accepted, restartResponse.StatusCode);
string restartStatusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(restartResponse);
string restartInstanceId = await DurableHelpers.ParseInstanceIdAsync(restartResponse);

await DurableHelpers.WaitForOrchestrationStateAsync(restartStatusQueryGetUri, "Completed", 10);
var restartOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(restartStatusQueryGetUri);
string output2 = restartOrchestrationDetails.Output;
DateTime createdTime2 = restartOrchestrationDetails.CreatedTime;

// The outputs and created times should be different
Assert.Equal(output1, output2);
Assert.NotEqual(createdTime1, createdTime2);

if (restartWithNewInstanceId)
{
// If restartWithNewInstanceId is True, the two instanceId should be different.
Assert.NotEqual(instanceId, restartInstanceId);
}
else
{
Assert.Equal(instanceId, restartInstanceId);
}
}
}
Loading