Skip to content

Commit 7713ea0

Browse files
committed
Data Store now has MetaData
1 parent 9349e64 commit 7713ea0

16 files changed

+216
-106
lines changed

source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeFixture.cs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ public async Task SetInHash_ShouldStoreValueInHash()
5757
var key = Guid.NewGuid().ToString();
5858
var field = "test-field";
5959
var payload = "test-payload";
60+
var values = new Dictionary<string, string> { { field, payload } };
6061

6162
// Act
62-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(1), CancellationToken);
63+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
6364

6465
// Assert - We'll verify by trying to get and delete it
65-
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
66-
retrievedValue.Should().Be(payload);
66+
var retrievedValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
67+
retrievedValues.Should().NotBeNull();
68+
retrievedValues![field].Should().Be(payload);
6769
}
6870

6971
[Test]
@@ -74,14 +76,16 @@ public async Task TryGetAndDeleteFromHash_WithExistingValue_ShouldReturnValueAnd
7476
var key = Guid.NewGuid().ToString();
7577
var field = "test-field";
7678
var payload = "test-payload";
79+
var values = new Dictionary<string, string> { { field, payload } };
7780

78-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(1), CancellationToken);
81+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
7982

8083
// Act
81-
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
84+
var retrievedValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
8285

8386
// Assert
84-
retrievedValue.Should().Be(payload);
87+
retrievedValues.Should().NotBeNull();
88+
retrievedValues![field].Should().Be(payload);
8589
}
8690

8791
[Test]
@@ -92,8 +96,9 @@ public async Task HashContainsKey_WithExistingField_ShouldReturnTrue()
9296
var key = Guid.NewGuid().ToString();
9397
var field = "test-field";
9498
var payload = "test-payload";
99+
var values = new Dictionary<string, string> { { field, payload } };
95100

96-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(1), CancellationToken);
101+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
97102

98103
// Act
99104
var exists = await redisFacade.HashContainsKey(key, field, CancellationToken);
@@ -140,25 +145,27 @@ public async Task TryGetAndDeleteFromHash_ShouldDeleteTheEntireKey()
140145
var key = Guid.NewGuid().ToString();
141146
var field = "test-field";
142147
var payload = "test-payload";
148+
var values = new Dictionary<string, string> { { field, payload } };
143149

144-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(1), CancellationToken);
150+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
145151

146152
// Verify the hash field exists
147153
var existsBefore = await redisFacade.HashContainsKey(key, field, CancellationToken);
148154
existsBefore.Should().BeTrue();
149155

150156
// Act
151-
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
157+
var retrievedValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
152158

153159
// Assert
154-
retrievedValue.Should().Be(payload);
160+
retrievedValues.Should().NotBeNull();
161+
retrievedValues![field].Should().Be(payload);
155162

156163
// Verify the entire key was deleted (not just the field)
157164
var existsAfter = await redisFacade.HashContainsKey(key, field, CancellationToken);
158165
existsAfter.Should().BeFalse();
159166

160167
// Verify trying to get it again returns null
161-
var secondRetrieval = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
168+
var secondRetrieval = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
162169
secondRetrieval.Should().BeNull();
163170
}
164171

@@ -333,20 +340,22 @@ public async Task SetInHash_WithTTL_ShouldExpireAfterSpecifiedTime()
333340
var key = Guid.NewGuid().ToString();
334341
var field = "test-field";
335342
var payload = "test-payload";
343+
var values = new Dictionary<string, string> { { field, payload } };
336344

337345
// Act - Set a value in hash with short TTL that we can actually test
338-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(3), CancellationToken);
346+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(3), CancellationToken);
339347

340348
// Immediately verify it exists
341349
var immediateExists = await redisFacade.HashContainsKey(key, field, CancellationToken);
342350
immediateExists.Should().BeTrue();
343351

344352
// Also verify we can retrieve the value immediately
345-
var immediateValue = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
346-
immediateValue.Should().Be(payload);
353+
var immediateValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
354+
immediateValues.Should().NotBeNull();
355+
immediateValues![field].Should().Be(payload);
347356

348357
// Set the value again to test expiration (since TryGetAndDeleteFromHash removes it)
349-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMilliseconds(3), CancellationToken);
358+
await redisFacade.SetInHash(key, values, TimeSpan.FromMilliseconds(3), CancellationToken);
350359

