diff --git a/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamReader.cs b/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamReader.cs index 8153adce1..1d35fdafd 100644 --- a/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamReader.cs +++ b/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamReader.cs @@ -58,7 +58,8 @@ async Task MoveNextAsync(ValueTask readStreamTask) if (_completed || _serverCallContext.CancellationToken.IsCancellationRequested) { - return Task.FromException(new InvalidOperationException("Can't read messages after the request is complete.")); + // gRPC specification indicates that MoveNext() should not throw. Simply return false. + return CommonGrpcProtocolHelpers.FalseTask; } var request = _serverCallContext.HttpContext.Request.BodyReader.ReadStreamMessageAsync(_serverCallContext, _deserializer, cancellationToken); diff --git a/test/FunctionalTests/Client/StreamingTests.cs b/test/FunctionalTests/Client/StreamingTests.cs index da3af701a..8ddeabee4 100644 --- a/test/FunctionalTests/Client/StreamingTests.cs +++ b/test/FunctionalTests/Client/StreamingTests.cs @@ -42,7 +42,7 @@ namespace Grpc.AspNetCore.FunctionalTests.Client public class StreamingTests : FunctionalTestBase { [Test] - public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Success() + public async Task DuplexStream_SendLargeFileBatchedAndReceiveLargeFileBatched_Success() { // Arrange var data = CreateTestData(1024 * 1024 * 1); // 1 MB @@ -306,7 +306,7 @@ await call.RequestStream.WriteAsync(new DataMessage [TestCase(1)] [TestCase(5)] [TestCase(20)] - public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(int tasks) + public async Task DuplexStreaming_SimultaneousSendAndReceiveInParallel_Success(int tasks) { // Arrange const int total = 1024 * 1024 * 1; @@ -316,7 +316,7 @@ public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(i var client = new StreamService.StreamServiceClient(Channel); - await TestHelpers.RunParallel(tasks, async taskIndex => + await TestHelpers.RunParallel(tasks, async _ => { var (sent, received) = await EchoData(total, data, client).DefaultTimeout(); @@ -421,7 +421,7 @@ await TestHelpers.AssertIsTrueRetryAsync( [Test] public async Task DuplexStreaming_ParallelCallsFromOneChannel_Success() { - async Task UnaryDeadlineExceeded(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + static async Task UnaryDeadlineExceeded(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { await foreach (var message in requestStream.ReadAllAsync()) { @@ -460,7 +460,7 @@ async Task UnaryDeadlineExceeded(IAsyncStreamReader requestStream, [Test] public async Task ServerStreaming_GetTrailersAndStatus_Success() { - async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter responseStream, ServerCallContext context) + static async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter responseStream, ServerCallContext context) { await responseStream.WriteAsync(new DataMessage()); context.ResponseTrailers.Add("my-trailer", "value"); @@ -625,7 +625,7 @@ async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter< [TestCase(true)] [TestCase(false)] - public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeExit) + public async Task ClientStreaming_ReadAfterMethodComplete_False(bool readBeforeExit) { SetExpectedErrorsFilter(writeContext => { @@ -641,7 +641,7 @@ public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeE var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var readTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var syncPoint = new SyncPoint(runContinuationsAsynchronously: true); - async Task ClientStreamingWithTrailers(IAsyncStreamReader requestStream, ServerCallContext context) + async Task ClientStreamingWithTrailersAsync(IAsyncStreamReader requestStream, ServerCallContext context) { var readTask = Task.Run(async () => { @@ -661,7 +661,7 @@ async Task ClientStreamingWithTrailers(IAsyncStreamReader(ClientStreamingWithTrailers); + var method = Fixture.DynamicGrpc.AddClientStreamingMethod(ClientStreamingWithTrailersAsync); var channel = CreateChannel(); @@ -680,13 +680,13 @@ async Task ClientStreamingWithTrailers(IAsyncStreamReader(() => readTask).DefaultTimeout(); - Assert.AreEqual("Can't read messages after the request is complete.", ex.Message); + await readTask.DefaultTimeout(); var clientException = await ExceptionAssert.ThrowsAsync(() => call.RequestStream.WriteAsync(new DataMessage())).DefaultTimeout(); Assert.AreEqual(StatusCode.OK, clientException.StatusCode); @@ -694,7 +694,7 @@ async Task ClientStreamingWithTrailers(IAsyncStreamReader { @@ -759,8 +759,7 @@ async Task ClientStreamingWithTrailers(IAsyncStreamReader(() => readTask).DefaultTimeout(); - Assert.AreEqual("Can't read messages after the request is complete.", serverException.Message); + await readTask; // Ensure the server abort reaches the client await Task.Delay(100);