Skip to content

Commit 83b6a56

Browse files
LukeButtersCopilotsburmanoctopus
authored
By default all Halibut tests will run with the Redis queue (#696)
* Test with redis queue * Fix responses with data streams * . * Fix QueuedUpRequestsShouldBeDequeuedInOrder * We must use BSON * . * . * . * Allow setting redis with env var * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * Update source/Halibut.Tests/PollingTentacleDequeuesRequestsInOrderFixture.cs Co-authored-by: Copilot <[email protected]> * Update source/Halibut/Queue/Redis/MessageStorage/RehydrateWithProgressReporting.cs Co-authored-by: Copilot <[email protected]> * . * Fix flaky test * . * Update source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs Co-authored-by: Stephen Burman <[email protected]> * Update source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs Co-authored-by: Stephen Burman <[email protected]> * . * . * . * . * . * . --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Stephen Burman <[email protected]>
1 parent db30d0c commit 83b6a56

33 files changed

+581
-145
lines changed

.profanityignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.git/

source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public QueueHolder Build()
5959

6060
var disposableCollection = new DisposableCollection();
6161

62-
var redisFacade = new RedisFacade("localhost:" + RedisTestHost.Port(), (Guid.NewGuid()).ToString(), log);
62+
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(port: RedisTestHost.Port());
6363
disposableCollection.AddAsyncDisposable(redisFacade);
6464

6565
var redisTransport = new HalibutRedisTransport(redisFacade);

source/Halibut.Tests/CancellationViaClientProxyFixture.cs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Halibut.Tests.TestServices.Async;
1313
using Halibut.TestUtils.Contracts;
1414
using Halibut.Transport.Protocol;
15+
using Halibut.Util;
1516
using NUnit.Framework;
1617

1718
namespace Halibut.Tests
@@ -26,9 +27,21 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_
2627
var tokenSourceToCancel = new CancellationTokenSource();
2728
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token);
2829

30+
bool waitForItemToLandOnTheQueueBeforeCancellation = false;
31+
IPendingRequestQueue? pendingRequestQueue = null;
32+
2933
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
3034
.WithPortForwarding(out var portForwarderRef, port => PortForwarderUtil.ForwardingToLocalPort(port).Build())
3135
.WithStandardServices()
36+
.OnLatestClientAndLatestServiceBuilder(c =>
37+
{
38+
// Ideally should be done any time we are working with polling, but the other builder doesn't support that.
39+
waitForItemToLandOnTheQueueBeforeCancellation = clientAndServiceTestCase.PollingQueueTestCase != null;
40+
c.WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => inner.CaptureCreatedQueues(queue =>
41+
{
42+
pendingRequestQueue = queue;
43+
})));
44+
})
3245
.Build(CancellationToken))
3346
{
3447
portForwarderRef.Value.EnterKillNewAndExistingConnectionsMode();
@@ -38,10 +51,26 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_
3851
var echo = clientAndService.CreateAsyncClient<ICountingService, IAsyncClientCountingServiceWithOptions>(
3952
point => point.TryAndConnectForALongTime());
4053

54+
var task = Task.Run(() => echo.IncrementAsync(halibutRequestOption));
55+
56+
if (waitForItemToLandOnTheQueueBeforeCancellation)
57+
{
58+
await ShouldEventually.Eventually(() =>
59+
{
60+
pendingRequestQueue.Should().NotBeNull();
61+
pendingRequestQueue!.Count.Should().Be(1);
62+
}, TimeSpan.FromSeconds(20), CancellationToken);
63+
}
64+
65+
// We still wait 100ms like we used to, but this is not required when we wait for the item to land on the queue.
66+
// But doing that is not yet possible with LatestClientAndPreviousServiceVersionsTestCases.
67+
// And listening does not support that, since it has no queue.
4168
tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100));
4269

43-
(await AssertionExtensions.Should(() => echo.IncrementAsync(halibutRequestOption)).ThrowAsync<Exception>())
44-
.And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Connecting")));
70+
var e =(await AssertionExtensions.Should(() => task).ThrowAsync<Exception>())
71+
.And;
72+
Logger.Information("Exception was: {Exception}", e);
73+
e.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Connecting")));
4574