351360
// Assert - Should eventually expire
352361
await ShouldEventually.Eventually(async () =>
@@ -356,8 +365,8 @@ await ShouldEventually.Eventually(async () =>
356365
}, TimeSpan.FromSeconds(5), CancellationToken);
357366

358367
// Verify TryGetAndDeleteFromHash also returns null for expired key
359-
var expiredValue = await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
360-
expiredValue.Should().BeNull();
368+
var expiredValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
369+
expiredValues.Should().BeNull();
361370
}
362371

363372
[Test]
@@ -554,22 +563,23 @@ public async Task TryGetAndDeleteFromHash_WithConcurrentCalls_ShouldReturnValueT
554563
var key = Guid.NewGuid().ToString();
555564
var field = "test-field";
556565
var payload = "test-payload";
566+
var values = new Dictionary<string, string> { { field, payload } };
557567
const int concurrentCallCount = 20;
558568

559569
// Set a value in the hash
560-
await redisFacade.SetInHash(key, field, payload, TimeSpan.FromMinutes(1), CancellationToken);
570+
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
561571

562572
var countDownLatch = new AsyncCountdownEvent(concurrentCallCount);
563573

564574
// Act - Make multiple concurrent calls to TryGetAndDeleteFromHash
565-
var concurrentTasks = new Task<string?>[concurrentCallCount];
575+
var concurrentTasks = new Task<Dictionary<string, string?>?>[concurrentCallCount];
566576
for (int i = 0; i < concurrentCallCount; i++)
567577
{
568578
concurrentTasks[i] = Task.Run(async () =>
569579
{
570580
countDownLatch.Signal();
571581
await countDownLatch.WaitAsync();
572-
return await redisFacade.TryGetAndDeleteFromHash(key, field, CancellationToken);
582+
return await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
573583
});
574584
}
575585

@@ -580,7 +590,7 @@ public async Task TryGetAndDeleteFromHash_WithConcurrentCalls_ShouldReturnValueT
580590
var nullResults = results.Where(result => result == null).ToArray();
581591

582592
nonNullResults.Should().HaveCount(1, "exactly one concurrent call should retrieve the value");
583-
nonNullResults[0].Should().Be(payload, "the successful call should return the correct payload");
593+
nonNullResults[0]![field].Should().Be(payload, "the successful call should return the correct payload");
584594
nullResults.Should().HaveCount(concurrentCallCount - 1, "all other concurrent calls should return null");
585595

586596
// Verify the hash key no longer exists

source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeWhenRedisGoesDownAwayTests.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#if NET8_0_OR_GREATER
22
using System;
33
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
45
using System.Threading.Tasks;
56
using FluentAssertions;
67
using Halibut.Tests.Queue.Redis.Utils;
@@ -74,11 +75,13 @@ public async Task WhenTheEstablishedConnectionToRedisBrieflyGoesDown_WeCanImmedi
7475
portForwarder.ReturnToNormalMode();
7576

7677
// Assert
77-
await redisFacade.SetInHash("test-hash", "test-field", "test-value", TimeSpan.FromMinutes(1), CancellationToken);
78+
await redisFacade.SetInHash("test-hash", new Dictionary<string, string>(){{"test-field", "test-value"}}, TimeSpan.FromMinutes(1), CancellationToken);
7879

7980
// Check that the value was set.
80-
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash("test-hash", "test-field", CancellationToken);
81-
retrievedValue.Should().Be("test-value");
81+
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash("test-hash", new []{"test-field"}, CancellationToken);
82+
retrievedValue.Should().NotBeNull();
83+
retrievedValue.Should().ContainKey("test-field");
84+
retrievedValue!["test-field"].Should().Be("test-value");
8285
}
8386

8487
[Test]
@@ -89,13 +92,15 @@ public async Task WhenTheEstablishedConnectionToRedisBrieflyGoesDown_WeCanImmedi
8992
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder);
9093

9194
// Establish connection and set up test data
92-
await redisFacade.SetInHash("test-hash", "test-field", "test-value", TimeSpan.FromMinutes(1), CancellationToken);
95+
await redisFacade.SetInHash("test-hash", new Dictionary<string, string>(){{"test-field", "test-value"}}, TimeSpan.FromMinutes(1), CancellationToken);
9396

9497
portForwarder.EnterKillNewAndExistingConnectionsMode();
9598
portForwarder.ReturnToNormalMode();
9699

