diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Exceptions/IdempotencyItemAlreadyExistsException.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Exceptions/IdempotencyItemAlreadyExistsException.cs index 1603bba7..55144bff 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Exceptions/IdempotencyItemAlreadyExistsException.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Exceptions/IdempotencyItemAlreadyExistsException.cs @@ -14,6 +14,7 @@ */ using System; +using AWS.Lambda.Powertools.Idempotency.Persistence; namespace AWS.Lambda.Powertools.Idempotency.Exceptions; @@ -22,6 +23,11 @@ namespace AWS.Lambda.Powertools.Idempotency.Exceptions; /// public class IdempotencyItemAlreadyExistsException : Exception { + /// + /// The record that already exists in the persistence layer. + /// + public DataRecord Record { get; set; } + /// /// Creates a new IdempotencyItemAlreadyExistsException /// diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs index a8d7da73..35ebd640 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs @@ -106,9 +106,25 @@ private async Task ProcessIdempotency() // already exists. If it succeeds, there's no need to call getRecord. await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow, GetRemainingTimeInMillis()); } - catch (IdempotencyItemAlreadyExistsException) + catch (IdempotencyItemAlreadyExistsException ex) { - var record = await GetIdempotencyRecord(); + DataRecord record; + + if(ex.Record != null) + { + // If the error includes the existing record, we can use it to validate + // the record being processed and cache it in memory. + var existingRecord = _persistenceStore.ProcessExistingRecord(ex.Record, _data); + record = existingRecord; + } + else + { + // If the error doesn't include the existing record, we need to fetch + // it from the persistence layer. In doing so, we also call the processExistingRecord + // method to validate the record and cache it in memory. + record = await GetIdempotencyRecord(); + } + return await HandleForStatus(record); } catch (IdempotencyKeyException) diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs index c603a867..07a19913 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs @@ -378,4 +378,20 @@ private static string GetHash(HashAlgorithm hashAlgorithm, string input) /// public abstract Task DeleteRecord(string idempotencyKey); + + /// + /// Validates an existing record against the data payload being processed. + /// If the payload does not match the stored record, an `IdempotencyValidationError` error is thrown. + /// Whenever a record is retrieved from the persistence layer, it should be validated against the data payload + /// being processed. This is to ensure that the data payload being processed is the same as the one that was + /// used to create the record in the first place. + /// + /// The record is also saved to the local cache if local caching is enabled. + /// + public virtual DataRecord ProcessExistingRecord(DataRecord exRecord, JsonDocument data) + { + ValidatePayload(data, exRecord); + SaveToCache(exRecord); + return exRecord; + } } \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs index 9b8cf500..d82681c4 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs @@ -191,6 +191,7 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now) Item = item, ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)", ExpressionAttributeNames = expressionAttributeNames, + ReturnValuesOnConditionCheckFailure = ReturnValuesOnConditionCheckFailure.ALL_OLD, ExpressionAttributeValues = new Dictionary { {":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}}, @@ -202,8 +203,20 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now) } catch (ConditionalCheckFailedException e) { - throw new IdempotencyItemAlreadyExistsException( + var ex = new IdempotencyItemAlreadyExistsException( "Failed to put record for already existing idempotency key: " + record.IdempotencyKey, e); + + if (e.Item != null) + { + ex.Record = new DataRecord(e.Item[_keyAttr].S, + Enum.Parse(e.Item[_statusAttr].S), + long.Parse(e.Item[_expiryAttr].N), + e.Item.TryGetValue(_dataAttr, out var data) ? data?.S : null, + e.Item.TryGetValue(_validationAttr, out var validation) ? validation?.S : null, + e.Item.TryGetValue(_inProgressExpiryAttr, out var inProgExp) ? long.Parse(inProgExp.N) : null); + } + + throw ex; } } diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs index 32057ccb..104beb68 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs @@ -524,7 +524,7 @@ public void GenerateHash_WhenInputIsDouble_ShouldGenerateMd5ofDouble() // Assert generatedHash.Should().Be(expectedHash); } - + [Fact] public async Task When_Key_Prefix_Set_Should_Create_With_Prefix() { @@ -563,4 +563,35 @@ private static APIGatewayProxyRequest LoadApiGatewayProxyRequest() throw; } } + + [Fact] + public async Task ProcessExistingRecord_WhenValidRecord_ShouldReturnRecordAndSaveToCache() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + LRUCache cache = new(2); + + persistenceStore.Configure(new IdempotencyOptionsBuilder() + .WithUseLocalCache(true) + .Build(), null, null, cache); + + var now = DateTimeOffset.UtcNow; + var existingRecord = new DataRecord( + "testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6", + DataRecord.DataRecordStatus.COMPLETED, + now.AddSeconds(3600).ToUnixTimeSeconds(), + "existing response", + null); + + // Act + var result = + persistenceStore.ProcessExistingRecord(existingRecord, JsonSerializer.SerializeToDocument(request)!); + + // Assert + result.Should().Be(existingRecord); + cache.Count.Should().Be(1); + cache.TryGet("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6", out var cachedRecord).Should().BeTrue(); + cachedRecord.Should().Be(existingRecord); + } } \ No newline at end of file diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs index 6dc2fb84..09b4c781 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs @@ -1,12 +1,12 @@ /* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * + * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at - * + * * http://aws.amazon.com/apache2.0 - * + * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing @@ -33,7 +33,7 @@ public class DynamoDbPersistenceStoreTests : IClassFixture private readonly DynamoDBPersistenceStore _dynamoDbPersistenceStore; private readonly AmazonDynamoDBClient _client; private readonly string _tableName; - + public DynamoDbPersistenceStoreTests(DynamoDbFixture fixture) { _client = fixture.Client; @@ -42,21 +42,23 @@ public DynamoDbPersistenceStoreTests(DynamoDbFixture fixture) .WithTableName(_tableName) .WithDynamoDBClient(_client) .Build(); - _dynamoDbPersistenceStore.Configure(new IdempotencyOptionsBuilder().Build(),functionName: null, keyPrefix: null); + _dynamoDbPersistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), functionName: null, + keyPrefix: null); } - + //putRecord [Fact] public async Task PutRecord_WhenRecordDoesNotExist_ShouldCreateRecordInDynamoDB() { // Arrange var now = DateTimeOffset.UtcNow; + var uniqueKey = $"key_{Guid.NewGuid()}"; var expiry = now.AddSeconds(3600).ToUnixTimeSeconds(); - var key = CreateKey("key"); - + var key = CreateKey(uniqueKey); + // Act await _dynamoDbPersistenceStore - .PutRecord(new DataRecord("key", DataRecord.DataRecordStatus.COMPLETED, expiry, null, null), now); + .PutRecord(new DataRecord(uniqueKey, DataRecord.DataRecordStatus.COMPLETED, expiry, null, null), now); // Assert var getItemResponse = @@ -73,7 +75,7 @@ await _client.GetItemAsync(new GetItemRequest } [Fact] - public async Task PutRecord_WhenRecordAlreadyExist_ShouldThrowIdempotencyItemAlreadyExistsException() + public async Task PutRecord_WhenRecordAlreadyExist_ShouldThrowIdempotencyItemAlreadyExistsException() { // Arrange var key = CreateKey("key"); @@ -82,7 +84,7 @@ public async Task PutRecord_WhenRecordAlreadyExist_ShouldThrowIdempotencyItemAlr Dictionary item = new(key); var now = DateTimeOffset.UtcNow; var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); - item.Add("expiration", new AttributeValue {N = expiry.ToString()}); + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.COMPLETED.ToString())); item.Add("data", new AttributeValue("Fake Data")); await _client.PutItemAsync(new PutItemRequest @@ -100,24 +102,24 @@ await _client.PutItemAsync(new PutItemRequest null, null ), now); - + // Assert await act.Should().ThrowAsync(); - + // item was not updated, retrieve the initial one var itemInDb = (await _client.GetItemAsync(new GetItemRequest - { - TableName = _tableName, - Key = key - })).Item; + { + TableName = _tableName, + Key = key + })).Item; itemInDb.Should().NotBeNull(); itemInDb["status"].S.Should().Be("COMPLETED"); itemInDb["expiration"].N.Should().Be(expiry.ToString()); itemInDb["data"].S.Should().Be("Fake Data"); } - + [Fact] - public async Task PutRecord_ShouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() + public async Task PutRecord_ShouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() { // Arrange var key = CreateKey("key"); @@ -127,18 +129,18 @@ public async Task PutRecord_ShouldBlockUpdate_IfRecordAlreadyExistAndProgressNot var now = DateTimeOffset.UtcNow; var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); var progressExpiry = now.AddSeconds(30).ToUnixTimeMilliseconds(); - - item.Add("expiration", new AttributeValue {N = expiry.ToString()}); + + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); item.Add("data", new AttributeValue("Fake Data")); - item.Add("in_progress_expiration", new AttributeValue {N = progressExpiry.ToString()}); - + item.Add("in_progress_expiration", new AttributeValue { N = progressExpiry.ToString() }); + await _client.PutItemAsync(new PutItemRequest { TableName = _tableName, Item = item }); - + var expiry2 = now.AddSeconds(3600).ToUnixTimeSeconds(); // Act var act = () => _dynamoDbPersistenceStore.PutRecord( @@ -148,10 +150,10 @@ await _client.PutItemAsync(new PutItemRequest "Fake Data 2", null ), now); - + // Assert await act.Should().ThrowAsync(); - + // item was not updated, retrieve the initial one var itemInDb = (await _client.GetItemAsync(new GetItemRequest { @@ -163,9 +165,9 @@ await _client.PutItemAsync(new PutItemRequest itemInDb["expiration"].N.Should().Be(expiry.ToString()); itemInDb["data"].S.Should().Be("Fake Data"); } - + [Fact] - public async Task PutRecord_ShouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimedOut() + public async Task PutRecord_ShouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimedOut() { // Arrange var key = CreateKey("key"); @@ -175,20 +177,20 @@ public async Task PutRecord_ShouldCreateRecordInDynamoDB_IfLambdaWasInProgressAn var now = DateTimeOffset.UtcNow; var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); var progressExpiry = now.AddSeconds(-30).ToUnixTimeMilliseconds(); - - item.Add("expiration", new AttributeValue {N = expiry.ToString()}); + + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); item.Add("data", new AttributeValue("Fake Data")); - item.Add("in_progress_expiration", new AttributeValue {N = progressExpiry.ToString()}); - + item.Add("in_progress_expiration", new AttributeValue { N = progressExpiry.ToString() }); + await _client.PutItemAsync(new PutItemRequest { TableName = _tableName, Item = item }); - + var expiry2 = now.AddSeconds(3600).ToUnixTimeSeconds(); - + // Act await _dynamoDbPersistenceStore.PutRecord( new DataRecord("key", @@ -197,7 +199,7 @@ await _dynamoDbPersistenceStore.PutRecord( null, null ), now); - + // Assert // an item is inserted var itemInDb = (await _client.GetItemAsync(new GetItemRequest @@ -205,23 +207,23 @@ await _dynamoDbPersistenceStore.PutRecord( TableName = _tableName, Key = key })).Item; - + itemInDb.Should().NotBeNull(); itemInDb["status"].S.Should().Be("INPROGRESS"); itemInDb["expiration"].N.Should().Be(expiry2.ToString()); } - + //getRecord [Fact] public async Task GetRecord_WhenRecordExistsInDynamoDb_ShouldReturnExistingRecord() { // Arrange //await InitializeAsync(); - + // Insert a fake item with same id Dictionary item = new() { - {"id", new AttributeValue("key")} //key + { "id", new AttributeValue("key") } //key }; var now = DateTimeOffset.UtcNow; var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); @@ -252,10 +254,10 @@ public async Task GetRecord_WhenRecordIsAbsent_ShouldThrowException() { //Arrange await _dynamoDbPersistenceStore.DeleteRecord("key"); - + // Act Func act = () => _dynamoDbPersistenceStore.GetRecord("key"); - + // Assert await act.Should().ThrowAsync(); } @@ -280,7 +282,8 @@ await _client.PutItemAsync(new PutItemRequest Item = item }); // enable payload validation - _dynamoDbPersistenceStore.Configure(new IdempotencyOptionsBuilder().WithPayloadValidationJmesPath("path").Build(), + _dynamoDbPersistenceStore.Configure( + new IdempotencyOptionsBuilder().WithPayloadValidationJmesPath("path").Build(), null, null); // Act @@ -303,14 +306,14 @@ await _client.PutItemAsync(new PutItemRequest //deleteRecord [Fact] - public async Task DeleteRecord_WhenRecordExistsInDynamoDb_ShouldDeleteRecord() + public async Task DeleteRecord_WhenRecordExistsInDynamoDb_ShouldDeleteRecord() { // Arrange: Insert a fake item with same id var key = CreateKey("key"); Dictionary item = new(key); var now = DateTimeOffset.UtcNow; var expiry = now.AddSeconds(360).ToUnixTimeSeconds(); - item.Add("expiration", new AttributeValue {N=expiry.ToString()}); + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); await _client.PutItemAsync(new PutItemRequest { @@ -367,7 +370,7 @@ public async Task EndToEndWithCustomAttrNamesAndSortKey() .WithStatusAttr("state") .WithValidationAttr("valid") .Build(); - persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(),functionName: null, keyPrefix: null); + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), functionName: null, keyPrefix: null); var now = DateTimeOffset.UtcNow; var record = new DataRecord( @@ -419,7 +422,6 @@ public async Task EndToEndWithCustomAttrNamesAndSortKey() { TableName = tableNameCustom })).Count.Should().Be(0); - } finally { @@ -438,18 +440,18 @@ await _client.DeleteTableAsync(new DeleteTableRequest } [Fact] - public async Task GetRecord_WhenIdempotencyDisabled_ShouldNotCreateClients() + public async Task GetRecord_WhenIdempotencyDisabled_ShouldNotCreateClients() { try { // Arrange Environment.SetEnvironmentVariable(Constants.IdempotencyDisabledEnv, "true"); - + var store = new DynamoDBPersistenceStoreBuilder().WithTableName(_tableName).Build(); - + // Act Func act = () => store.GetRecord("fake"); - + // Assert await act.Should().ThrowAsync(); } @@ -458,12 +460,136 @@ public async Task GetRecord_WhenIdempotencyDisabled_ShouldNotCreateClients() Environment.SetEnvironmentVariable(Constants.IdempotencyDisabledEnv, "false"); } } + private static Dictionary CreateKey(string keyValue) { var key = new Dictionary { - {"id", new AttributeValue(keyValue)} + { "id", new AttributeValue(keyValue) } }; return key; } + + [Fact] + public async Task PutRecord_WhenRecordAlreadyExists_ShouldReturnExistingRecordInException() + { + // Arrange + var key = CreateKey("key"); + var now = DateTimeOffset.UtcNow; + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); + + // Insert a fake item with same id + Dictionary item = new(key); + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); + item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.COMPLETED.ToString())); + item.Add("data", new AttributeValue("Existing Data")); + item.Add("validation", new AttributeValue("existing-hash")); + + await _client.PutItemAsync(new PutItemRequest + { + TableName = _tableName, + Item = item + }); + + var newRecord = new DataRecord("key", + DataRecord.DataRecordStatus.INPROGRESS, + now.AddSeconds(3600).ToUnixTimeSeconds(), + null, + null); + + // Act + var exception = + await Assert.ThrowsAsync(() => + _dynamoDbPersistenceStore.PutRecord(newRecord, now)); + + // Assert + exception.Record.Should().NotBeNull(); + exception.Record.IdempotencyKey.Should().Be("key"); + exception.Record.Status.Should().Be(DataRecord.DataRecordStatus.COMPLETED); + exception.Record.ResponseData.Should().Be("Existing Data"); + exception.Record.PayloadHash.Should().Be("existing-hash"); + exception.Record.ExpiryTimestamp.Should().Be(expiry); + } + + [Fact] + public async Task PutRecord_WhenRecordWithInProgressExpiryExists_ShouldReturnExistingRecordInException() + { + // Arrange + var key = CreateKey("key"); + var now = DateTimeOffset.UtcNow; + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); + var inProgressExpiry = now.AddSeconds(30).ToUnixTimeMilliseconds(); + + // Insert a fake item with same id including in_progress_expiration + Dictionary item = new(key); + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); + item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); + item.Add("data", new AttributeValue("In Progress Data")); + item.Add("in_progress_expiration", new AttributeValue { N = inProgressExpiry.ToString() }); + + await _client.PutItemAsync(new PutItemRequest + { + TableName = _tableName, + Item = item + }); + + var newRecord = new DataRecord("key", + DataRecord.DataRecordStatus.INPROGRESS, + now.AddSeconds(3600).ToUnixTimeSeconds(), + null, + null); + + // Act + var exception = + await Assert.ThrowsAsync(() => + _dynamoDbPersistenceStore.PutRecord(newRecord, now)); + + // Assert + exception.Record.Should().NotBeNull(); + exception.Record.IdempotencyKey.Should().Be("key"); + exception.Record.Status.Should().Be(DataRecord.DataRecordStatus.INPROGRESS); + exception.Record.ResponseData.Should().Be("In Progress Data"); + exception.Record.InProgressExpiryTimestamp.Should().Be(inProgressExpiry); + exception.Record.ExpiryTimestamp.Should().Be(expiry); + } + + [Fact] + public async Task PutRecord_WhenRecordExistsWithMissingOptionalFields_ShouldHandleNullValues() + { + // Arrange + var key = CreateKey("key"); + var now = DateTimeOffset.UtcNow; + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); + + // Insert a minimal record without optional fields (data, validation, in_progress_expiration) + Dictionary item = new(key); + item.Add("expiration", new AttributeValue { N = expiry.ToString() }); + item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); + + await _client.PutItemAsync(new PutItemRequest + { + TableName = _tableName, + Item = item + }); + + var newRecord = new DataRecord("key", + DataRecord.DataRecordStatus.INPROGRESS, + now.AddSeconds(3600).ToUnixTimeSeconds(), + null, + null); + + // Act + var exception = + await Assert.ThrowsAsync(() => + _dynamoDbPersistenceStore.PutRecord(newRecord, now)); + + // Assert + exception.Record.Should().NotBeNull(); + exception.Record.IdempotencyKey.Should().Be("key"); + exception.Record.Status.Should().Be(DataRecord.DataRecordStatus.INPROGRESS); + exception.Record.ResponseData.Should().BeNull(); + exception.Record.PayloadHash.Should().BeNull(); + exception.Record.InProgressExpiryTimestamp.Should().BeNull(); + exception.Record.ExpiryTimestamp.Should().Be(expiry); + } } \ No newline at end of file