diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 84828ea1..aa449924 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -805,8 +805,8 @@ has a chance to re-enable backpressure. To ensure this property, the `starting_pending_task` flag is set here and cleared when the pending task actually resumes execution in `enter`, preventing more pending tasks from being started in the interim. Lastly, `maybe_start_pending_task` is only called at -specific points (`wait_on` and `exit`, below) where the core wasm code has had -the opportunity to re-enable backpressure if need be. +specific points (`wait_async` and `exit`, below) where the core wasm code has +had the opportunity to re-enable backpressure if need be. ```python def maybe_start_pending_task(self): if self.inst.starting_pending_task: @@ -822,35 +822,24 @@ Notably, the loop in `maybe_start_pending_task` allows pending async tasks to start even when there is a blocked pending sync task ahead of them in the `pending_tasks` queue. -The `Task.wait_on` method defines how to block the current task on a given -Python [awaitable] using the `OnBlock` callback described above: +The `Task.wait_sync` method blocks the current task until a given Python +[awaitable] is resolved, setting the instance-wide `calling_sync_import` flag +to prohibit (via guards in `may_enter` and `wait_async`) any other tasks from +starting or resuming in the current component instance in the interim. (Tasks +in *other* component instances may however execute in the interim.) ```python - async def wait_on(self, awaitable, sync, cancellable = False) -> bool: - if sync: - assert(not self.inst.calling_sync_import) - self.inst.calling_sync_import = True - else: - self.maybe_start_pending_task() - + async def wait_sync(self, awaitable) -> None: awaitable = asyncio.ensure_future(awaitable) if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1): - cancelled = Cancelled.FALSE - else: - cancelled = await self.on_block(awaitable) - if cancelled and not cancellable: - assert(self.state == Task.State.INITIAL) - self.state = Task.State.PENDING_CANCEL - cancelled = await self.on_block(awaitable) - assert(not cancelled) - - if sync: - self.inst.calling_sync_import = False - self.inst.async_waiting_tasks.notify_all() - else: - while self.inst.calling_sync_import: - await self.inst.async_waiting_tasks.wait() - - return cancelled + return + assert(not self.inst.calling_sync_import) + self.calling_sync_import = True + if await self.on_block(awaitable) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.PENDING_CANCEL + assert(await self.on_block(awaitable) == Cancelled.FALSE) + self.inst.calling_sync_import = False + self.inst.async_waiting_tasks.notify_all() ``` If the given `awaitable` is already resolved (e.g., if between making an async import call that blocked and calling `waitable-set.wait` the I/O operation @@ -858,27 +847,40 @@ completed), the Component Model allows the runtime to nondeterministically avoid calling `OnBlock` which, in component-to-component async calls, means that control flow does not need to transfer to the calling component. -If `wait_on` is called with `sync` set to `True`, only tasks in *other* -component instances may execute; no code in the current component instance may -execute. This is achieved by setting and waiting on `calling_sync_import` -(using the `async_waiting_tasks` [`asyncio.Condition`]). `calling_sync_import` -is also checked by `may_enter` (to prevent reentrance by a new task) and set by -`call_sync` (defined next). - -If `wait_on` is called with `cancellable` set to `True`, then the caller -expects and propagates the case where the supertask requests cancellation -(when `True` is returned). But if `cancellable` is `False`, then `wait_on` must -handle the cancellation request itself by setting the task's `state` to -`PENDING_CANCEL` (to be picked up by `wait_for_event`, `poll_for_event` or -`yield` in the future) and calling `OnBlock` a second time (noting that -`OnBlock` can only request cancellation *once*). +Since synchronous callers may not be have enough context to know how or where +to propagate cancellation, if a supertask requests cancellation while +synchronously waiting, `wait_sync` remembers the request for cancellation and +then keeps waiting (which, since cancellation is delivered at most once, will +not return `Cancelled.TRUE` again). Cancellation will be delivered to core +wasm code at the next asynchronous call where cancellation is expected. + +The `Task.wait_async` method blocks the current task until a given Python +[awaitable] is resolved. Unlike `wait_sync`, `wait_async` allows other tasks to +be started or resumed in the current component instance in the interim. +However, while asynchronously waiting, another task may start *synchronously* +waiting and thus `wait_async` takes care to wait until `calling_sync_import` is +cleared before returning control flow back to the calling task: +```python + async def wait_async(self, awaitable) -> Cancelled: + self.maybe_start_pending_task() + awaitable = asyncio.ensure_future(awaitable) + if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1): + return Cancelled.FALSE + cancelled = await self.on_block(awaitable) + while self.inst.calling_sync_import: + cancelled |= await self.on_block(self.inst.async_waiting_tasks.wait()) + return cancelled +``` +The `maybe_start_pending_task()` call on the first line indicates that each +time a task asynchronously waits, it potentially unblocks a single pending task +that had previously hit backpressure and queued, but has since been unblocked. The `Task.call_sync` method defines how a task makes a synchronous call to an -imported `callee`. `call_sync` works just like `wait_on` when `sync` is `True` -and `cancellable` is `False` except that `call_sync` avoids unconditionally -blocking and instead only blocks if `callee` transitively blocks. This means -that N-deep synchronous callstacks avoid the overhead of async calls if none of -the calls in the stack actually block on external I/O. +imported `callee`. `call_sync` works just like `wait_sync` except that +`call_sync` avoids unconditionally blocking and instead only blocks if `callee` +transitively blocks. This means that N-deep synchronous callstacks avoid the +overhead of async calls if none of the calls in the stack actually block on +external I/O. ```python async def call_sync(self, callee, on_start, on_return): async def sync_on_block(awaitable): @@ -900,21 +902,24 @@ when a `callback` is used, when the `callback` returns `WAIT` to the event loop. `wait_for_event` waits until a `Waitable` in a given `WaitableSet` makes progress: ```python - async def wait_for_event(self, waitable_set, sync) -> EventTuple: + async def wait_for_event(self, wset, sync) -> EventTuple: if self.state == Task.State.PENDING_CANCEL: self.state = Task.State.CANCEL_DELIVERED return (EventCode.TASK_CANCELLED, 0, 0) else: - waitable_set.num_waiting += 1 + wset.num_waiting += 1 e = None while not e: - maybe_event = waitable_set.maybe_has_pending_event.wait() - if await self.wait_on(maybe_event, sync, cancellable = True): - assert(self.state == Task.State.INITIAL) - self.state = Task.State.CANCEL_DELIVERED - return (EventCode.TASK_CANCELLED, 0, 0) - e = waitable_set.poll() - waitable_set.num_waiting -= 1 + if sync: + await self.wait_sync(wset.maybe_has_pending_event.wait()) + else: + if await self.wait_async(wset.maybe_has_pending_event.wait()) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.CANCEL_DELIVERED + e = (EventCode.TASK_CANCELLED, 0, 0) + break + e = wset.poll() + wset.num_waiting -= 1 return e ``` As mentioned above with `WaitableSet`, `maybe_has_pending_event` is allowed to @@ -923,10 +928,10 @@ there is actually an event. This looping as well as the number of iterations is not semantically observable by the wasm code and so the host implementation can loop or not using its own event delivery scheme. -If there is already a pending cancellation request (from a previous -non-cancellable `wait_on` or a `call_sync`), the cancellation request is -delivered to core wasm via the `TASK_CANCELLED` event code and task's `state` -is transitioned to `CANCEL_DELIVERED` so that `canon_task_cancel` can be called +If there is already a pending cancellation request (during a previous +`wait_sync` or a `call_sync`), the cancellation request is now delivered to +core wasm via the `TASK_CANCELLED` event code and task's `state` is +transitioned to `CANCEL_DELIVERED` so that `canon_task_cancel` can be called without trapping. If cancellation is requested *during* `wait_for_event`, there is a direct transition to the `CANCEL_DELIVERED` state. @@ -941,12 +946,16 @@ control flow to switch to other `asyncio.Task`s. if self.state == Task.State.PENDING_CANCEL: self.state = Task.State.CANCEL_DELIVERED return (EventCode.TASK_CANCELLED, 0, 0) - elif await self.wait_on(asyncio.sleep(0), sync, cancellable = True): - assert(self.state == Task.State.INITIAL) - self.state = Task.State.CANCEL_DELIVERED - return (EventCode.TASK_CANCELLED, 0, 0) - else: + elif sync: + await self.wait_sync(asyncio.sleep(0)) return (EventCode.NONE, 0, 0) + else: + if await self.wait_async(asyncio.sleep(0)) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.CANCEL_DELIVERED + return (EventCode.TASK_CANCELLED, 0, 0) + else: + return (EventCode.NONE, 0, 0) ``` Handling of cancellation requests in `yield_` mirrors `wait_for_event` above, handling both the cases of pending cancellation and cancellation while @@ -957,11 +966,11 @@ when a `callback` is used, when the `callback` returns `POLL` to the event loop. Polling returns the `NONE` event code instead of blocking when there are no pending events. ```python - async def poll_for_event(self, waitable_set, sync) -> Optional[EventTuple]: + async def poll_for_event(self, wset, sync) -> Optional[EventTuple]: event_code,_,_ = e = await self.yield_(sync) if event_code == EventCode.TASK_CANCELLED: return e - elif (e := waitable_set.poll()): + elif (e := wset.poll()): return e else: return (EventCode.NONE, 0, 0) @@ -3579,10 +3588,8 @@ execute, however tasks in *other* component instances may execute. This allows a long-running task in one component to avoid starving other components without needing support full reentrancy. -Because other tasks can execute, a subtask can be cancelled while executing -`yield`, in which case `yield` returns `1`. The language runtime and bindings -generators should handle cancellation the same way as when receiving the -`TASK_CANCELLED` event from `waitable-set.wait`. +If `async` is set, `yield` can return the `1` to indicate that the caller +requested cancellation. If `async` is not set, `yield` will always return `0`. ### 🔀 `canon waitable-set.new` @@ -3636,11 +3643,15 @@ in the same component instance, which can be useful for producer toolchains in situations where interleaving is not supported. However, this is generally worse for concurrency and thus producer toolchains should set `async` when possible. -`wait` can be called from a synchronously-lifted export so that even -synchronous code can make concurrent import calls. In these synchronous cases, -though, the automatic backpressure (applied by `Task.enter`) will ensure there -is only ever at most once synchronously-lifted task executing in a component -instance at a time. +If `async` is set, `waitable-set.wait` can return the `TASK_CANCELLED` event +indicating that the caller requested cancellation. If `async` is not set, this +notification will be remembered and delivered at the next `async` call. + +`waitable-set.wait` can be called from a synchronously-lifted export so that +even synchronous code can make concurrent import calls. In these synchronous +cases, though, the automatic backpressure (applied by `Task.enter`) will ensure +there is only ever at most once synchronously-lifted task executing in a +component instance at a time. ### 🔀 `canon waitable-set.poll` @@ -3667,6 +3678,10 @@ async def canon_waitable_set_poll(sync, mem, task, si, ptr): When `async` is set, `poll_for_event` can yield to other tasks (in this or other components) as part of polling for an event. +If `async` is set, `waitable-set.poll` can return the `TASK_CANCELLED` event +indicating that the caller requested cancellation. If `async` is not set, this +notification will be remembered and delivered at the next `async` call. + ### 🔀 `canon waitable-set.drop` @@ -3764,7 +3779,7 @@ async def canon_subtask_cancel(sync, task, i): while not subtask.resolved(): if subtask.has_pending_event(): _ = subtask.get_event() - await task.wait_on(subtask.wait_for_pending_event(), sync = True) + await task.wait_sync(subtask.wait_for_pending_event()) else: if not subtask.resolved(): return [BLOCKED] @@ -3932,7 +3947,7 @@ instance, but allowing other tasks in other component instances to make progress): ```python if opts.sync and not e.has_pending_event(): - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) ``` Finally, if there is a pending event on the stream end (which is necessarily a `copy_event` closure), it is eagerly returned to the caller. Otherwise, the @@ -4021,9 +4036,8 @@ in the high 28 bits; they're always zero. The end of `future_copy` is the exact same as `stream_copy`: waiting if `sync` and returning either the progress made or `BLOCKED`. ```python - if opts.sync and not e.has_pending_event(): - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) if e.has_pending_event(): code,index,payload = e.get_event() @@ -4073,7 +4087,7 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i): e.shared.cancel() if not e.has_pending_event(): if sync: - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) else: return [BLOCKED] code,index,payload = e.get_event() @@ -4087,8 +4101,8 @@ callbacks (passed by `canon_{stream,future}_{read,write}` above) which will set a pending event that is caught by the *second* check for `e.has_pending_event()`. -If the copy hasn't been cancelled, the synchronous case uses `Task.wait_on` to -synchronously and uninterruptibly wait for one of the `on_*` callbacks to +If the copy hasn't been cancelled, the synchronous case uses `Task.wait_sync` +to synchronously and uninterruptibly wait for one of the `on_*` callbacks to eventually be called (which will set the pending event). The asynchronous case simply returns `BLOCKING` and the client code must wait diff --git a/design/mvp/Explainer.md b/design/mvp/Explainer.md index a991ca20..473f86b3 100644 --- a/design/mvp/Explainer.md +++ b/design/mvp/Explainer.md @@ -1615,14 +1615,17 @@ expecting a return value. (See also "[Cancellation]" in the async explainer and | Canonical ABI signature | `[] -> [i32]` | The `yield` built-in allows the runtime to switch to other tasks, enabling a -long-running computation to cooperatively interleave execution. `yield` returns -`true` (`1`) if the caller has requested [cancellation] of the [current task]. +long-running computation to cooperatively interleave execution. If the `async` +immediate is present, the runtime can switch to other tasks in the *same* +component instance, which the calling core wasm must be prepared to handle. If +`async` is not present, only tasks in *other* component instances may be +switched to. -If the `async` immediate is present, the runtime can switch to other tasks in -the *same* component instance, which the calling core wasm must be prepared to -handle. If `async` is not present, only tasks in *other* component instances -may be switched to. (See also [`canon_yield`] in the Canonical ABI explainer -for details.) +If `async` is set, `yield` can return `true` (`1`) if the caller has +requested [cancellation] of the [current task]. If `async` is not set, +`yield` will always return `false` (`0`). + +(See also [`canon_yield`] in the Canonical ABI explainer for details.) ###### 🔀 `waitable-set.new` @@ -1683,6 +1686,11 @@ The meanings of the `{stream,future}-{read,write}` events/payloads are given as part [`stream.read` and `stream.write`](#-streamread-and-streamwrite) and [`future.read` and `future.write`](#-futureread-and-futurewrite) below. +If the `async` immediate is set, `waitable-set.wait` can return the +`task-cancelled` event to indicate that the caller requested [cancellation] of +the [current task]. If `async` is not set, `task-cancelled` will never be +delivered. + In the Canonical ABI, the `event-code` return value provides the `event` discriminant and the case payloads are stored as two contiguous `i32`s at the 8-byte-aligned address `payload-addr`. (See also [`canon_waitable_set_wait`] diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index fde2e407..d9f0fffa 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -532,31 +532,27 @@ def maybe_start_pending_task(self): pending_future.set_result(None) return - async def wait_on(self, awaitable, sync, cancellable = False) -> bool: - if sync: - assert(not self.inst.calling_sync_import) - self.inst.calling_sync_import = True - else: - self.maybe_start_pending_task() - + async def wait_sync(self, awaitable) -> None: awaitable = asyncio.ensure_future(awaitable) if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1): - cancelled = Cancelled.FALSE - else: - cancelled = await self.on_block(awaitable) - if cancelled and not cancellable: - assert(self.state == Task.State.INITIAL) - self.state = Task.State.PENDING_CANCEL - cancelled = await self.on_block(awaitable) - assert(not cancelled) - - if sync: - self.inst.calling_sync_import = False - self.inst.async_waiting_tasks.notify_all() - else: - while self.inst.calling_sync_import: - await self.inst.async_waiting_tasks.wait() + return + assert(not self.inst.calling_sync_import) + self.calling_sync_import = True + if await self.on_block(awaitable) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.PENDING_CANCEL + assert(await self.on_block(awaitable) == Cancelled.FALSE) + self.inst.calling_sync_import = False + self.inst.async_waiting_tasks.notify_all() + async def wait_async(self, awaitable) -> Cancelled: + self.maybe_start_pending_task() + awaitable = asyncio.ensure_future(awaitable) + if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1): + return Cancelled.FALSE + cancelled = await self.on_block(awaitable) + while self.inst.calling_sync_import: + cancelled |= await self.on_block(self.inst.async_waiting_tasks.wait()) return cancelled async def call_sync(self, callee, on_start, on_return): @@ -573,39 +569,46 @@ async def sync_on_block(awaitable): self.inst.calling_sync_import = False self.inst.async_waiting_tasks.notify_all() - async def wait_for_event(self, waitable_set, sync) -> EventTuple: + async def wait_for_event(self, wset, sync) -> EventTuple: if self.state == Task.State.PENDING_CANCEL: self.state = Task.State.CANCEL_DELIVERED return (EventCode.TASK_CANCELLED, 0, 0) else: - waitable_set.num_waiting += 1 + wset.num_waiting += 1 e = None while not e: - maybe_event = waitable_set.maybe_has_pending_event.wait() - if await self.wait_on(maybe_event, sync, cancellable = True): - assert(self.state == Task.State.INITIAL) - self.state = Task.State.CANCEL_DELIVERED - return (EventCode.TASK_CANCELLED, 0, 0) - e = waitable_set.poll() - waitable_set.num_waiting -= 1 + if sync: + await self.wait_sync(wset.maybe_has_pending_event.wait()) + else: + if await self.wait_async(wset.maybe_has_pending_event.wait()) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.CANCEL_DELIVERED + e = (EventCode.TASK_CANCELLED, 0, 0) + break + e = wset.poll() + wset.num_waiting -= 1 return e async def yield_(self, sync) -> EventTuple: if self.state == Task.State.PENDING_CANCEL: self.state = Task.State.CANCEL_DELIVERED return (EventCode.TASK_CANCELLED, 0, 0) - elif await self.wait_on(asyncio.sleep(0), sync, cancellable = True): - assert(self.state == Task.State.INITIAL) - self.state = Task.State.CANCEL_DELIVERED - return (EventCode.TASK_CANCELLED, 0, 0) - else: + elif sync: + await self.wait_sync(asyncio.sleep(0)) return (EventCode.NONE, 0, 0) + else: + if await self.wait_async(asyncio.sleep(0)) == Cancelled.TRUE: + assert(self.state == Task.State.INITIAL) + self.state = Task.State.CANCEL_DELIVERED + return (EventCode.TASK_CANCELLED, 0, 0) + else: + return (EventCode.NONE, 0, 0) - async def poll_for_event(self, waitable_set, sync) -> Optional[EventTuple]: + async def poll_for_event(self, wset, sync) -> Optional[EventTuple]: event_code,_,_ = e = await self.yield_(sync) if event_code == EventCode.TASK_CANCELLED: return e - elif (e := waitable_set.poll()): + elif (e := wset.poll()): return e else: return (EventCode.NONE, 0, 0) @@ -2220,7 +2223,7 @@ async def canon_subtask_cancel(sync, task, i): while not subtask.resolved(): if subtask.has_pending_event(): _ = subtask.get_event() - await task.wait_on(subtask.wait_for_pending_event(), sync = True) + await task.wait_sync(subtask.wait_for_pending_event()) else: if not subtask.resolved(): return [BLOCKED] @@ -2296,7 +2299,7 @@ def on_copy_done(result): e.copy(task.inst, buffer, on_copy, on_copy_done) if opts.sync and not e.has_pending_event(): - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) if e.has_pending_event(): code,index,payload = e.get_event() @@ -2342,7 +2345,7 @@ def on_copy_done(result): e.copy(task.inst, buffer, on_copy_done) if opts.sync and not e.has_pending_event(): - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) if e.has_pending_event(): code,index,payload = e.get_event() @@ -2375,7 +2378,7 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i): e.shared.cancel() if not e.has_pending_event(): if sync: - await task.wait_on(e.wait_for_pending_event(), sync = True) + await task.wait_sync(e.wait_for_pending_event()) else: return [BLOCKED] code,index,payload = e.get_event() diff --git a/test/async/cancel-subtask.wast b/test/async/cancel-subtask.wast index 54ad413d..5c0a7185 100644 --- a/test/async/cancel-subtask.wast +++ b/test/async/cancel-subtask.wast @@ -9,8 +9,10 @@ (core module $CM (import "" "mem" (memory 1)) (import "" "task.cancel" (func $task.cancel)) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) (import "" "waitable.join" (func $waitable.join (param i32 i32))) (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) ;; $ws is waited on by 'f' (global $ws (mut i32) (i32.const 0)) @@ -34,24 +36,53 @@ (call $task.cancel) (i32.const 0 (; EXIT ;)) ) + + (func $g (export "g") (param $futr i32) (result i32) + (local $ret i32) + (local $event_code i32) + + ;; perform a future.read which will block, waiting for the caller to write + (local.set $ret (call $future.read (local.get $futr) (i32.const 0xdeadbeef))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (local.get $futr) (global.get $ws)) + + ;; wait on $ws synchronously, don't expect cancellation + (local.set $event_code (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 4 (; FUTURE_READ ;)) (local.get $event_code)) + (then unreachable)) + + ;; finish returning a value + (i32.const 42) + ) ) + (type $FT (future)) (canon task.cancel (core func $task.cancel)) + (canon future.read $FT async (memory $memory "mem") (core func $future.read)) (canon waitable.join (core func $waitable.join)) (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "task.cancel" (func $task.cancel)) + (export "future.read" (func $future.read)) (export "waitable.join" (func $waitable.join)) (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) )))) (func (export "f") (result u32) (canon lift (core func $cm "f") async (callback (func $cm "f_cb")) )) + (func (export "g") (param "fut" $FT) (result u32) (canon lift + (core func $cm "g") + )) ) (component $D + (type $FT (future)) (import "f" (func $f (result u32))) + (import "g" (func $g (param "fut" $FT) (result u32))) (core module $Memory (memory (export "mem") 1)) (core instance $memory (instantiate $Memory)) @@ -59,12 +90,21 @@ (import "" "mem" (memory 1)) (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32))) (import "" "subtask.drop" (func $subtask.drop (param i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) (import "" "f" (func $f (param i32) (result i32))) + (import "" "g" (func $g (param i32 i32) (result i32))) (func $run (export "run") (result i32) - (local $ret i32) (local $retp i32) + (local $ret i32) (local $ret64 i64) + (local $retp i32) (local $retp1 i32) (local $retp2 i32) (local $subtask i32) (local $event_code i32) + (local $futr i32) (local $futw i32) + (local $ws i32) ;; call 'f'; it should block (local.set $retp (i32.const 4)) @@ -85,24 +125,77 @@ (call $subtask.drop (local.get $subtask)) + ;; create future that g will wait on + (local.set $ret64 (call $future.new)) + (local.set $futr (i32.wrap_i64 (local.get $ret64))) + (local.set $futw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + + ;; call 'g'; it should block + (local.set $retp1 (i32.const 4)) + (local.set $retp2 (i32.const 8)) + (i32.store (local.get $retp1) (i32.const 0xbad0bad0)) + (i32.store (local.get $retp2) (i32.const 0xbad0bad0)) + (local.set $ret (call $g (local.get $futr) (local.get $retp1))) + (if (i32.ne (i32.const 1 (; STARTED ;)) (i32.and (local.get $ret) (i32.const 0xf))) + (then unreachable)) + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + + ;; cancel 'g'; it should block + (local.set $ret (call $subtask.cancel (local.get $subtask))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; future.write, unblocking 'g' + (local.set $ret (call $future.write (local.get $futw) (i32.const 0xdeadbeef))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + + ;; wait to see 'g' finish and check its return value + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $subtask) (local.get $ws)) + (local.set $event_code (call $waitable-set.wait (local.get $ws) (local.get $retp2))) + (if (i32.ne (i32.const 1 (; SUBTASK ;)) (local.get $event_code)) + (then unreachable)) + (if (i32.ne (local.get $subtask) (i32.load (local.get $retp2))) + (then unreachable)) + (if (i32.ne (i32.const 2 (; RETURNED=2 | (0<<4) ;)) (i32.load offset=4 (local.get $retp2))) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load (local.get $retp1))) + (then unreachable)) + ;; return to the top-level assert_return (i32.const 42) ) ) (canon subtask.cancel (core func $subtask.cancel)) (canon subtask.drop (core func $subtask.drop)) + (canon future.new $FT (core func $future.new)) + (canon future.write $FT async (memory $memory "mem") (core func $future.write)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) (canon lower (func $f) async (memory $memory "mem") (core func $f')) + (canon lower (func $g) async (memory $memory "mem") (core func $g')) (core instance $dm (instantiate $DM (with "" (instance (export "mem" (memory $memory "mem")) (export "subtask.cancel" (func $subtask.cancel)) (export "subtask.drop" (func $subtask.drop)) + (export "future.new" (func $future.new)) + (export "future.write" (func $future.write)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) (export "f" (func $f')) + (export "g" (func $g')) )))) (func (export "run") (result u32) (canon lift (core func $dm "run"))) ) (instance $c (instantiate $C)) - (instance $d (instantiate $D (with "f" (func $c "f")))) + (instance $d (instantiate $D + (with "f" (func $c "f")) + (with "g" (func $c "g")) + )) (func (export "run") (alias export $d "run")) ) (assert_return (invoke "run") (u32.const 42))