97-
var result = await redisFacade.TryGetAndDeleteFromHash("test-hash", "test-field", CancellationToken);
98-
result.Should().Be("test-value");
100+
var retrievedValue = await redisFacade.TryGetAndDeleteFromHash("test-hash", new []{"test-field"}, CancellationToken);
101+
retrievedValue.Should().NotBeNull();
102+
retrievedValue.Should().ContainKey("test-field");
103+
retrievedValue!["test-field"].Should().Be("test-value");
99104
}
100105

101106
[Test]
@@ -180,7 +185,7 @@ public async Task WhenTheEstablishedConnectionToRedisBrieflyGoesDown_WeCanImmedi
180185
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder);
181186

182187
// Establish connection and set up test data
183-
await redisFacade.SetInHash("test-hash", "test-field", "test-value", TimeSpan.FromMinutes(1), CancellationToken);
188+
await redisFacade.SetInHash("test-hash", new Dictionary<string, string>(){{"test-field", "test-value"}}, TimeSpan.FromMinutes(1), CancellationToken);
184189

185190
portForwarder.EnterKillNewAndExistingConnectionsMode();
186191
portForwarder.ReturnToNormalMode();

source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public async Task WhenEnteringTheQueue_AndRedisIsUnavailableAndDataLoseOccurs_AR
217217
var redisDataLoseDetector = new CancellableDataLossWatchForRedisLosingAllItsData();
218218