4675
portForwarderRef.Value.ReturnToNormalMode();
4776

source/Halibut.Tests/PollingTentacleDequeuesRequestsInOrderFixture.cs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44
using System.Threading.Tasks;
55
using FluentAssertions;
66
using Halibut.ServiceModel;
7-
using Halibut.Tests.Builders;
87
using Halibut.Tests.Support;
98
using Halibut.Tests.Support.TestAttributes;
109
using Halibut.Tests.Support.TestCases;
1110
using Halibut.Tests.TestServices.Async;
12-
using Halibut.Tests.Util;
1311
using Halibut.TestUtils.Contracts;
12+
using Halibut.Util;
1413
using NUnit.Framework;
1514

1615
namespace Halibut.Tests
@@ -21,23 +20,18 @@ public class PollingTentacleDequeuesRequestsInOrderFixture : BaseTest
2120
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)]
2221
public async Task QueuedUpRequestsShouldBeDequeuedInOrder(ClientAndServiceTestCase clientAndServiceTestCase)
2322
{
24-
IPendingRequestQueue ?pendingRequestQueue = null;
23+
var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
24+
halibutTimeoutsAndLimits.PollingQueueWaitTimeout = TimeSpan.FromSeconds(1);
25+
IPendingRequestQueue? pendingRequestQueue = null;
2526
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
2627
.WithStandardServices()
2728
.AsLatestClientAndLatestServiceBuilder()
2829
.WithInstantReconnectPollingRetryPolicy()
29-
.WithPendingRequestQueueFactory(logFactory =>
30+
.WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits)
31+
.WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => inner.CaptureCreatedQueues(queue =>
3032
{
31-
return new FuncPendingRequestQueueFactory(uri =>
32-
{
33-
pendingRequestQueue = new PendingRequestQueueBuilder()
34-
.WithLog(logFactory.ForEndpoint(uri))
35-
.WithPollingQueueWaitTimeout(TimeSpan.FromSeconds(1))
36-
.Build()
37-
.PendingRequestQueue;
38-
return pendingRequestQueue;
39-
});
40-
})
33+
pendingRequestQueue = queue;
34+
})))
4135
.Build(CancellationToken))
4236
{
4337
var echoService = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoService>();
@@ -61,7 +55,10 @@ await Wait.For(async () =>
6155

6256

6357
var countingService = clientAndService.CreateAsyncClient<ICountingService, IAsyncClientCountingService>();
64-
58+
59+
// The queues don't all work the same with the Count operator, this account for that.
60+
int baseCount = pendingRequestQueue!.Count;
61+
6562
var tasks = new List<Task<int>>();
6663
for (int i = 0; i < 10; i++)
6764
{
@@ -73,7 +70,8 @@ await Wait.For(async () =>
7370
#pragma warning disable VSTHRD003 // Avoid awaiting foreign Tasks
7471
await task.AwaitIfFaulted();
7572
#pragma warning restore VSTHRD003 // Avoid awaiting foreign Tasks
76-
return pendingRequestQueue!.Count == i + 1;
73+
74+
return pendingRequestQueue!.Count - baseCount == i + 1;
7775
}, CancellationToken);
7876
}
7977

source/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
using Halibut.Queue.Redis.RedisHelpers;
1616
using Halibut.Tests.Builders;
1717
using Halibut.Tests.Queue.Redis.Utils;
18-
using Halibut.Tests.Support;
1918
using Halibut.Tests.Support.Logging;
20-
using Halibut.Tests.Support.TestAttributes;
2119
using Halibut.Tests.Util;
2220
using Halibut.TestUtils.Contracts;
2321
using Halibut.Transport.Protocol;
@@ -99,7 +97,7 @@ public ThrowingStoreDataStreamsForDistributedQueues(Exception exceptionToThrow)
9997
this.exceptionToThrow = exceptionToThrow;
10098
}
10199

