Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions Sources/StreamVideo/StreamVideo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,11 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
log.debug("Listening for WS connection")

do {
var cancellable: AnyCancellable?
log.debug("Listening for WS connection")
_ = try await DefaultTimer
.publish(every: 0.1)
.filter { [weak webSocketClient] _ in webSocketClient?.connectionState.isConnected == true }
.nextValue(timeout: 30) { cancellable = $0 }
cancellable?.cancel()
cancellable = nil
.nextValue(timeout: 30)
} catch {
log.debug("Timeout while waiting for WS connection opening")
throw ClientError.NetworkError()
Expand Down Expand Up @@ -583,21 +580,16 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
return ""
}

var cancellable: AnyCancellable?
do {
let result = try await DefaultTimer
.publish(every: 0.1)
.log(.debug) { _ in "Waiting for connection id" }
.compactMap { [weak self] _ in self?.loadConnectionIdFromHealthcheck() }
.nextValue(timeout: 5) { cancellable = $0 }
.nextValue(timeout: 5)
defer { log.debug("ConnectionId loaded: \(result)") }
cancellable?.cancel()
cancellable = nil
return result
} catch {
log.warning("Unable to load connectionId.")
cancellable?.cancel()
cancellable = nil
return ""
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,61 @@ public extension Publisher where Output: Sendable {
}
}
}

func firstValue(
file: StaticString = #file,
line: UInt = #line
) async throws -> Output {
if #available(iOS 15.0, *) {
for try await value in self.values {
return value
}
} else {
for try await value in eraseAsAsyncStream() {
return value
}
}

throw ClientError("Task produced no value.", file, line)
}

