Skip to content

Commit d2d172e

Browse files
committed
Introduce Windows IOCP based AsyncIO implementation
1 parent 9c0f3cd commit d2d172e

15 files changed

+717
-349
lines changed

Sources/Subprocess/API.swift

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public func run<
105105
output: try output.createPipe(),
106106
error: try error.createPipe()
107107
) { execution, inputIO, outputIO, errorIO in
108-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
109-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
110-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
108+
var inputIOBox: IOChannel? = consume inputIO
109+
var outputIOBox: IOChannel? = consume outputIO
110+
var errorIOBox: IOChannel? = consume errorIO
111111

112112
// Write input, capture output and error in parallel
113113
async let stdout = try output.captureOutput(from: outputIOBox.take())
@@ -177,12 +177,12 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: Out
177177
output: try output.createPipe(),
178178
error: try error.createPipe()
179179
) { execution, inputIO, outputIO, errorIO in
180-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
180+
var inputIOBox: IOChannel? = consume inputIO
181181
return try await withThrowingTaskGroup(
182182
of: Void.self,
183183
returning: Result.self
184184
) { group in
185-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
185+
var inputIOContainer: IOChannel? = inputIOBox.take()
186186
group.addTask {
187187
if let inputIO = inputIOContainer.take() {
188188
let writer = StandardInputWriter(diskIO: inputIO)
@@ -237,13 +237,13 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
237237
output: try output.createPipe(),
238238
error: try error.createPipe()
239239
) { execution, inputIO, outputIO, errorIO in
240-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
241-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
240+
var inputIOBox: IOChannel? = consume inputIO
241+
var outputIOBox: IOChannel? = consume outputIO
242242
return try await withThrowingTaskGroup(
243243
of: Void.self,
244244
returning: Result.self
245245
) { group in
246-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
246+
var inputIOContainer: IOChannel? = inputIOBox.take()
247247
group.addTask {
248248
if let inputIO = inputIOContainer.take() {
249249
let writer = StandardInputWriter(diskIO: inputIO)
@@ -253,7 +253,7 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
253253
}
254254

255255
// Body runs in the same isolation
256-
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
256+
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeIOChannel())
257257
let result = try await body(execution, outputSequence)
258258
try await group.waitForAll()
259259
return result
@@ -299,13 +299,13 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
299299
output: try output.createPipe(),
300300
error: try error.createPipe()
301301
) { execution, inputIO, outputIO, errorIO in
302-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
303-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
302+
var inputIOBox: IOChannel? = consume inputIO
303+
var errorIOBox: IOChannel? = consume errorIO
304304
return try await withThrowingTaskGroup(
305305
of: Void.self,
306306
returning: Result.self
307307
) { group in
308-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
308+
var inputIOContainer: IOChannel? = inputIOBox.take()
309309
group.addTask {
310310
if let inputIO = inputIOContainer.take() {
311311
let writer = StandardInputWriter(diskIO: inputIO)
@@ -315,7 +315,7 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
315315
}
316316

317317
// Body runs in the same isolation
318-
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
318+
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeIOChannel())
319319
let result = try await body(execution, errorSequence)
320320
try await group.waitForAll()
321321
return result
@@ -363,7 +363,7 @@ public func run<Result, Error: OutputProtocol>(
363363
error: try error.createPipe()
364364
) { execution, inputIO, outputIO, errorIO in
365365
let writer = StandardInputWriter(diskIO: inputIO!)
366-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
366+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
367367
return try await body(execution, writer, outputSequence)
368368
}
369369
}
@@ -408,7 +408,7 @@ public func run<Result, Output: OutputProtocol>(
408408
error: try error.createPipe()
409409
) { execution, inputIO, outputIO, errorIO in
410410
let writer = StandardInputWriter(diskIO: inputIO!)
411-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
411+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
412412
return try await body(execution, writer, errorSequence)
413413
}
414414
}
@@ -460,8 +460,8 @@ public func run<Result>(
460460
error: try error.createPipe()
461461
) { execution, inputIO, outputIO, errorIO in
462462
let writer = StandardInputWriter(diskIO: inputIO!)
463-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
464-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
463+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
464+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
465465
return try await body(execution, writer, outputSequence, errorSequence)
466466
}
467467
}
@@ -497,16 +497,16 @@ public func run<
497497
error: try error.createPipe()
498498
) { (execution, inputIO, outputIO, errorIO) -> RunResult in
499499
// Write input, capture output and error in parallel
500-
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
501-
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
502-
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
500+
var inputIOBox: IOChannel? = consume inputIO
501+
var outputIOBox: IOChannel? = consume outputIO
502+
var errorIOBox: IOChannel? = consume errorIO
503503
return try await withThrowingTaskGroup(
504504
of: OutputCapturingState<Output.OutputType, Error.OutputType>?.self,
505505
returning: RunResult.self
506506
) { group in
507-
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
508-
var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take()
509-
var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take()
507+
var inputIOContainer: IOChannel? = inputIOBox.take()
508+
var outputIOContainer: IOChannel? = outputIOBox.take()
509+
var errorIOContainer: IOChannel? = errorIOBox.take()
510510
group.addTask {
511511
if let writeFd = inputIOContainer.take() {
512512
let writer = StandardInputWriter(diskIO: writeFd)
@@ -580,8 +580,8 @@ public func run<Result>(
580580
error: try error.createPipe()
581581
) { execution, inputIO, outputIO, errorIO in
582582
let writer = StandardInputWriter(diskIO: inputIO!)
583-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
584-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
583+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeIOChannel())
584+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeIOChannel())
585585
return try await body(execution, writer, outputSequence, errorSequence)
586586
}
587587
}

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
internal import Dispatch
2020
#endif
2121

22-
public struct AsyncBufferSequence: AsyncSequence, Sendable {
22+
public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
2323
public typealias Failure = any Swift.Error
2424
public typealias Element = Buffer
2525

2626
#if canImport(Darwin)
2727
internal typealias DiskIO = DispatchIO
28+
#elseif canImport(WinSDK)
29+
internal typealias DiskIO = HANDLE
2830
#else
2931
internal typealias DiskIO = FileDescriptor
3032
#endif
@@ -54,9 +56,11 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
5456
guard let data else {
5557
// We finished reading. Close the file descriptor now
5658
#if canImport(Darwin)
57-
self.diskIO.close()
59+
try _safelyClose(.dispatchIO(self.diskIO))
60+
#elseif canImport(WinSDK)
61+
try _safelyClose(.handle(self.diskIO))
5862
#else
59-
try self.diskIO.close()
63+
try _safelyClose(.fileDescriptor(self.diskIO))
6064
#endif
6165
return nil
6266
}
@@ -337,7 +341,7 @@ private let _pageSize: Int = {
337341
Int(_subprocess_vm_size())
338342
}()
339343
#elseif canImport(WinSDK)
340-
import WinSDK
344+
@preconcurrency import WinSDK
341345
private let _pageSize: Int = {
342346
var sysInfo: SYSTEM_INFO = SYSTEM_INFO()
343347
GetSystemInfo(&sysInfo)

0 commit comments

Comments
 (0)