102-
public Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
100+
public Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, bool useReceiver, CancellationToken cancellationToken)
103101
{
104102
throw exceptionToThrow;
105103
}

source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ static Exception CreateExceptionFromResponse(ResponseMessage responseThatWouldNo
941941
}
942942

943943
[Test]
944-
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, testWebSocket: false)]
944+
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, testWebSocket: false, pollingQueuesToTest: PollingQueuesToTest.RedisOnly)]
945945
public async Task WhenUsingTheRedisQueue_ASimpleEchoServiceCanBeCalled(ClientAndServiceTestCase clientAndServiceTestCase)
946946
{
947947
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
@@ -951,7 +951,7 @@ public async Task WhenUsingTheRedisQueue_ASimpleEchoServiceCanBeCalled(ClientAnd
951951
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
952952
.WithStandardServices()
953953
.AsLatestClientAndLatestServiceBuilder()
954-
.WithPendingRequestQueueFactory((queueMessageSerializer, logFactory) =>
954+
.WithPendingRequestQueueFactory((_, queueMessageSerializer, logFactory) =>
955955
new RedisPendingRequestQueueFactory(
956956
queueMessageSerializer,
957957
dataStreamStore,
@@ -970,7 +970,7 @@ public async Task WhenUsingTheRedisQueue_ASimpleEchoServiceCanBeCalled(ClientAnd
970970
}
971971

972972
[Test]
973-
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, testWebSocket: false)]
973+
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, testWebSocket: false, pollingQueuesToTest: PollingQueuesToTest.RedisOnly)]
974974
public async Task WhenUsingTheRedisQueue_StreamsCanBeSentWithProgressReporting(ClientAndServiceTestCase clientAndServiceTestCase)
975975
{
976976
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
@@ -1005,7 +1005,7 @@ public async Task WhenUsingTheRedisQueue_StreamsCanBeSentWithProgressReporting(C
10051005
.Build())
10061006
.Build())
10071007
.Build())
1008-
.WithPendingRequestQueueFactory((queueMessageSerializer, logFactory) =>
1008+
.WithPendingRequestQueueFactory((_, queueMessageSerializer, logFactory) =>
10091009
new RedisPendingRequestQueueFactory(
10101010
queueMessageSerializer,
10111011
dataStreamStore,
@@ -1059,6 +1059,57 @@ await ShouldEventually.Eventually(() =>
10591059
currentProgress.Should().Be(100, "at the end of the clal the upload progress should be 100");
10601060
}
10611061
}
1062+
1063+
[Test]
1064+
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, testWebSocket: false, pollingQueuesToTest: PollingQueuesToTest.RedisOnly)]
1065+
public async Task WhenDataStreamsAreSentAndReceived_TheDisposablesInTheDataStreamStorageAreInvoked(ClientAndServiceTestCase clientAndServiceTestCase)
1066+
{
1067+
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
1068+
var redisTransport = new HalibutRedisTransport(redisFacade);
1069+
int disposablesCreated = 0;
1070+
int disposablesDisposed = 0;
1071+
var dataStreamStore = new WithDisposablesDataStreamStorage(() =>
1072+
{
1073+
Interlocked.Increment(ref disposablesCreated);
1074+
return new FuncAsyncDisposable(async () =>
1075+
{
1076+
Interlocked.Increment(ref disposablesDisposed);
1077+
await Task.CompletedTask;
1078+
});
1079+
});
1080+
1081+
Reference<PortForwarder> portForwarder = new Reference<PortForwarder>();
1082+
var aboutHalfTheDataStreamHasBeenSentWaiter = new AsyncManualResetEvent();
1083+
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
1084+
.WithStandardServices()
1085+
.AsLatestClientAndLatestServiceBuilder()
1086+
.WithPendingRequestQueueFactory((_, queueMessageSerializer, logFactory) =>
1087+
new RedisPendingRequestQueueFactory(
1088+
queueMessageSerializer,
1089+
dataStreamStore,
1090+
new RedisNeverLosesData(),
1091+
redisTransport,
1092+
new HalibutTimeoutsAndLimits(),
1093+
logFactory)
1094+
.WithWaitForReceiverToBeReady()
1095+
.WithQueueCreationCallBack(queue =>
1096+
{
1097+
// Lower these timeouts to get faster feedback on the upload progress.
1098+
queue.RequestReceiverNodeHeartBeatRate = TimeSpan.FromMilliseconds(100);
1099+
queue.TimeBetweenCheckingIfRequestWasCollected = TimeSpan.FromMilliseconds(100);
1100+
}))
1101+
.Build(CancellationToken))
1102+
{
1103+
1104+
var complexObjectService = clientAndService.CreateAsyncClient<IComplexObjectService, IAsyncClientComplexObjectService>();
1105+
var response = await complexObjectService.ProcessAsync(new ComplexObjectMultipleDataStreams(DataStream.FromString("Hello"), DataStream.FromString("World")));
1106+
(await response.Payload1!.ReadAsString(CancellationToken)).Should().Be("Hello");
1107+
(await response.Payload2!.ReadAsString(CancellationToken)).Should().Be("World");
1108+
1109+
disposablesCreated.Should().Be(4, "Since we send 2 data streams and receive 2 data streams.");
1110+
disposablesDisposed.Should().Be(4, "Since we send 2 data streams and receive 2 data streams.");
1111+
}
1112+
}
10621113