func firstValue(
timeoutInSeconds: TimeInterval,
file: StaticString = #file,
function: StaticString = #function,
line: UInt = #line
) async throws -> Output {
nonisolated(unsafe) let selfReference = self
return try await Task(
timeoutInSeconds: timeoutInSeconds,
file: file,
function: function,
line: line
) {
try await selfReference.firstValue(file: file, line: line)
Copy link
Preview

Copilot AI Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using nonisolated(unsafe) is potentially dangerous as it bypasses Swift's concurrency safety checks. Consider using a safer approach like capturing self directly in the Task closure or using proper isolation.

Suggested change
try await selfReference.firstValue(file: file, line: line)
return try await Task(
timeoutInSeconds: timeoutInSeconds,
file: file,
function: function,
line: line
) {
try await self.firstValue(file: file, line: line)

Copilot uses AI. Check for mistakes.

}.value
}

func nextValue(
dropFirst: Int = 0,
timeout: TimeInterval? = nil,
file: StaticString = #fileID,
function: StaticString = #function,
line: UInt = #line
) async throws -> Output {
let publisher = dropFirst > 0
? self.dropFirst(dropFirst).eraseToAnyPublisher()
: eraseToAnyPublisher()

if let timeout {
return try await publisher.firstValue(
timeoutInSeconds: timeout,
file: file,
function: function,
line: line
)
} else {
return try await publisher.firstValue()
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,45 @@
import Foundation

/// An extension to the `Task` type that adds timeout functionality.
extension Task where Failure == Error {
/// An error type representing a timeout condition.
enum TimeoutError: Error {
/// Indicates that the operation has timed out.
case timedOut
}

/// Initializes a new task with a timeout.
///
/// This initializer creates a new task that will execute the given operation
/// with a specified timeout. If the operation doesn't complete within the
/// timeout period, a `TimeoutError` will be thrown.
///
/// - Parameters:
/// - timeout: The maximum duration (in seconds) to wait for the operation to complete.
/// - operation: The asynchronous operation to perform.
///
/// - Returns: A new `Task` instance that will execute the operation with the specified timeout.
///
/// - Throws: `TimeoutError.timedOut` if the operation doesn't complete within the specified timeout.
///
/// - Note: This implementation uses a task group to manage concurrent execution
/// of the main operation and the timeout timer.
extension Task where Failure == any Error {
Copy link
Preview

Copilot AI Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constraint where Failure == any Error is unnecessarily restrictive. The original constraint where Failure == Error was more appropriate as it allows the extension to work with any concrete Error type, not just existential types.

Suggested change
extension Task where Failure == any Error {
extension Task where Failure == Error {

Copilot uses AI. Check for mistakes.

@discardableResult
init(
timeout: TimeInterval,
operation: @Sendable @escaping () async throws -> Success
priority: TaskPriority? = nil,
/// New: a timeout property to configure how long a task may perform before failing with a timeout error.
timeoutInSeconds: TimeInterval,
file: StaticString = #fileID,
function: StaticString = #function,
line: UInt = #line,
operation: @Sendable @escaping @isolated(any) () async throws -> Success
Copy link
Preview

Copilot AI Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @isolated(any) annotation is unnecessary here and may cause compilation issues. The @Sendable @escaping annotations are sufficient for this async operation parameter.

Suggested change
operation: @Sendable @escaping @isolated(any) () async throws -> Success
operation: @Sendable @escaping () async throws -> Success

Copilot uses AI. Check for mistakes.

) {
self.init {
self = Task(priority: priority) {
try await withThrowingTaskGroup(of: Success.self) { group in
group.addTask {

/// Add the operation to perform as the first task.
_ = group.addTaskUnlessCancelled {
try await operation()
}
group.addTask {
try await Task<Never, Never>.sleep(
nanoseconds: UInt64(
timeout * 1_000_000_000
)
)
throw TimeoutError.timedOut

if timeoutInSeconds > 0, timeoutInSeconds <= TimeInterval(UInt64.max) {
/// Add another task to trigger the timeout if it finishes earlier than our first task.
_ = group.addTaskUnlessCancelled { () -> Success in
try await Task<Never, Never>.sleep(nanoseconds: UInt64(timeoutInSeconds * 1_000_000_000))
throw ClientError("Operation timed out", file, line)
}
} else {
log.warning("Invalid timeout:\(timeoutInSeconds) was passed to Task.timeout. Task will timeout immediately.")
throw ClientError("Operation timed out", file, line)
}
let result = try await group.next()!

/// We need to deal with an optional, even though we know it's not optional.
/// This is default for task groups to account for when there aren't any pending tasks.
/// Awaiting on an empty group immediately returns 'nil' without suspending.
guard let result = try await group.next() else {
throw ClientError("Task produced no value", file, line)
}

/// If we reach this, it means we have a value before the timeout.
/// We cancel the group, which means just cancelling the timeout task.
group.cancelAll()
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,12 @@ extension WebRTCCoordinator.StateMachine.Stage {
/// will be delivered on the next batch of traces.
statsAdapter?.sfuAdapter = nil

var cancellable: AnyCancellable?
/// We add a small delay of 100ms in oder to ensure that the internet connection state
/// has been updated, so that when we start observing it will receive the latest and
/// updated value.
_ = try? await DefaultTimer
.publish(every: ScreenPropertiesAdapter.currentValue.refreshRate)
.nextValue { cancellable = $0 }
cancellable?.cancel()
cancellable = nil
.nextValue()

statsAdapter?.sfuAdapter = nil

Expand Down
4 changes: 0 additions & 4 deletions StreamVideo.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@
40FB151B2BF77EEE00D5E580 /* Call+JoiningStage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151A2BF77EEE00D5E580 /* Call+JoiningStage.swift */; };
40FB151D2BF77EFA00D5E580 /* Call+JoinedStage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151C2BF77EFA00D5E580 /* Call+JoinedStage.swift */; };
40FB151F2BF78C6A00D5E580 /* Call+Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB151E2BF78C6A00D5E580 /* Call+Error.swift */; };
40FB15212BF78FA100D5E580 /* Publisher+NextValue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */; };
40FB8FF62D661DC400F4390A /* Call+Identifiable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40F445E72A9E2824004BE3DA /* Call+Identifiable.swift */; };
40FB8FF82D661E2000F4390A /* String+OpenApiExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB8FF72D661E2000F4390A /* String+OpenApiExtensions.swift */; };
40FB8FFA2D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift in Sources */ = {isa = PBXBuildFile; fileRef = 40FB8FF92D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift */; };
Expand Down Expand Up @@ -2383,7 +2382,6 @@
40FB151A2BF77EEE00D5E580 /* Call+JoiningStage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+JoiningStage.swift"; sourceTree = "<group>"; };
40FB151C2BF77EFA00D5E580 /* Call+JoinedStage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+JoinedStage.swift"; sourceTree = "<group>"; };
40FB151E2BF78C6A00D5E580 /* Call+Error.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Call+Error.swift"; sourceTree = "<group>"; };
40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Publisher+NextValue.swift"; sourceTree = "<group>"; };
40FB8FF72D661E2000F4390A /* String+OpenApiExtensions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "String+OpenApiExtensions.swift"; sourceTree = "<group>"; };
40FB8FF92D661F3F00F4390A /* CustomStringConvertible+Retroactive.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "CustomStringConvertible+Retroactive.swift"; sourceTree = "<group>"; };
40FB95CF2DAD6FE600B24BC7 /* PublisherAsyncStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PublisherAsyncStreamTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -5560,7 +5558,6 @@
children = (
40FB150E2BF77CEC00D5E580 /* StreamStateMachine.swift */,
40FB15102BF77D5800D5E580 /* StreamStateMachineStage.swift */,
40FB15202BF78FA100D5E580 /* Publisher+NextValue.swift */,
);
path = StateMachine;
sourceTree = "<group>";
Expand Down Expand Up @@ -7902,7 +7899,6 @@
84A7E1B02883E73100526C98 /* EventBatcher.swift in Sources */,
40BBC4DE2C63A507002AEF92 /* WebRTCCoordinator+CleanUp.swift in Sources */,
84CD12222C73831000056640 /* CallRtmpBroadcastStoppedEvent.swift in Sources */,
40FB15212BF78FA100D5E580 /* Publisher+NextValue.swift in Sources */,
40F646242C7F225200FFB10A /* CollectionDelayedUpdateObserver.swift in Sources */,
40382F282C88B80C00C2D00F /* SignalServerEvent.swift in Sources */,
84BAD77C2A6BFF4300733156 /* BroadcastBufferReaderConnection.swift in Sources */,
Expand Down
4 changes: 4 additions & 0 deletions StreamVideoTests/Mock/MockRTCAudioStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ final class MockRTCAudioStore {
audioStore = RTCAudioStore(session: session)
}

deinit {
InjectedValues[\.audioStore] = .init()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a leftover from the other PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is actually here to clean the InjectedValues context when the store isn't needed any more. Now that i'm thinking out loud though, it will never be triggered as the reference is still in InjectedValues, so deinit will never be triggered. We probably need an explicit method to uninstall/dismantle.

}

/// We call this just before the object that needs to use the mock is about to be created.
func makeShared() {
RTCAudioStore.currentValue = audioStore
Expand Down
Loading
Loading