|  | 
|  | 1 | +//===----------------------------------------------------------------------===// | 
|  | 2 | +// | 
|  | 3 | +// This source file is part of the Swift.org open source project | 
|  | 4 | +// | 
|  | 5 | +// Copyright (c) 2014 - 2025 Apple Inc. and the Swift project authors | 
|  | 6 | +// Licensed under Apache License v2.0 with Runtime Library Exception | 
|  | 7 | +// | 
|  | 8 | +// See https://swift.org/LICENSE.txt for license information | 
|  | 9 | +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors | 
|  | 10 | +// | 
|  | 11 | +//===----------------------------------------------------------------------===// | 
|  | 12 | + | 
|  | 13 | +import Foundation | 
|  | 14 | + | 
|  | 15 | +/// Abstraction layer so we can store a heterogeneous collection of tasks in an | 
|  | 16 | +/// array. | 
|  | 17 | +private protocol AnyTask: Sendable { | 
|  | 18 | +  func waitForCompletion() async | 
|  | 19 | + | 
|  | 20 | +  func cancel() | 
|  | 21 | +} | 
|  | 22 | + | 
|  | 23 | +extension Task: AnyTask { | 
|  | 24 | +  func waitForCompletion() async { | 
|  | 25 | +    _ = try? await value | 
|  | 26 | +  } | 
|  | 27 | +} | 
|  | 28 | + | 
|  | 29 | +/// A type that is able to track dependencies between tasks. | 
|  | 30 | +public protocol DependencyTracker: Sendable, Hashable { | 
|  | 31 | +  /// Whether the task described by `self` needs to finish executing before `other` can start executing. | 
|  | 32 | +  func isDependency(of other: Self) -> Bool | 
|  | 33 | +} | 
|  | 34 | + | 
|  | 35 | +/// A dependency tracker where each task depends on every other, i.e. a serial | 
|  | 36 | +/// queue. | 
|  | 37 | +public struct Serial: DependencyTracker { | 
|  | 38 | +  public func isDependency(of other: Serial) -> Bool { | 
|  | 39 | +    return true | 
|  | 40 | +  } | 
|  | 41 | +} | 
|  | 42 | + | 
|  | 43 | +package struct PendingTask<TaskMetadata: Sendable & Hashable>: Sendable { | 
|  | 44 | +  /// The task that is pending. | 
|  | 45 | +  fileprivate let task: any AnyTask | 
|  | 46 | + | 
|  | 47 | +  /// A unique value used to identify the task. This allows tasks to get | 
|  | 48 | +  /// removed from `pendingTasks` again after they finished executing. | 
|  | 49 | +  fileprivate let id: UUID | 
|  | 50 | +} | 
|  | 51 | + | 
|  | 52 | +/// A list of pending tasks that can be sent across actor boundaries and is guarded by a lock. | 
|  | 53 | +/// | 
|  | 54 | +/// - Note: Unchecked sendable because the tasks are being protected by a lock. | 
|  | 55 | +private final class PendingTasks<TaskMetadata: Sendable & Hashable>: Sendable { | 
|  | 56 | +  ///  Lock guarding `pendingTasks`. | 
|  | 57 | +  private let lock = NSLock() | 
|  | 58 | + | 
|  | 59 | +  /// Pending tasks that have not finished execution yet. | 
|  | 60 | +  /// | 
|  | 61 | +  /// - Important: This must only be accessed while `lock` has been acquired. | 
|  | 62 | +  private nonisolated(unsafe) var tasksByMetadata: [TaskMetadata: [PendingTask<TaskMetadata>]] = [:] | 
|  | 63 | + | 
|  | 64 | +  init() { | 
|  | 65 | +    self.lock.name = "AsyncQueue" | 
|  | 66 | +  } | 
|  | 67 | + | 
|  | 68 | +  /// Capture a lock and execute the closure, which may modify the pending tasks. | 
|  | 69 | +  func withLock<T>( | 
|  | 70 | +    _ body: (_ tasksByMetadata: inout [TaskMetadata: [PendingTask<TaskMetadata>]]) throws -> T | 
|  | 71 | +  ) rethrows -> T { | 
|  | 72 | +    try lock.withLock { | 
|  | 73 | +      try body(&tasksByMetadata) | 
|  | 74 | +    } | 
|  | 75 | +  } | 
|  | 76 | +} | 
|  | 77 | + | 
|  | 78 | +/// A queue that allows the execution of asynchronous blocks of code. | 
|  | 79 | +public final class AsyncQueue<TaskMetadata: DependencyTracker>: Sendable { | 
|  | 80 | +  private let pendingTasks: PendingTasks<TaskMetadata> = PendingTasks() | 
|  | 81 | + | 
|  | 82 | +  public init() {} | 
|  | 83 | + | 
|  | 84 | +  /// Schedule a new closure to be executed on the queue. | 
|  | 85 | +  /// | 
|  | 86 | +  /// If this is a serial queue, all previously added tasks are guaranteed to | 
|  | 87 | +  /// finished executing before this closure gets executed. | 
|  | 88 | +  /// | 
|  | 89 | +  /// If this is a barrier, all previously scheduled tasks are guaranteed to | 
|  | 90 | +  /// finish execution before the barrier is executed and all tasks that are | 
|  | 91 | +  /// added later will wait until the barrier finishes execution. | 
|  | 92 | +  @discardableResult | 
|  | 93 | +  public func async<Success: Sendable>( | 
|  | 94 | +    priority: TaskPriority? = nil, | 
|  | 95 | +    metadata: TaskMetadata, | 
|  | 96 | +    @_inheritActorContext operation: @escaping @Sendable () async -> Success | 
|  | 97 | +  ) -> Task<Success, Never> { | 
|  | 98 | +    let throwingTask = asyncThrowing(priority: priority, metadata: metadata, operation: operation) | 
|  | 99 | +    return Task(priority: priority) { | 
|  | 100 | +      do { | 
|  | 101 | +        return try await throwingTask.valuePropagatingCancellation | 
|  | 102 | +      } catch { | 
|  | 103 | +        // We know this can never happen because `operation` does not throw. | 
|  | 104 | +        preconditionFailure("Executing a task threw an error even though the operation did not throw") | 
|  | 105 | +      } | 
|  | 106 | +    } | 
|  | 107 | +  } | 
|  | 108 | + | 
|  | 109 | +  /// Same as ``AsyncQueue/async(priority:barrier:operation:)`` but allows the | 
|  | 110 | +  /// operation to throw. | 
|  | 111 | +  /// | 
|  | 112 | +  /// - Important: The caller is responsible for handling any errors thrown from | 
|  | 113 | +  ///   the operation by awaiting the result of the returned task. | 
|  | 114 | +  public func asyncThrowing<Success: Sendable>( | 
|  | 115 | +    priority: TaskPriority? = nil, | 
|  | 116 | +    metadata: TaskMetadata, | 
|  | 117 | +    @_inheritActorContext operation: @escaping @Sendable () async throws -> Success | 
|  | 118 | +  ) -> Task<Success, any Error> { | 
|  | 119 | +    let id = UUID() | 
|  | 120 | + | 
|  | 121 | +    return pendingTasks.withLock { tasksByMetadata in | 
|  | 122 | +      // Build the list of tasks that need to finished execution before this one | 
|  | 123 | +      // can be executed | 
|  | 124 | +      var dependencies: [PendingTask<TaskMetadata>] = [] | 
|  | 125 | +      for (pendingMetadata, pendingTasks) in tasksByMetadata { | 
|  | 126 | +        guard pendingMetadata.isDependency(of: metadata) else { | 
|  | 127 | +          // No dependency | 
|  | 128 | +          continue | 
|  | 129 | +        } | 
|  | 130 | +        if metadata.isDependency(of: metadata), let lastPendingTask = pendingTasks.last { | 
|  | 131 | +          // This kind of task depends on all other tasks of the same kind finishing. It is sufficient to just wait on | 
|  | 132 | +          // the last task with this metadata, it will have all the other tasks with the same metadata as transitive | 
|  | 133 | +          // dependencies. | 
|  | 134 | +          dependencies.append(lastPendingTask) | 
|  | 135 | +        } else { | 
|  | 136 | +          // We depend on tasks with this metadata, but they don't have any dependencies between them, eg. | 
|  | 137 | +          // `documentUpdate` depends on all `documentRequest` but `documentRequest` don't have dependencies between | 
|  | 138 | +          // them. We need to depend on all of them unless we knew that we depended on some other task that already | 
|  | 139 | +          // depends on all of these. But determining that would also require knowledge about the entire dependency | 
|  | 140 | +          // graph, which is likely as expensive as depending on all of these tasks. | 
|  | 141 | +          dependencies += pendingTasks | 
|  | 142 | +        } | 
|  | 143 | +      } | 
|  | 144 | + | 
|  | 145 | +      // Schedule the task. | 
|  | 146 | +      let task = Task(priority: priority) { [pendingTasks] in | 
|  | 147 | +        // IMPORTANT: The only throwing call in here must be the call to | 
|  | 148 | +        // operation. Otherwise the assumption that the task will never throw | 
|  | 149 | +        // if `operation` does not throw, which we are making in `async` does | 
|  | 150 | +        // not hold anymore. | 
|  | 151 | +        for dependency in dependencies { | 
|  | 152 | +          await dependency.task.waitForCompletion() | 
|  | 153 | +        } | 
|  | 154 | + | 
|  | 155 | +        let result = try await operation() | 
|  | 156 | + | 
|  | 157 | +        pendingTasks.withLock { tasksByMetadata in | 
|  | 158 | +          tasksByMetadata[metadata, default: []].removeAll(where: { $0.id == id }) | 
|  | 159 | +          if tasksByMetadata[metadata]?.isEmpty ?? false { | 
|  | 160 | +            tasksByMetadata[metadata] = nil | 
|  | 161 | +          } | 
|  | 162 | +        } | 
|  | 163 | + | 
|  | 164 | +        return result | 
|  | 165 | +      } | 
|  | 166 | + | 
|  | 167 | +      tasksByMetadata[metadata, default: []].append(PendingTask(task: task, id: id)) | 
|  | 168 | + | 
|  | 169 | +      return task | 
|  | 170 | +    } | 
|  | 171 | +  } | 
|  | 172 | +} | 
|  | 173 | + | 
|  | 174 | +/// Convenience overloads for serial queues. | 
|  | 175 | +extension AsyncQueue where TaskMetadata == Serial { | 
|  | 176 | +  /// Same as ``async(priority:operation:)`` but specialized for serial queues | 
|  | 177 | +  /// that don't specify any metadata. | 
|  | 178 | +  @discardableResult | 
|  | 179 | +  public func async<Success: Sendable>( | 
|  | 180 | +    priority: TaskPriority? = nil, | 
|  | 181 | +    @_inheritActorContext operation: @escaping @Sendable () async -> Success | 
|  | 182 | +  ) -> Task<Success, Never> { | 
|  | 183 | +    return self.async(priority: priority, metadata: Serial(), operation: operation) | 
|  | 184 | +  } | 
|  | 185 | + | 
|  | 186 | +  /// Same as ``asyncThrowing(priority:metadata:operation:)`` but specialized | 
|  | 187 | +  /// for serial queues that don't specify any metadata. | 
|  | 188 | +  public func asyncThrowing<Success: Sendable>( | 
|  | 189 | +    priority: TaskPriority? = nil, | 
|  | 190 | +    @_inheritActorContext operation: @escaping @Sendable () async throws -> Success | 
|  | 191 | +  ) -> Task<Success, any Error> { | 
|  | 192 | +    return self.asyncThrowing(priority: priority, metadata: Serial(), operation: operation) | 
|  | 193 | +  } | 
|  | 194 | +} | 
0 commit comments