Skip to content

Commit 673a0fd

Browse files
esttenorio and estenori authored
.Net: Processes: Checkpoint state improvements (#12759)
### Description - Checkpoint improvements that restrict interaction with storage to particular times: initialization and the end of a superstep. - Checkpointing allows "staging" states from processes and steps and only uploading them to the cloud at the end of a superstep - Separating functionality available for process vs step - Adding/Updating tests - Adding a simple Cosmos DB connector for the LocalRuntime demo ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [ ] The code builds clean without any errors or warnings - [ ] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [ ] All unit tests pass, and I have added new tests where possible - [ ] I didn't break anyone 😄 --------- Co-authored-by: Estefania Tenorio <[email protected]>
1 parent 8cfae13 commit 673a0fd

36 files changed

+1607
-417
lines changed

dotnet/SK-dotnet.slnx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
</Folder>
8080
<Folder Name="/samples/Demos/ProcessWithCloudEvents/">
8181
<File Path="samples/Demos/ProcessWithCloudEvents/README.md" />
82-
<Project Path="samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Grpc/ProcessWithCloudEvents.Grpc.csproj" />
82+
<Project Path="samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Grpc.LocalRuntime/ProcessWithCloudEvents.Grpc.LocalRuntime.csproj" />
8383
<Project Path="samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Processes/ProcessWithCloudEvents.Processes.csproj" />
8484
</Folder>
8585
<Folder Name="/src/">

dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Grpc.LocalRuntime/ProcessWithCloudEvents.Grpc.LocalRuntime.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
<PrivateAssets>all</PrivateAssets>
3838
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
3939
</PackageReference>
40+
<PackageReference Include="Microsoft.Azure.Cosmos" />
4041
</ItemGroup>
4142

4243
</Project>

dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Grpc.LocalRuntime/Program.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
});
2828

2929
var openAIOptions = config.GetValid<OpenAIOptions>(OpenAIOptions.SectionName);
30+
var cosmosDbOptions = config.GetValid<CosmosDBOptions>(CosmosDBOptions.SectionName);
3031

3132
// Configure the Kernel with DI. This is required for dependency injection to work with processes.
3233
builder.Services.AddKernel();
@@ -65,7 +66,13 @@
6566
// Registering storage used for persisting process state with Local Runtime
6667
string tempDirectoryPath = Path.Combine(Path.GetTempPath(), "MySKProcessStorage");
6768
var storageInstance = new JsonFileStorage(tempDirectoryPath);
68-
builder.Services.AddSingleton<IProcessStorageConnector>(storageInstance);
69+
70+
var cloudStorageInstance = new CosmosDbProcessStorageConnector(
71+
cosmosDbOptions.ConnectionString, cosmosDbOptions.DatabaseName, cosmosDbOptions.ContainerName
72+
);
73+
74+
//builder.Services.AddSingleton<IProcessStorageConnector>(storageInstance);
75+
builder.Services.AddSingleton<IProcessStorageConnector>(cloudStorageInstance);
6976