10631114
[Test]
10641115
public async Task CancellingARequestShouldResultInTheDequeuedResponseTokenBeingCancelled()

source/Halibut.Tests/Queue/Redis/Utils/InMemoryStoreDataStreamsForDistributedQueues.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,27 @@ namespace Halibut.Tests.Queue.Redis.Utils
1111
public class InMemoryStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues
1212
{
1313
readonly IDictionary<Guid, byte[]> dataStreamsStored = new Dictionary<Guid, byte[]>();
14-
public async Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
14+
public async Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, bool useReceiver, CancellationToken cancellationToken)
1515
{
1616
foreach (var dataStream in dataStreams)
1717
{
1818
using var memoryStream = new MemoryStream();
19-
await dataStream.WriteData(memoryStream, cancellationToken);
19+
if (useReceiver)
20+
{
21+
await dataStream.Receiver().ReadAsync(async (stream, ct) =>
22+
{
23+
#if NET8_0_OR_GREATER
24+
await stream.CopyToAsync(memoryStream, ct);
25+
#else
26+
await Task.CompletedTask;
27+
throw new NotImplementedException("Redis PRQ is not supported in net48");
28+
#endif
29+
}, cancellationToken);
30+
}
31+
else
32+
{
33+
await dataStream.WriteData(memoryStream, cancellationToken);
34+
}
2035
dataStreamsStored[dataStream.Id] = memoryStream.ToArray();
2136
}
2237

source/Halibut.Tests/Queue/Redis/Utils/JsonStoreDataStreamsForDistributedQueues.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,27 @@ namespace Halibut.Tests.Queue.Redis.Utils
1616
/// </summary>
1717
public class JsonStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues
1818
{
19-
public async Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
19+
public async Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, bool useReceiver, CancellationToken cancellationToken)
2020
{
2121
var dataStreamData = new Dictionary<Guid, string>();
2222

2323
foreach (var dataStream in dataStreams)
2424
{
2525
using var memoryStream = new MemoryStream();
26-
await dataStream.WriteData(memoryStream, cancellationToken);
26+
if (useReceiver)
27+
{
28+
#if NET8_0_OR_GREATER
29+
await dataStream.Receiver().ReadAsync(async (s, ct) => await s.CopyToAsync(memoryStream, ct), cancellationToken);
30+
#else
31+
await Task.CompletedTask;
32+
throw new NotImplementedException("Redis PRQ is not supported in net48");
33+
#endif
34+
}
35+
else
36+
{
37+
await dataStream.WriteData(memoryStream, cancellationToken);
38+
}
39+
2740
var bytes = memoryStream.ToArray();
2841
var base64Data = Convert.ToBase64String(bytes);
2942
dataStreamData[dataStream.Id] = base64Data;

0 commit comments

Comments
 (0)