219219
var redisTransport = Substitute.ForPartsOf<HalibutRedisTransportWithVirtuals>(new HalibutRedisTransport(redisFacade));
220-
redisTransport.Configure().PutRequest(Arg.Any<Uri>(), Arg.Any<Guid>(), Arg.Any<string>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>())
220+
redisTransport.Configure().PutRequest(Arg.Any<Uri>(), Arg.Any<Guid>(), Arg.Any<RedisStoredMessage>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>())
221221
.Returns(async callInfo =>
222222
{
223223
await redisDataLoseDetector.DataLossHasOccured();

source/Halibut.Tests/Queue/Redis/Utils/HalibutRedisTransportWithVirtuals.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using Halibut.Queue.Redis;
77
using Halibut.Queue.Redis.NodeHeartBeat;
8+
using Halibut.Queue.Redis.RedisHelpers;
89
using StackExchange.Redis;
910

1011
namespace Halibut.Tests.Queue.Redis.Utils
@@ -38,12 +39,12 @@ public Task PushRequestGuidOnToQueue(Uri endpoint, Guid guid, CancellationToken
3839
return halibutRedisTransport.TryPopNextRequestGuid(endpoint, cancellationToken);
3940
}
4041

41-
public virtual Task PutRequest(Uri endpoint, Guid requestId, string requestMessage, TimeSpan requestPickupTimeout, CancellationToken cancellationToken)
42+
public virtual Task PutRequest(Uri endpoint, Guid requestId, RedisStoredMessage requestMessage, TimeSpan requestPickupTimeout, CancellationToken cancellationToken)
4243
{
4344
return halibutRedisTransport.PutRequest(endpoint, requestId, requestMessage, requestPickupTimeout, cancellationToken);
4445
}
4546

46-
public Task<string?> TryGetAndRemoveRequest(Uri endpoint, Guid requestId, CancellationToken cancellationToken)
47+
public Task<RedisStoredMessage?> TryGetAndRemoveRequest(Uri endpoint, Guid requestId, CancellationToken cancellationToken)
4748
{
4849
return halibutRedisTransport.TryGetAndRemoveRequest(endpoint, requestId, cancellationToken);
4950
}
@@ -93,17 +94,17 @@ public Task PublishThatResponseIsAvailable(Uri endpoint, Guid identifier, Cancel
9394
return halibutRedisTransport.PublishThatResponseIsAvailable(endpoint, identifier, cancellationToken);
9495
}
9596

96-
public Task SetResponseMessage(Uri endpoint, Guid identifier, string responseMessage, TimeSpan ttl, CancellationToken cancellationToken)
97+
public Task SetResponseMessage(Uri endpoint, Guid identifier, RedisStoredMessage responseMessage, TimeSpan ttl, CancellationToken cancellationToken)
9798
{
9899
return halibutRedisTransport.SetResponseMessage(endpoint, identifier, responseMessage, ttl, cancellationToken);
99100
}
100101

101-
public Task<string?> GetResponseMessage(Uri endpoint, Guid identifier, CancellationToken cancellationToken)
102+
public Task<RedisStoredMessage?> GetResponseMessage(Uri endpoint, Guid identifier, CancellationToken cancellationToken)
102103
{
103104
return halibutRedisTransport.GetResponseMessage(endpoint, identifier, cancellationToken);
104105
}
105106

106-
public Task<bool> DeleteResponseMessage(Uri endpoint, Guid identifier, CancellationToken cancellationToken)
107+
public Task DeleteResponseMessage(Uri endpoint, Guid identifier, CancellationToken cancellationToken)
107108
{
108109
return halibutRedisTransport.DeleteResponseMessage(endpoint, identifier, cancellationToken);
109110
}

source/Halibut.Tests/Queue/Redis/Utils/InMemoryStoreDataStreamsForDistributedQueues.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@ namespace Halibut.Tests.Queue.Redis.Utils
1010
public class InMemoryStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues
1111
{
1212
readonly IDictionary<Guid, byte[]> dataStreamsStored = new Dictionary<Guid, byte[]>();
13-
public async Task StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
13+
public async Task<string> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
1414
{
1515
foreach (var dataStream in dataStreams)
1616
{
1717
using var memoryStream = new MemoryStream();
1818
await dataStream.WriteData(memoryStream, cancellationToken);
1919
dataStreamsStored[dataStream.Id] = memoryStream.ToArray();
2020
}
21+
22+
return "";
2123
}
2224

23-
public async Task ReHydrateDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
25+
public async Task ReHydrateDataStreams(string _, IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
2426
{
2527
await Task.CompletedTask;
2628
foreach (var dataStream in dataStreams)

source/Halibut.Tests/Queue/Redis/Utils/MessageReaderWriterExtensionsMethods.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4-
using Halibut.Queue.Redis;
54
using Halibut.Queue.Redis.MessageStorage;
5+
using Halibut.Queue.Redis.RedisHelpers;
66
using Halibut.Transport.Protocol;
77

88
namespace Halibut.Tests.Queue.Redis.Utils
@@ -29,22 +29,22 @@ public MessageSerialiserAndDataStreamStorageWithVirtualMethods(IMessageSerialise
2929
this.messageSerialiserAndDataStreamStorage = messageSerialiserAndDataStreamStorage;
3030
}
3131

32-
public virtual Task<string> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
32+
public virtual Task<RedisStoredMessage> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
3333
{
3434
return messageSerialiserAndDataStreamStorage.PrepareRequest(request, cancellationToken);
3535
}
3636

37-
public virtual Task<RequestMessage> ReadRequest(string jsonRequest, CancellationToken cancellationToken)
37+
public virtual Task<RequestMessage> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken)
3838
{
3939
return messageSerialiserAndDataStreamStorage.ReadRequest(jsonRequest, cancellationToken);
4040
}
4141

42-
public virtual Task<string> PrepareResponse(ResponseMessage response, CancellationToken cancellationToken)
42+
public virtual Task<RedisStoredMessage> PrepareResponse(ResponseMessage response, CancellationToken cancellationToken)
4343
{
4444
return messageSerialiserAndDataStreamStorage.PrepareResponse(response, cancellationToken);
4545
}
4646

47-
public virtual Task<ResponseMessage> ReadResponse(string jsonResponse, CancellationToken cancellationToken)
47+
public virtual Task<ResponseMessage> ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
4848
{
4949
return messageSerialiserAndDataStreamStorage.ReadResponse(jsonResponse, cancellationToken);
5050
}
@@ -59,7 +59,7 @@ public MessageSerialiserAndDataStreamStorageThatThrowsWhenReadingResponse(IMessa
5959
this.exception = exception;
6060
}
6161

62-
public override Task<ResponseMessage> ReadResponse(string jsonResponse, CancellationToken cancellationToken)
62+
public override Task<ResponseMessage> ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
6363
{
6464
throw exception();
6565
}
@@ -74,7 +74,7 @@ public MessageSerialiserAndDataStreamStorageThatThrowsOnPrepareRequest(IMessageS
7474
this.exception = exception;
7575
}
7676

77-
public override Task<string> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
77+
public override Task<RedisStoredMessage> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
7878
{
7979
throw exception();
8080
}

0 commit comments

Comments
 (0)