7077
// Enabling CORS for grpc-web when using webApp as client, remove if not needed
7178
builder.Services.AddCors(o => o.AddPolicy("AllowAll", builder =>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System.ComponentModel.DataAnnotations;
4+
5+
namespace ProcessWithCloudEvents.SharedComponents.Options;
6+
/// <summary>
/// Options describing which Cosmos DB account, database and container to use.
/// All three values are marked <see cref="RequiredAttribute"/> and default to an empty string.
/// </summary>
public class CosmosDBOptions
{
    /// <summary>
    /// Name of the configuration section these options are read from.
    /// </summary>
    public const string SectionName = "CosmosDB";

    /// <summary>
    /// Connection string of the Cosmos DB account.
    /// </summary>
    [Required]
    public string ConnectionString { get; set; } = "";

    /// <summary>
    /// Name of the Cosmos DB database.
    /// </summary>
    [Required]
    public string DatabaseName { get; set; } = "";

    /// <summary>
    /// Name of the container inside <see cref="DatabaseName"/>.
    /// </summary>
    [Required]
    public string ContainerName { get; set; } = "";
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
#pragma warning disable IDE0005 // Using directive is unnecessary
3+
using System.Collections.Concurrent;
4+
using System.ComponentModel;
5+
using System.Text.Json;
6+
using Microsoft.Azure.Cosmos;
7+
using Microsoft.SemanticKernel;
8+
using Newtonsoft.Json;
9+
#pragma warning restore IDE0005 // Using directive is unnecessary
10+
11+
namespace ProcessWithCloudEvents.SharedComponents.Storage;
12+
// The Cosmos DB V3 SDK serializes with Newtonsoft.Json, so stored entities are wrapped in this record,
// which names its JSON fields explicitly (including the lowercase "id" field). Background:
// https://brettmckenzie.net/posts/the-input-content-is-invalid-because-the-required-properties-id-are-missing/
internal record CosmosDbEntity<T>
{
    /// <summary>
    /// Item identifier, serialized as <c>id</c>.
    /// </summary>
    [JsonProperty("id")]
    public string Id { get; init; } = "";

    /// <summary>
    /// The wrapped payload, serialized as <c>body</c>.
    /// </summary>
    [JsonProperty("body")]
    public T Body { get; init; } = default!;

    /// <summary>
    /// Partition key value, serialized as <c>instanceId</c>.
    /// </summary>
    [JsonProperty("instanceId")]
    public string PartitionKey { get; init; } = "";
}
26+
/// <summary>
/// <see cref="IProcessStorageConnector"/> implementation that keeps process entries as items in a
/// Cosmos DB container. Every entry is wrapped in a <see cref="CosmosDbEntity{T}"/> whose item id and
/// partition key value are both the entry id.
/// </summary>
/// <remarks>
/// The underlying <see cref="CosmosClient"/> is created once in the constructor and released only by
/// <see cref="Dispose"/>, so the same connector instance can be opened and closed repeatedly
/// (for example when it is registered as a singleton).
/// </remarks>
internal sealed class CosmosDbProcessStorageConnector : IProcessStorageConnector, IDisposable
{
    private readonly CosmosClient _cosmosClient;
    private readonly Microsoft.Azure.Cosmos.Container _container;

    /// <summary>
    /// Creates the Cosmos DB client and resolves the container used for all storage operations.
    /// </summary>
    /// <param name="connectionString">Connection string of the Cosmos DB account.</param>
    /// <param name="databaseId">Id of the database that holds the container.</param>
    /// <param name="containerId">Id of the container entries are stored in.</param>
    /// <exception cref="ArgumentException">Thrown when any argument is null, empty or whitespace.</exception>
    public CosmosDbProcessStorageConnector(string connectionString, string databaseId, string containerId)
    {
        ThrowIfMissing(connectionString, nameof(connectionString));
        ThrowIfMissing(databaseId, nameof(databaseId));
        ThrowIfMissing(containerId, nameof(containerId));

        this._cosmosClient = new CosmosClient(connectionString);
        this._container = this._cosmosClient.GetContainer(databaseId, containerId);
    }

    /// <summary>
    /// No-op: the client is already created by the constructor, so there is nothing to open.
    /// </summary>
    /// <returns>An already completed <see cref="ValueTask"/>.</returns>
    public ValueTask OpenConnectionAsync()
    {
        return default;
    }

    /// <summary>
    /// No-op: the client is intentionally kept alive here and released only by <see cref="Dispose"/>.
    /// </summary>
    /// <remarks>
    /// Disposing the client here would leave this instance permanently unusable, because
    /// <see cref="OpenConnectionAsync"/> does not recreate it.
    /// </remarks>
    /// <returns>An already completed <see cref="ValueTask"/>.</returns>
    public ValueTask CloseConnectionAsync()
    {
        return default;
    }

    /// <summary>
    /// Reads the item whose id and partition key are <paramref name="id"/> and returns its body.
    /// </summary>
    /// <typeparam name="TEntry">Type of the stored body.</typeparam>
    /// <param name="id">Entry id; also used as the partition key value.</param>
    /// <returns>The stored body, or <see langword="null"/> when Cosmos DB reports the item as not found.</returns>
    public async Task<TEntry?> GetEntryAsync<TEntry>(string id) where TEntry : class
    {
        try
        {
            var response = await this._container.ReadItemAsync<CosmosDbEntity<TEntry>>(id, new PartitionKey(id));
            return response.Resource.Body;
        }
        catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            // Item not found
            return null;
        }
    }

    /// <summary>
    /// Creates or replaces (upserts) the item for <paramref name="id"/> with <paramref name="entry"/> as its body.
    /// </summary>
    /// <typeparam name="TEntry">Type of the body to store.</typeparam>
    /// <param name="id">Entry id; also used as the partition key value.</param>
    /// <param name="type">Not used by this connector.</param>
    /// <param name="entry">Body to store.</param>
    /// <returns>
    /// <see langword="true"/> when the upsert completes; <see langword="false"/> when a
    /// <see cref="CosmosException"/> is raised (the error message is written to the console).
    /// </returns>
    public async Task<bool> SaveEntryAsync<TEntry>(string id, string type, TEntry entry) where TEntry : class
    {
        try
        {
            var wrappedEntry = new CosmosDbEntity<TEntry>
            {
                Id = id,
                Body = entry,
                PartitionKey = id
            };
            await this._container.UpsertItemAsync(wrappedEntry, new PartitionKey(id));
            return true;
        }
        catch (CosmosException ex)
        {
            // Best-effort save: report the failure and let the caller decide based on the returned flag.
            Console.WriteLine($"Error saving entry: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Deletes the item whose id and partition key are <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Entry id; also used as the partition key value.</param>
    /// <returns><see langword="true"/> when the item was deleted; <see langword="false"/> when it was not found.</returns>
    public async Task<bool> DeleteEntryAsync(string id)
    {
        try
        {
            await this._container.DeleteItemAsync<object>(id, new PartitionKey(id));
            return true;
        }
        catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            // Item not found
            return false;
        }
    }

    /// <summary>
    /// Releases the underlying <see cref="CosmosClient"/>.
    /// </summary>
    public void Dispose()
    {
        this._cosmosClient.Dispose();
    }

    // Rejects null, empty or whitespace-only constructor arguments before any Cosmos DB object is created.
    private static void ThrowIfMissing(string value, string parameterName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException("Value cannot be null, empty or whitespace.", parameterName);
        }
    }
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System.Collections.Generic;
4+
using System.Threading.Tasks;
5+
using Microsoft.SemanticKernel.Process.Models.Storage;
6+
7+
namespace Microsoft.SemanticKernel;
8+
/// <summary>
/// Storage operations exposed for a single process step: its information, its state and its events.
/// </summary>
public interface IProcessStepStorageOperations
{
    // Fetching step data is currently done by the parent process only, so the following
    // member is intentionally not part of this interface:
    // Task FetchStepDataAsync(KernelProcessStepInfo step);

    /// <summary>
    /// Gets the stored information of a step (parentId, version, etc.).
    /// </summary>
    /// <param name="stepInfo">The step whose information is requested. Must not be null.</param>
    /// <returns>A task whose result is the <see cref="StorageStepInfo"/> of the step,
    /// or <see langword="null"/> when no information is available for it.</returns>
    Task<StorageStepInfo?> GetStepInfoAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Saves the information of a step (parentId, version, etc.).
    /// </summary>
    /// <param name="stepInfo">The step whose information is saved.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveStepInfoAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Gets the current state of a step.
    /// </summary>
    /// <param name="stepInfo">The step whose state is requested.</param>
    /// <returns>A task whose result is the current state of the step,
    /// or <see langword="null"/> when the step does not exist.</returns>
    Task<KernelProcessStepState?> GetStepStateAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Saves the current state of a step.
    /// </summary>
    /// <param name="stepInfo">The step whose state is saved.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveStepStateAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Gets the events associated with a step.
    /// </summary>
    /// <param name="stepInfo">The step whose events are requested.</param>
    /// <returns>A task whose result is the <see cref="StorageStepEvents"/> of the step, or <see langword="null"/>
    /// (presumably when nothing is stored for it; confirm against the implementation).</returns>
    Task<StorageStepEvents?> GetStepEventsAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Saves the events associated with a step.
    /// </summary>
    /// <param name="stepInfo">The step whose events are saved.</param>
    /// <param name="edgeGroups">Optional edge group data to save with the step events; defaults to <see langword="null"/>.
    /// The meaning of the outer and inner dictionary keys is not defined here (TODO confirm against the implementation).</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveStepEventsAsync(KernelProcessStepInfo stepInfo, Dictionary<string, Dictionary<string, object?>>? edgeGroups = null);

    // Writing step data to storage is currently done by the parent process only, so that data is
    // saved to storage only at the end of the process super step execution. The following member is
    // therefore intentionally not part of this interface:
    // Task<bool> SaveStepDataToStorageAsync(KernelProcessStepInfo step);
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System.Collections.Generic;
4+
using System.Threading.Tasks;
5+
using Microsoft.SemanticKernel.Process.Models.Storage;
6+
7+
namespace Microsoft.SemanticKernel;
8+
/// <summary>
/// Storage operations available to a process: managing the storage connection, handling the process's own
/// information and events, and fetching/saving the data of its child steps.
/// </summary>
internal interface IProcessStorageOperations
{
    /// <summary>
    /// Initializes the storage connection to be used by the process.
    /// </summary>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when initialization
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> InitializeAsync();

    /// <summary>
    /// Closes the storage connection and cleans up resources.
    /// </summary>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when closing
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> CloseAsync();

    /// <summary>
    /// Fetches the data of a specific process from storage.
    /// </summary>
    /// <param name="process">The process whose data is fetched.</param>
    /// <returns>A task that completes when the fetch has finished.</returns>
    Task FetchProcessDataAsync(KernelProcess process);

    /// <summary>
    /// Gets the process information already retrieved from storage, including parentId, version,
    /// mapping of steps and running ids.
    /// </summary>
    /// <param name="process">The process whose information is requested.</param>
    /// <returns>A task whose result is the <see cref="StorageProcessInfo"/> of the process, or <see langword="null"/>
    /// (presumably when no information is available; confirm against the implementation).</returns>
    Task<StorageProcessInfo?> GetProcessInfoAsync(KernelProcess process);

    /// <summary>
    /// Saves the process information to storage, including parentId, version, mapping of steps and running ids.
    /// </summary>
    /// <param name="process">The process whose information is saved.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveProcessInfoAsync(KernelProcess process);

    /// <summary>
    /// Saves process events to storage, including pending external events.
    /// </summary>
    /// <param name="process">The process whose events are saved.</param>
    /// <param name="pendingExternalEvents">Optional list of pending external events to save; defaults to <see langword="null"/>.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveProcessEventsAsync(KernelProcess process, List<KernelProcessEvent>? pendingExternalEvents = null);

    /// <summary>
    /// Saves all process related data to storage, including process info, events, and step data.
    /// </summary>
    /// <param name="process">The process whose data is saved.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveProcessDataToStorageAsync(KernelProcess process);

    // The operations below are step related and are meant to be applied to the children steps of the process only.

    /// <summary>
    /// Fetches the data of a specific process step from storage.
    /// </summary>
    /// <param name="stepInfo">The step whose data is fetched.</param>
    /// <returns>A task that completes when the fetch has finished.</returns>
    Task FetchStepDataAsync(KernelProcessStepInfo stepInfo);

    /// <summary>
    /// Saves the data of a step to storage.
    /// </summary>
    /// <param name="stepInfo">The step whose data is saved.</param>
    /// <returns>A task whose result is a success flag (presumably <see langword="true"/> when the save
    /// succeeded; confirm against the implementation).</returns>
    Task<bool> SaveStepDataToStorageAsync(KernelProcessStepInfo stepInfo);
}

dotnet/src/Experimental/Process.Abstractions/KernelProcessStepInfo.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ public KernelProcessStepState State
3131
}
3232
}
3333

/// <summary>
/// Shortcut for the <see cref="KernelProcessStepState.RunId"/> of this step's <see cref="State"/>.
/// </summary>
public string? RunId
{
    get { return this.State.RunId; }
}

/// <summary>
/// Shortcut for the <see cref="KernelProcessStepState.StepId"/> of this step's <see cref="State"/>.
/// </summary>
public string? StepId
{
    get { return this.State.StepId; }
}

/// <summary>
/// Shortcut for the <see cref="KernelProcessStepState.ParentId"/> of this step's <see cref="State"/>.
/// </summary>
public string? ParentId
{
    get { return this.State.ParentId; }
}
42+
3443
/// <summary>
3544
/// The semantic description of the Step. This is intended to be human and AI readable and is not required to be unique.
3645
/// </summary>

dotnet/src/Experimental/Process.Abstractions/KernelProcessStepState.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ internal static void RegisterDerivedType(Type derivedType)
4444
[JsonPropertyName("runId")]
4545
public string? RunId { get; set; }
4646

/// <summary>
/// Identifier of the parent of this step. Serialized as <c>parentId</c>; may be <see langword="null"/>.
/// </summary>
[DataMember, JsonPropertyName("parentId")]
public string? ParentId { get; set; }
53+
4754
/// <summary>
4855
/// The name of the Step. This is intended to be human readable and is not required to be unique. If
4956
/// not provided, the name will be derived from the steps .NET type.
@@ -105,4 +112,13 @@ public KernelProcessStepState(string stepId, string version, string? runId = nul
105112
this.StepId = stepId;
106113
this.Version = version;
107114
}
115+
/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepState"/> class from an existing step state,
/// carrying over its <c>StepId</c>, <c>Version</c>, <c>RunId</c> and <c>ParentId</c>.
/// </summary>
/// <param name="stepState">The step state to copy the identifying values from. Must not be null.</param>
public KernelProcessStepState(KernelProcessStepState stepState)
    : base(stepState.StepId, stepState.Version, stepState.RunId)
{
    // The base constructor only receives the step id, version and run id,
    // so the parent id has to be carried over explicitly or it would be lost.
    this.ParentId = stepState.ParentId;
}
108124
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System.Text.Json.Serialization;
4+
5+
namespace Microsoft.SemanticKernel.Process.Models.Storage;
6+
/// <summary>
/// Common base record for entries kept in process storage.
/// </summary>
public abstract record StorageEntryBase
{
    /// <summary>
    /// Unique identifier of the storage entry. Serialized as <c>instanceId</c>; defaults to an empty string.
    /// </summary>
    [JsonPropertyName("instanceId")]
    public string InstanceId { get; set; } = "";
}

0 commit comments

Comments
 (0)