diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift index acc3aff5e7..8e90a7dfba 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift @@ -77,6 +77,22 @@ public protocol NIOAsyncSequenceProducerDelegate: Sendable { /// /// - Note: This is guaranteed to be called _exactly_ once. func didTerminate() + + /// This method is called once the ``NIOAsyncSequenceProducer`` is terminated. + /// + /// Termination happens if: + /// - The ``NIOAsyncSequenceProducer/AsyncIterator`` is deinited. + /// - The ``NIOAsyncSequenceProducer`` deinited and no iterator is alive. + /// - The consuming `Task` is cancelled (e.g. `for await let element in`). + /// - The source finished and all remaining buffered elements have been consumed. + /// + /// - Note: This is guaranteed to be called _exactly_ once. + func didTerminate(remainingBuffer: some Collection) +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncSequenceProducerDelegate { + public func didTerminate(remainingBuffer: some Collection) { self.didTerminate() } } /// This is an `AsyncSequence` that supports a unicast `AsyncIterator`. diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index b807f6f407..8102846b29 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -448,21 +448,21 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable internal func sequenceDeinitialized() { - let delegate: Delegate? = self._state.withLockedValue { + let (delegate, buffer): (Delegate?, Deque) = self._state.withLockedValue { let action = $0.stateMachine.sequenceDeinitialized() switch action { - case .callDidTerminate: + case .callDidTerminate(let buffer): let delegate = $0.delegate $0.delegate = nil - return delegate + return (delegate, buffer) case .none: - return nil + return (nil, .init()) } } - delegate?.didTerminate() + delegate?.didTerminate(remainingBuffer: buffer.map { $0 }) } @inlinable @@ -474,22 +474,22 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable internal func iteratorDeinitialized() { - let delegate: Delegate? = self._state.withLockedValue { + let (delegate, buffer): (Delegate?, Deque) = self._state.withLockedValue { let action = $0.stateMachine.iteratorDeinitialized() switch action { - case .callDidTerminate: + case .callDidTerminate(let buffer): let delegate = $0.delegate $0.delegate = nil - return delegate + return (delegate, buffer) case .none: - return nil + return (nil, .init()) } } - delegate?.didTerminate() + delegate?.didTerminate(remainingBuffer: buffer.map { $0 } ) } @inlinable @@ -557,7 +557,8 @@ extension NIOThrowingAsyncSequenceProducer { break } - delegate?.didTerminate() + // We don't have a buffer in this case + delegate?.didTerminate(remainingBuffer: []) } @inlinable @@ -585,7 +586,7 @@ extension NIOThrowingAsyncSequenceProducer { return element - case .returnFailureAndCallDidTerminate(let failure): + case .returnFailureAndCallDidTerminate(let failure, let buffer): let delegate = unsafe.withValueAssumingLockIsAcquired { let delegate = $0.delegate $0.delegate = nil @@ -593,7 +594,7 @@ extension NIOThrowingAsyncSequenceProducer { } unsafe.unlock() - delegate?.didTerminate() + delegate?.didTerminate(remainingBuffer: buffer.map { $0 }) switch failure { case .some(let error): @@ -679,10 +680,10 @@ extension NIOThrowingAsyncSequenceProducer { } switch action { - case .callDidTerminate: - break + case .callDidTerminate(let buffer): + delegate?.didTerminate(remainingBuffer: buffer.map { $0 }) - case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation): + case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation, let buffer): // We have deprecated the generic Failure type in the public API and Failure should // now be `Swift.Error`. However, if users have not migrated to the new API they could // still use a custom generic Error type and this cast might fail. @@ -696,12 +697,12 @@ extension NIOThrowingAsyncSequenceProducer { } else { continuation.resume(returning: nil) } + + delegate?.didTerminate(remainingBuffer: buffer.map { $0 }) case .none: - break + delegate?.didTerminate() } - - delegate?.didTerminate() } } } @@ -781,7 +782,7 @@ extension NIOThrowingAsyncSequenceProducer { @usableFromInline enum SequenceDeinitializedAction { /// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case callDidTerminate + case callDidTerminate(buffer: Deque) /// Indicates that nothing should be done. case none } @@ -790,13 +791,14 @@ extension NIOThrowingAsyncSequenceProducer { mutating func sequenceDeinitialized() -> SequenceDeinitializedAction { switch self._state { case .initial(_, iteratorInitialized: false), - .streaming(_, _, _, _, iteratorInitialized: false), - .sourceFinished(_, iteratorInitialized: false, _), .cancelled(iteratorInitialized: false): // No iterator was created so we can transition to finished right away. self._state = .finished(iteratorInitialized: false) - - return .callDidTerminate + return .callDidTerminate(buffer: .init()) + + case .streaming(_, let buffer, _, _, iteratorInitialized: false), + .sourceFinished(let buffer, iteratorInitialized: false, _): + return .callDidTerminate(buffer: buffer) case .initial(_, iteratorInitialized: true), .streaming(_, _, _, _, iteratorInitialized: true), @@ -871,7 +873,7 @@ extension NIOThrowingAsyncSequenceProducer { @usableFromInline enum IteratorDeinitializedAction { /// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case callDidTerminate + case callDidTerminate(buffer: Deque) /// Indicates that nothing should be done. case none } @@ -887,14 +889,17 @@ extension NIOThrowingAsyncSequenceProducer { preconditionFailure("Internal inconsistency") case .initial(_, iteratorInitialized: true), - .streaming(_, _, _, _, iteratorInitialized: true), - .sourceFinished(_, iteratorInitialized: true, _), .cancelled(iteratorInitialized: true): // An iterator was created and deinited. Since we only support // a single iterator we can now transition to finish and inform the delegate. self._state = .finished(iteratorInitialized: true) - return .callDidTerminate + return .callDidTerminate(buffer: .init()) + + case .streaming(_, let buffer, _, _, iteratorInitialized: true), + .sourceFinished(let buffer, iteratorInitialized: true, _): + + return .callDidTerminate(buffer: buffer) case .finished: // We are already finished so there is nothing left to clean up. @@ -1091,10 +1096,10 @@ extension NIOThrowingAsyncSequenceProducer { @usableFromInline enum CancelledAction { /// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case callDidTerminate + case callDidTerminate(buffer: Deque) /// Indicates that the continuation should be resumed with a `CancellationError` and /// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation) + case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation, buffer: Deque) /// Indicates that nothing should be done. case none } @@ -1125,20 +1130,20 @@ extension NIOThrowingAsyncSequenceProducer { return .none - case .streaming(_, _, .some(let continuation), _, let iteratorInitialized): + case .streaming(_, let buffer, .some(let continuation), _, let iteratorInitialized): // We have an outstanding continuation that needs to resumed // and we can transition to finished here and inform the delegate self._state = .finished(iteratorInitialized: iteratorInitialized) - return .resumeContinuationWithCancellationErrorAndCallDidTerminate(continuation) + return .resumeContinuationWithCancellationErrorAndCallDidTerminate(continuation, buffer: buffer) - case .streaming(_, _, continuation: .none, _, let iteratorInitialized): + case .streaming(_, let buffer, continuation: .none, _, let iteratorInitialized): // We may have elements in the buffer, which is why we have no continuation // waiting. We must store the cancellation error to hand it out on the next // next() call. self._state = .cancelled(iteratorInitialized: iteratorInitialized) - return .callDidTerminate + return .callDidTerminate(buffer: buffer) case .cancelled, .sourceFinished, .finished: // If the source has finished, finishing again has no effect. @@ -1159,7 +1164,7 @@ extension NIOThrowingAsyncSequenceProducer { case returnElementAndCallProduceMore(Element) /// Indicates that the `Failure` should be returned to the caller and /// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case returnFailureAndCallDidTerminate(Failure?) + case returnFailureAndCallDidTerminate(Failure?, buffer: Deque) /// Indicates that the next call to AsyncSequence got cancelled case returnCancellationError /// Indicates that the `nil` should be returned to the caller. @@ -1248,7 +1253,7 @@ extension NIOThrowingAsyncSequenceProducer { // We are returning the queued failure now and can transition to finished self._state = .finished(iteratorInitialized: iteratorInitialized) - return .returnFailureAndCallDidTerminate(failure) + return .returnFailureAndCallDidTerminate(failure, buffer: buffer) } case .cancelled(let iteratorInitialized): diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift index 2def03562a..534978d641 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift @@ -53,9 +53,9 @@ final class MockNIOElementStreamBackPressureStrategy: NIOAsyncSequenceProducerBa @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) final class MockNIOBackPressuredStreamSourceDelegate: NIOAsyncSequenceProducerDelegate, @unchecked Sendable { - enum Event { + enum Event: Hashable { case produceMore - case didTerminate + case didTerminate(buffer: [Int]) } let events: AsyncStream private let eventsContinuation: AsyncStream.Continuation @@ -74,13 +74,17 @@ final class MockNIOBackPressuredStreamSourceDelegate: NIOAsyncSequenceProducerDe } } - var didTerminateHandler: (() -> Void)? - func didTerminate() { - self.eventsContinuation.yield(.didTerminate) + var didTerminateHandler: (([Int]) -> Void)? + func didTerminate(remainingBuffer: some Collection) { + self.eventsContinuation.yield(.didTerminate(buffer: remainingBuffer.map { $0 as! Int })) if let didTerminateHandler = self.didTerminateHandler { - return didTerminateHandler() + return didTerminateHandler(remainingBuffer.map { $0 as! Int }) } } + + func didTerminate() { + fatalError() + } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -182,7 +186,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertEqualWithoutAutoclosure(await iterator.next(), 8) source.finish() XCTAssertEqualWithoutAutoclosure(await iterator.next(), nil) - XCTAssertEqualWithoutAutoclosure(await eventsIterator.next(), .didTerminate) + XCTAssertEqualWithoutAutoclosure(await eventsIterator.next(), .didTerminate(buffer: [])) } // MARK: - Yield @@ -324,7 +328,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { self.source.finish() XCTAssertEqualWithoutAutoclosure(await element, nil) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws { @@ -334,7 +338,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let element = await self.sequence.first { _ in true } XCTAssertNil(element) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { @@ -345,7 +349,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let element = await self.sequence.first { _ in true } XCTAssertEqual(element, 1) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenFinished() async throws { @@ -353,7 +357,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { _ = await self.sequence.first { _ in true } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) self.source.finish() } @@ -415,7 +419,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(element, nil) XCTAssertNil(source) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { @@ -447,7 +451,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { } XCTAssertNil(element) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { @@ -480,7 +484,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(element, 1) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } // MARK: - Task cancel @@ -500,9 +504,30 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { task.cancel() let value = await task.value - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) XCTAssertNil(value) } + + func testTaskCancel_whenYielded_andSuspended() async throws { + let sequence = try XCTUnwrap(self.sequence) + + let suspended = expectation(description: "task suspended") + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } + + let task: Task = Task { + let iterator = sequence.makeAsyncIterator() + return await iterator.next() + } + + await fulfillment(of: [suspended], timeout: 1) + + _ = self.source.yield(1) + _ = self.source.yield(2) + task.cancel() + let value = await task.value + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [2])]) + XCTAssertEqual(value, 1) + } func testTaskCancel_whenStreaming_andNotSuspended() async throws { let sequence = try XCTUnwrap(self.sequence) @@ -531,7 +556,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { cancelled.fulfill() let value = await task.value - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) XCTAssertEqual(value, 1) } @@ -550,7 +575,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { self.source.finish() - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) task.cancel() let value = await task.value XCTAssertNil(value) @@ -701,7 +726,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { func testSequenceDeinitialized() async { self.sequence = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSequenceDeinitialized_whenIteratorReferenced() async { @@ -711,7 +736,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } // MARK: - IteratorDeinitialized @@ -721,7 +746,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) self.sequence = nil } @@ -734,7 +759,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testIteratorDeinitialized_whenStreaming() async { @@ -744,7 +769,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [1])]) } } diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index 888361a82e..9d4765d997 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -271,7 +271,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { } XCTAssertEqual(element, nil) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws { @@ -289,7 +289,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { } XCTAssertNil(element) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { @@ -308,7 +308,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(element, 1) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinish_whenFinished() async throws { @@ -316,7 +316,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { _ = try await self.sequence.first { _ in true } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) self.source.finish() } @@ -330,7 +330,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(error as? ChannelError, .alreadyClosed) } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinishError_whenStreaming_andSuspended() async throws { @@ -355,7 +355,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(error as? ChannelError, .alreadyClosed) } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinishError_whenStreaming_andNotSuspended_andBufferEmpty() async throws { @@ -376,7 +376,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(error as? ChannelError, .alreadyClosed) } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinishError_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { @@ -397,7 +397,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { } XCTAssertEqual(elements, [1]) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testFinishError_whenFinished() async throws { @@ -405,7 +405,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let iterator = self.sequence.makeAsyncIterator() _ = try await iterator.next() - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) self.source.finish(ChannelError.alreadyClosed) @@ -474,7 +474,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(element, nil) XCTAssertNil(source) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { @@ -507,7 +507,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { } XCTAssertNil(element) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { @@ -541,7 +541,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(element, 1) - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } // MARK: - Task cancel @@ -561,7 +561,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { task.cancel() let result = await task.result - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) await XCTAssertThrowsError(try result.get()) { error in XCTAssertTrue(error is CancellationError) } @@ -592,7 +592,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { task.cancel() let result = await task.result - XCTAssertEqualWithoutAutoclosure(await delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) try withExtendedLifetime(new.source) { XCTAssertNil(try result.get()) @@ -625,7 +625,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { cancelled.fulfill() let value = try await task.value - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) XCTAssertEqual(value, 1) } @@ -643,7 +643,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await fulfillment(of: [suspended], timeout: 1) self.source.finish() - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) task.cancel() let value = try await task.value XCTAssertNil(value) @@ -831,7 +831,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { func testSequenceDeinitialized() async { self.sequence = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testSequenceDeinitialized_whenIteratorReferenced() async { @@ -841,7 +841,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } // MARK: - IteratorDeinitialized @@ -851,7 +851,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) self.sequence = nil } @@ -864,7 +864,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [])]) } func testIteratorDeinitialized_whenStreaming() async { @@ -874,7 +874,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertNotNil(iterator) iterator = nil - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [1])]) } func testIteratorThrows_whenCancelled() async { @@ -905,7 +905,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { group.cancelAll() } - XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate]) + XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate(buffer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100])]) } }