diff --git a/Sources/StreamVideo/StreamVideo.swift b/Sources/StreamVideo/StreamVideo.swift index 93482f449..60e4a24de 100644 --- a/Sources/StreamVideo/StreamVideo.swift +++ b/Sources/StreamVideo/StreamVideo.swift @@ -530,14 +530,11 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { log.debug("Listening for WS connection") do { - var cancellable: AnyCancellable? log.debug("Listening for WS connection") _ = try await DefaultTimer .publish(every: 0.1) .filter { [weak webSocketClient] _ in webSocketClient?.connectionState.isConnected == true } - .nextValue(timeout: 30) { cancellable = $0 } - cancellable?.cancel() - cancellable = nil + .nextValue(timeout: 30) } catch { log.debug("Timeout while waiting for WS connection opening") throw ClientError.NetworkError() @@ -583,21 +580,16 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { return "" } - var cancellable: AnyCancellable? do { let result = try await DefaultTimer .publish(every: 0.1) .log(.debug) { _ in "Waiting for connection id" } .compactMap { [weak self] _ in self?.loadConnectionIdFromHealthcheck() } - .nextValue(timeout: 5) { cancellable = $0 } + .nextValue(timeout: 5) defer { log.debug("ConnectionId loaded: \(result)") } - cancellable?.cancel() - cancellable = nil return result } catch { log.warning("Unable to load connectionId.") - cancellable?.cancel() - cancellable = nil return "" } } diff --git a/Sources/StreamVideo/Utils/Extensions/Combine/Publisher+AsyncStream.swift b/Sources/StreamVideo/Utils/Extensions/Combine/Publisher+AsyncStream.swift index 20e2888c4..62b917baa 100644 --- a/Sources/StreamVideo/Utils/Extensions/Combine/Publisher+AsyncStream.swift +++ b/Sources/StreamVideo/Utils/Extensions/Combine/Publisher+AsyncStream.swift @@ -3,6 +3,7 @@ // import Combine +import Foundation public extension Publisher where Output: Sendable { @@ -28,4 +29,61 @@ public extension Publisher where Output: Sendable { } } } + + func firstValue( + file: StaticString = #file, + line: UInt = #line + ) async throws -> Output { + if #available(iOS 15.0, *) { + for try await value in self.values { + return value + } + } else { + for try await value in eraseAsAsyncStream() { + return value + } + } + + throw ClientError("Task produced no value.", file, line) + } + + func firstValue( + timeoutInSeconds: TimeInterval, + file: StaticString = #file, + function: StaticString = #function, + line: UInt = #line + ) async throws -> Output { + nonisolated(unsafe) let selfReference = self + return try await Task( + timeoutInSeconds: timeoutInSeconds, + file: file, + function: function, + line: line + ) { + try await selfReference.firstValue(file: file, line: line) + }.value + } + + func nextValue( + dropFirst: Int = 0, + timeout: TimeInterval? = nil, + file: StaticString = #fileID, + function: StaticString = #function, + line: UInt = #line + ) async throws -> Output { + let publisher = dropFirst > 0 + ? self.dropFirst(dropFirst).eraseToAnyPublisher() + : eraseToAnyPublisher() + + if let timeout { + return try await publisher.firstValue( + timeoutInSeconds: timeout, + file: file, + function: function, + line: line + ) + } else { + return try await publisher.firstValue() + } + } } diff --git a/Sources/StreamVideo/Utils/StateMachine/Publisher+NextValue.swift b/Sources/StreamVideo/Utils/StateMachine/Publisher+NextValue.swift deleted file mode 100644 index f4af78c26..000000000 --- a/Sources/StreamVideo/Utils/StateMachine/Publisher+NextValue.swift +++ /dev/null @@ -1,82 +0,0 @@ -// -// Copyright © 2025 Stream.io Inc. All rights reserved. -// - -import Combine -import Foundation - -extension Publisher where Output: Sendable { - - /// Retrieves the next value from the publisher after optionally skipping the initial values. - /// - /// - Parameter dropFirst: The number of initial values to skip. Defaults to 0. - /// - Returns: The next value emitted by the publisher. - /// - Throws: An error if the publisher completes with a failure. - /// - /// - Important: When subscribing to a timer use the registrationHandler to receive the reference - /// to the cancellable, so you can effectively cancel it. Otherwise the Timer will keep posting updates - func nextValue( - dropFirst: Int = 0, - timeout: TimeInterval? = nil, - registrationHandler: ((AnyCancellable) -> Void)? = nil, - file: StaticString = #fileID, - function: StaticString = #function, - line: UInt = #line - ) async throws -> Output { - try await withCheckedThrowingContinuation { continuation in - var cancellable: AnyCancellable? - var receivedValue = false - var timeoutWorkItem: DispatchWorkItem? - - if let timeout = timeout { - let workItem = DispatchWorkItem { - cancellable?.cancel() - continuation.resume( - throwing: ClientError("Operation timed out", file, line) - ) - } - timeoutWorkItem = workItem - DispatchQueue - .global() - .asyncAfter(deadline: .now() + timeout, execute: workItem) - } - - let publisher = dropFirst > 0 - ? self.dropFirst(dropFirst).eraseToAnyPublisher() - : self.eraseToAnyPublisher() - - let _cancellable = publisher - .sink( - receiveCompletion: { completion in - timeoutWorkItem?.cancel() - switch completion { - case .finished: - if !receivedValue { - continuation - .resume( - throwing: ClientError( - "Publisher completed with no value", - file, - line - ) - ) - } - case let .failure(error): - if !receivedValue { - continuation.resume(throwing: error) - } - } - cancellable?.cancel() - }, - receiveValue: { value in - timeoutWorkItem?.cancel() - guard !receivedValue else { return } - receivedValue = true - continuation.resume(returning: value) - } - ) - cancellable = _cancellable - registrationHandler?(_cancellable) - } - } -} diff --git a/Sources/StreamVideo/WebRTC/v2/Extensions/Foundation/Task+Timeout.swift b/Sources/StreamVideo/WebRTC/v2/Extensions/Foundation/Task+Timeout.swift index 68b882c34..f86ed104b 100644 --- a/Sources/StreamVideo/WebRTC/v2/Extensions/Foundation/Task+Timeout.swift +++ b/Sources/StreamVideo/WebRTC/v2/Extensions/Foundation/Task+Timeout.swift @@ -5,47 +5,45 @@ import Foundation /// An extension to the `Task` type that adds timeout functionality. -extension Task where Failure == Error { - /// An error type representing a timeout condition. - enum TimeoutError: Error { - /// Indicates that the operation has timed out. - case timedOut - } - - /// Initializes a new task with a timeout. - /// - /// This initializer creates a new task that will execute the given operation - /// with a specified timeout. If the operation doesn't complete within the - /// timeout period, a `TimeoutError` will be thrown. - /// - /// - Parameters: - /// - timeout: The maximum duration (in seconds) to wait for the operation to complete. - /// - operation: The asynchronous operation to perform. - /// - /// - Returns: A new `Task` instance that will execute the operation with the specified timeout. - /// - /// - Throws: `TimeoutError.timedOut` if the operation doesn't complete within the specified timeout. - /// - /// - Note: This implementation uses a task group to manage concurrent execution - /// of the main operation and the timeout timer. +extension Task where Failure == any Error { + @discardableResult init( - timeout: TimeInterval, + priority: TaskPriority? = nil, + /// New: a timeout property to configure how long a task may perform before failing with a timeout error. + timeoutInSeconds: TimeInterval, + file: StaticString = #fileID, + function: StaticString = #function, + line: UInt = #line, operation: @Sendable @escaping () async throws -> Success ) { - self.init { + self = Task(priority: priority) { try await withThrowingTaskGroup(of: Success.self) { group in - group.addTask { + + /// Add the operation to perform as the first task. + _ = group.addTaskUnlessCancelled { try await operation() } - group.addTask { - try await Task.sleep( - nanoseconds: UInt64( - timeout * 1_000_000_000 - ) - ) - throw TimeoutError.timedOut + + if timeoutInSeconds > 0, timeoutInSeconds <= TimeInterval(UInt64.max) { + /// Add another task to trigger the timeout if it finishes earlier than our first task. + _ = group.addTaskUnlessCancelled { () -> Success in + try await Task.sleep(nanoseconds: UInt64(timeoutInSeconds * 1_000_000_000)) + throw ClientError("Operation timed out", file, line) + } + } else { + log.warning("Invalid timeout:\(timeoutInSeconds) was passed to Task.timeout. Task will timeout immediately.") + throw ClientError("Operation timed out", file, line) } - let result = try await group.next()! + + /// We need to deal with an optional, even though we know it's not optional. + /// This is default for task groups to account for when there aren't any pending tasks. + /// Awaiting on an empty group immediately returns 'nil' without suspending. + guard let result = try await group.next() else { + throw ClientError("Task produced no value", file, line) + } + + /// If we reach this, it means we have a value before the timeout. + /// We cancel the group, which means just cancelling the timeout task. group.cancelAll() return result } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift index 3db163968..01c3e1b0d 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift @@ -124,15 +124,12 @@ extension WebRTCCoordinator.StateMachine.Stage { /// will be delivered on the next batch of traces. statsAdapter?.sfuAdapter = nil - var cancellable: AnyCancellable? /// We add a small delay of 100ms in oder to ensure that the internet connection state /// has been updated, so that when we start observing it will receive the latest and /// updated value. _ = try? await DefaultTimer .publish(every: ScreenPropertiesAdapter.currentValue.refreshRate) - .nextValue { cancellable = $0 } - cancellable?.cancel() - cancellable = nil + .nextValue() statsAdapter?.sfuAdapter = nil diff --git a/StreamVideo.xcodeproj/project.pbxproj b/StreamVideo.xcodeproj/project.pbxproj index 424b785d9..0e72a6310 100644 --- a/StreamVideo.xcodeproj/project.pbxproj +++ b/StreamVideo.xcodeproj/project.pbxproj @@ -857,7 +857,6 @@ 40FB151B2BF77EEE00D5E580 /* Call+JoiningStage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151A2BF77EEE00D5E580 /* Call+JoiningStage.swift */; }; 40FB151D2BF77EFA00D5E580 /* Call+JoinedStage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151C2BF77EFA00D5E580 /* Call+JoinedStage.swift */; }; 40FB151F2BF78C6A00D5E580 /* Call+Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151E2BF78C6A00D5E580 /* Call+Error.swift */; }; - 40FB15212BF78FA100D5E580 /* Publisher+NextValue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */; }; 40FB8FF62D661DC400F4390A /* Call+Identifiable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40F445E72A9E2824004BE3DA /* Call+Identifiable.swift */; }; 40FB8FF82D661E2000F4390A /* String+OpenApiExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB8FF72D661E2000F4390A /* String+OpenApiExtensions.swift */; }; 40FB8FFA2D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB8FF92D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift */; }; @@ -2383,7 +2382,6 @@ 40FB151A2BF77EEE00D5E580 /* Call+JoiningStage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+JoiningStage.swift"; sourceTree = ""; }; 40FB151C2BF77EFA00D5E580 /* Call+JoinedStage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+JoinedStage.swift"; sourceTree = ""; }; 40FB151E2BF78C6A00D5E580 /* Call+Error.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+Error.swift"; sourceTree = ""; }; - 40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Publisher+NextValue.swift"; sourceTree = ""; }; 40FB8FF72D661E2000F4390A /* String+OpenApiExtensions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "String+OpenApiExtensions.swift"; sourceTree = ""; }; 40FB8FF92D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "CustomStringConvertible+Retroactive.swift"; sourceTree = ""; }; 40FB95CF2DAD6FE600B24BC7 /* PublisherAsyncStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PublisherAsyncStreamTests.swift; sourceTree = ""; }; @@ -5560,7 +5558,6 @@ children = ( 40FB150E2BF77CEC00D5E580 /* StreamStateMachine.swift */, 40FB15102BF77D5800D5E580 /* StreamStateMachineStage.swift */, - 40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */, ); path = StateMachine; sourceTree = ""; @@ -7902,7 +7899,6 @@ 84A7E1B02883E73100526C98 /* EventBatcher.swift in Sources */, 40BBC4DE2C63A507002AEF92 /* WebRTCCoordinator+CleanUp.swift in Sources */, 84CD12222C73831000056640 /* CallRtmpBroadcastStoppedEvent.swift in Sources */, - 40FB15212BF78FA100D5E580 /* Publisher+NextValue.swift in Sources */, 40F646242C7F225200FFB10A /* CollectionDelayedUpdateObserver.swift in Sources */, 40382F282C88B80C00C2D00F /* SignalServerEvent.swift in Sources */, 84BAD77C2A6BFF4300733156 /* BroadcastBufferReaderConnection.swift in Sources */, diff --git a/StreamVideoTests/Mock/MockRTCAudioStore.swift b/StreamVideoTests/Mock/MockRTCAudioStore.swift index 8f946f6c7..1f3d78ba2 100644 --- a/StreamVideoTests/Mock/MockRTCAudioStore.swift +++ b/StreamVideoTests/Mock/MockRTCAudioStore.swift @@ -16,6 +16,10 @@ final class MockRTCAudioStore { audioStore = RTCAudioStore(session: session) } + deinit { + InjectedValues[\.audioStore] = .init() + } + /// We call this just before the object that needs to use the mock is about to be created. func makeShared() { RTCAudioStore.currentValue = audioStore diff --git a/StreamVideoTests/Utils/Extensions/Combine/PublisherAsyncStreamTests.swift b/StreamVideoTests/Utils/Extensions/Combine/PublisherAsyncStreamTests.swift index 3bc3ae730..06c73a592 100644 --- a/StreamVideoTests/Utils/Extensions/Combine/PublisherAsyncStreamTests.swift +++ b/StreamVideoTests/Utils/Extensions/Combine/PublisherAsyncStreamTests.swift @@ -6,21 +6,37 @@ import Combine @testable import StreamVideo import XCTest +/// Comprehensive tests for Publisher+AsyncStream extension. +/// +/// Tests cover: +/// - AsyncStream conversion +/// - firstValue functionality +/// - nextValue functionality +/// - Timeout behavior +/// - Error handling +/// - Concurrency scenarios final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { private final class ReceivedValuesStorage: @unchecked Sendable { - private var values: [Element] = [] + @Atomic private var values: [Element] = [] func append(_ element: Element) { values.append(element) } - var count: Int { values.count } + var count: Int { + values.count + } - var array: [Element] { values } + var array: [Element] { + values + } } - - func testPublisherToAsyncStream() async { + + // MARK: - eraseAsAsyncStream Tests + + /// Tests basic conversion from Publisher to AsyncStream. + func test_eraseAsAsyncStream_convertsPublisher() async { // Given let storage = ReceivedValuesStorage() let publisher = PassthroughSubject() @@ -34,6 +50,7 @@ final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { storage.append(value) if storage.count == 3 { expectation.fulfill() + break } } } @@ -47,7 +64,8 @@ final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { XCTAssertEqual(storage.array, [1, 2, 3]) } - func testPublisherToAsyncStreamWithCompletion() async { + /// Tests AsyncStream properly completes when publisher completes. + func test_eraseAsAsyncStream_completesWithPublisher() async { // Given let storage = ReceivedValuesStorage() let publisher = PassthroughSubject() @@ -72,7 +90,8 @@ final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { XCTAssertEqual(storage.array, [1, 2]) } - func testPublisherToAsyncStreamWithCancellation() async { + /// Tests AsyncStream cancellation properly cancels the subscription. + func test_eraseAsAsyncStream_cancellationCancelsSubscription() async { // Given let storage = ReceivedValuesStorage() let publisher = PassthroughSubject() @@ -84,6 +103,9 @@ final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { let task = Task { for await value in asyncStream { storage.append(value) + if storage.count == 2 { + break + } } expectation.fulfill() } @@ -92,8 +114,295 @@ final class PublisherAsyncStreamTests: XCTestCase, @unchecked Sendable { publisher.send(1) publisher.send(2) task.cancel() + publisher.send(3) // Should not be received await fulfillment(of: [expectation], timeout: 1.0) XCTAssertEqual(storage.array, [1, 2]) } + + /// Tests multiple AsyncStreams from the same publisher. + func test_eraseAsAsyncStream_multipleStreams() async { + // Given + let storage1 = ReceivedValuesStorage() + let storage2 = ReceivedValuesStorage() + let publisher = PassthroughSubject() + + // When + let expectation1 = XCTestExpectation(description: "Stream 1 receives values") + let expectation2 = XCTestExpectation(description: "Stream 2 receives values") + + Task { + for await value in publisher.eraseAsAsyncStream() { + storage1.append(value) + if storage1.count == 3 { + expectation1.fulfill() + break + } + } + } + + Task { + for await value in publisher.eraseAsAsyncStream() { + storage2.append(value) + if storage2.count == 3 { + expectation2.fulfill() + break + } + } + } + + // Allow tasks to start + try? await Task.sleep(nanoseconds: 100_000_000) + + // Then + publisher.send(1) + publisher.send(2) + publisher.send(3) + + await fulfillment(of: [expectation1, expectation2], timeout: 1.0) + XCTAssertEqual(storage1.array, [1, 2, 3]) + XCTAssertEqual(storage2.array, [1, 2, 3]) + } + + // MARK: - firstValue Tests + + /// Tests firstValue returns the first emitted value. + func test_firstValue_returnsFirstEmittedValue() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 100_000_000) + publisher.send("first") + publisher.send("second") + publisher.send("third") + } + + let result = try await publisher.firstValue() + + // Then + XCTAssertEqual(result, "first") + } + + /// Tests firstValue with immediate value. + func test_firstValue_withImmediateValue() async throws { + // Given + let publisher = Just("immediate") + + // When + let result = try await publisher.firstValue() + + // Then + XCTAssertEqual(result, "immediate") + } + + /// Tests firstValue throws when publisher completes without value. + func test_firstValue_throwsWhenNoValue() async { + // Given + let publisher = Empty() + + // When/Then + do { + _ = try await publisher.firstValue() + XCTFail("Should throw when no value is produced") + } catch { + // Expected error + XCTAssertTrue(error is ClientError) + } + } + + /// Tests firstValue with timeout succeeds within timeout. + func test_firstValue_withTimeout_succeeds() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds + publisher.send(42) + } + + let result = try await publisher.firstValue(timeoutInSeconds: 1.0) + + // Then + XCTAssertEqual(result, 42) + } + + /// Tests firstValue with timeout throws on timeout. + func test_firstValue_withTimeout_timesOut() async { + // Given + let publisher = PassthroughSubject() + + // When/Then + do { + _ = try await publisher.firstValue(timeoutInSeconds: 0.1) + XCTFail("Should timeout") + } catch { + // Expected timeout error + XCTAssertTrue(error is ClientError) + } + } + + // MARK: - nextValue Tests + + /// Tests nextValue returns the next value. + func test_nextValue_returnsNextValue() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 100_000_000) + publisher.send(1) + } + + let result = try await publisher.nextValue() + + // Then + XCTAssertEqual(result, 1) + } + + /// Tests nextValue with dropFirst skips values. + func test_nextValue_withDropFirst() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 100_000_000) + publisher.send(1) + publisher.send(2) + publisher.send(3) + } + + let result = try await publisher.nextValue(dropFirst: 2) + + // Then + XCTAssertEqual(result, 3) + } + + /// Tests nextValue with timeout. + func test_nextValue_withTimeout() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 200_000_000) // 0.2 seconds + publisher.send("value") + } + + let result = try await publisher.nextValue(timeout: 1.0) + + // Then + XCTAssertEqual(result, "value") + } + + /// Tests nextValue timeout failure. + func test_nextValue_timeoutFailure() async { + // Given + let publisher = PassthroughSubject() + + // When/Then + do { + _ = try await publisher.nextValue(timeout: 0.1) + XCTFail("Should timeout") + } catch { + // Expected timeout + XCTAssertTrue(error is ClientError) + } + } + + /// Tests nextValue with dropFirst and timeout. + func test_nextValue_withDropFirstAndTimeout() async throws { + // Given + let publisher = PassthroughSubject() + + // When + Task { + try? await Task.sleep(nanoseconds: 50_000_000) + publisher.send(1) + publisher.send(2) + try? await Task.sleep(nanoseconds: 50_000_000) + publisher.send(3) + } + + let result = try await publisher.nextValue(dropFirst: 2, timeout: 1.0) + + // Then + XCTAssertEqual(result, 3) + } + + // MARK: - Concurrency Tests + + /// Tests concurrent firstValue calls. + func test_concurrentFirstValueCalls() async throws { + // Given + let publisher = PassthroughSubject() + + // When + async let result1 = publisher.firstValue() + async let result2 = publisher.firstValue() + async let result3 = publisher.firstValue() + + // Send values after tasks are waiting + try? await Task.sleep(nanoseconds: 100_000_000) + publisher.send(42) + + // Then + let results = try await [result1, result2, result3] + XCTAssertTrue(results.allSatisfy { $0 == 42 }) + } + + // MARK: - Performance Tests + + /// Tests performance of AsyncStream conversion. + func test_asyncStreamPerformance() async { + // Given + let iterations = 1000 + let publisher = PassthroughSubject() + let storage = ReceivedValuesStorage() + + // When + let expectation = XCTestExpectation(description: "Performance test") + + Task { + for await value in publisher.eraseAsAsyncStream() { + storage.append(value) + if storage.count == iterations { + expectation.fulfill() + break + } + } + } + + // Allow task to start + try? await Task.sleep(nanoseconds: 100_000_000) + + let start = CFAbsoluteTimeGetCurrent() + for i in 0.. { + func asyncMap(_ transform: (Element) async -> T) async -> [T] { + var results: [T] = [] + for element in self { + results.append(await transform(element)) + } + return results + } } diff --git a/StreamVideoTests/WebRTC/Extensions/Foundation/Task_TimeoutTests.swift b/StreamVideoTests/WebRTC/Extensions/Foundation/Task_TimeoutTests.swift index f4250889e..1c73021b9 100644 --- a/StreamVideoTests/WebRTC/Extensions/Foundation/Task_TimeoutTests.swift +++ b/StreamVideoTests/WebRTC/Extensions/Foundation/Task_TimeoutTests.swift @@ -5,60 +5,303 @@ @testable import StreamVideo import XCTest +/// Comprehensive tests for Task+Timeout extension. +/// +/// Tests cover: +/// - Successful operations within timeout +/// - Timeout scenarios +/// - Concurrent task handling +/// - Edge cases and error handling final class TaskTimeoutTests: XCTestCase, @unchecked Sendable { - - func testSuccessfulOperationWithinTimeout() async throws { - let expectation = XCTestExpectation(description: "Operation completed successfully") - - let task = Task(timeout: 2) { - try await Task.sleep(nanoseconds: 1_000_000_000) // 1 second - expectation.fulfill() - return "Success" + + // MARK: - Basic Functionality Tests + + /// Tests that a task completes successfully within the timeout period. + func test_successfulOperation_withinTimeout() async throws { + // Given + let expectedResult = "Success" + let operationDuration: TimeInterval = 0.5 + let timeout: TimeInterval = 2.0 + + // When + let task = Task(timeoutInSeconds: timeout) { + try await Task.sleep(nanoseconds: UInt64(operationDuration * 1_000_000_000)) + return expectedResult } - + let result = try await task.value - XCTAssertEqual(result, "Success") - - await fulfillment(of: [expectation], timeout: 3) + + // Then + XCTAssertEqual(result, expectedResult, "Task should return expected result") } - - func testOperationTimesOut() async throws { - let task = Task(timeout: 1) { - try await Task.sleep(nanoseconds: 2_000_000_000) // 2 seconds + + /// Tests that a task throws timeout error when exceeding the timeout period. + func test_operation_timesOut() async throws { + // Given + let operationDuration: TimeInterval = 2.0 + let timeout: TimeInterval = 0.5 + + // When/Then + let task = Task(timeoutInSeconds: timeout) { + try await Task.sleep(nanoseconds: UInt64(operationDuration * 1_000_000_000)) return "This should not be reached" } - + do { _ = try await task.value - XCTFail("Expected timeout error") - } catch let error as Task.TimeoutError { - XCTAssertEqual(error, .timedOut) + XCTFail("Expected timeout error but operation succeeded") + } catch let error as ClientError { + XCTAssertTrue( + error.localizedDescription.contains("timed out"), + "Error should indicate timeout" + ) } catch { - XCTFail("Unexpected error: \(error)") + XCTFail("Unexpected error type: \(error)") } } + + /// Tests immediate return for operations that complete instantly. + func test_immediateOperation_completes() async throws { + // Given + let expectedResult = 42 + let timeout: TimeInterval = 1.0 + + // When + let task = Task(timeoutInSeconds: timeout) { + // No delay, immediate return + expectedResult + } + + let result = try await task.value + + // Then + XCTAssertEqual(result, expectedResult, "Immediate operation should complete") + } + + // MARK: - Concurrent Task Tests + + /// Tests multiple concurrent tasks with different timeouts. + func test_concurrentTasks_differentTimeouts() async throws { + // Given + let results = await withTaskGroup(of: Result.self) { group in + // Task that completes successfully + group.addTask { + do { + return .success(try await Task(timeoutInSeconds: 2.0) { + try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds + return "Task 1" + }.value) + } catch { + return .failure(ClientError("Task 1")) + } + } + + // Task that times out + group.addTask { + do { + return .success(try await Task(timeoutInSeconds: 0.5) { + try await Task.sleep(nanoseconds: 2_000_000_000) // 2 seconds + return "Task 2" + }.value) + } catch { + return .failure(ClientError("Task 2")) + } + } + + // Task that completes quickly + group.addTask { + do { + return .success(try await Task(timeoutInSeconds: 1.0) { + return "Task 3" + }.value) + } catch { + return .failure(ClientError("Task 3")) + } + } + + var collectedResults: [Result] = [] + for await result in group { + collectedResults.append(result) + } + return collectedResults + } + + // Then + XCTAssertEqual(results.count, 3, "Should have 3 results") - func testConcurrentTasks() async throws { - let expectation1 = XCTestExpectation(description: "Task 1 completed") - let expectation2 = XCTestExpectation(description: "Task 2 completed") - - async let task1 = Task(timeout: 2) { - try await Task.sleep(nanoseconds: 1_000_000_000) // 1 second - expectation1.fulfill() - return "Task 1" - }.value - - async let task2 = Task(timeout: 2) { - try await Task.sleep(nanoseconds: 1_500_000_000) // 1.5 seconds - expectation2.fulfill() - return "Task 2" - }.value + for result in results { + switch result { + case let .success(value): + switch value { + case "Task 1": + break + case "Task 2": + XCTFail("Task 2 should have timed out") + case "Task 3": + break + default: + XCTFail("Unknown task") + } - let (result1, result2) = try await(task1, task2) + case let .failure(error): + guard + let value = (error as? ClientError)?.localizedDescription + else { + XCTFail() + return + } + switch value { + case "Task 1": + XCTFail("Task 1 should have succeeded.") + case "Task 2": + break + case "Task 3": + XCTFail("Task 3 should have succeeded.") + default: + XCTFail("Unknown task") + } + } + } + } + + // MARK: - Cancellation Tests + + /// Tests that cancelling a timeout task prevents completion. + func test_taskCancellation_preventsCompletion() async throws { + // Given + let task = Task(timeoutInSeconds: 2.0) { + try await Task.sleep(nanoseconds: 1_000_000_000) + return "Should not complete" + } + + // When + task.cancel() + + // Then + do { + _ = try await task.value + XCTFail("Cancelled task should not complete successfully") + } catch { + return + } + } + + /// Tests that timeout respects task cancellation. + func test_timeout_respectsCancellation() async throws { + // Given + nonisolated(unsafe) var operationStarted = false + let task = Task(timeoutInSeconds: 5.0) { + operationStarted = true + try Task.checkCancellation() + try await Task.sleep(nanoseconds: 10_000_000_000) // 10 seconds + return "Should not complete" + } + + // When + try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds + task.cancel() + + // Then + do { + _ = try await task.value + XCTFail("Task should have been cancelled") + } catch { + XCTAssertTrue(operationStarted, "Operation should have started") + } + } - XCTAssertEqual(result1, "Task 1") - XCTAssertEqual(result2, "Task 2") + // MARK: - Edge Cases + + /// Tests zero timeout behavior. + func test_zeroTimeout_immediatelyTimesOut() async throws { + // Given + let task = Task(timeoutInSeconds: 0) { + "Should timeout immediately" + } + + // When/Then + do { + _ = try await task.value + XCTFail("Zero timeout should fail immediately") + } catch { + // Expected timeout + } + } + + /// Tests negative timeout behavior. + func test_negativeTimeout_immediatelyTimesOut() async throws { + // Given + let task = Task(timeoutInSeconds: -1.0) { + "Should timeout immediately" + } + + // When/Then + do { + _ = try await task.value + XCTFail("Negative timeout should fail immediately") + } catch { + // Expected timeout or immediate completion + } + } - await fulfillment(of: [expectation1, expectation2], timeout: 3) + // MARK: - Error Propagation Tests + + /// Tests that errors from the operation are properly propagated. + func test_operationError_isPropagated() async throws { + // Given + struct TestError: Error, Equatable { + let message: String + } + + let expectedError = TestError(message: "Operation failed") + + // When + let task = Task(timeoutInSeconds: 2.0) { + throw expectedError + } + + // Then + do { + _ = try await task.value + XCTFail("Should have thrown error") + } catch let error as TestError { + XCTAssertEqual(error, expectedError, "Original error should be propagated") + } catch { + XCTFail("Unexpected error type: \(error)") + } + } + + // MARK: - Stress Tests + + /// Tests many concurrent timeout tasks. + func test_manyConcurrentTimeoutTasks() async throws { + // Given + let taskCount = 100 + + // When + let results = await withTaskGroup(of: Int?.self) { group in + for i in 0..