Don't deliver cancellation to sync wait/poll/yield #546

Closed
wants to merge 1 commit into from
180 changes: 97 additions & 83 deletions design/mvp/CanonicalABI.md
@@ -805,8 +805,8 @@ has a chance to re-enable backpressure. To ensure this property, the
`starting_pending_task` flag is set here and cleared when the pending task
actually resumes execution in `enter`, preventing more pending tasks from being
started in the interim. Lastly, `maybe_start_pending_task` is only called at
specific points (`wait_on` and `exit`, below) where the core wasm code has had
the opportunity to re-enable backpressure if need be.
specific points (`wait_async` and `exit`, below) where the core wasm code has
had the opportunity to re-enable backpressure if need be.
```python
def maybe_start_pending_task(self):
if self.inst.starting_pending_task:
@@ -822,63 +822,65 @@ Notably, the loop in `maybe_start_pending_task` allows pending async tasks to
start even when there is a blocked pending sync task ahead of them in the
`pending_tasks` queue.

The `Task.wait_on` method defines how to block the current task on a given
Python [awaitable] using the `OnBlock` callback described above:
The `Task.wait_sync` method blocks the current task until a given Python
[awaitable] is resolved, setting the instance-wide `calling_sync_import` flag
to prohibit (via guards in `may_enter` and `wait_async`) any other tasks from
starting or resuming in the current component instance in the interim. (Tasks
in *other* component instances may however execute in the interim.)
```python
async def wait_on(self, awaitable, sync, cancellable = False) -> bool:
if sync:
assert(not self.inst.calling_sync_import)
self.inst.calling_sync_import = True
else:
self.maybe_start_pending_task()

async def wait_sync(self, awaitable) -> None:
awaitable = asyncio.ensure_future(awaitable)
if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1):
cancelled = Cancelled.FALSE
else:
cancelled = await self.on_block(awaitable)
if cancelled and not cancellable:
assert(self.state == Task.State.INITIAL)
self.state = Task.State.PENDING_CANCEL
cancelled = await self.on_block(awaitable)
assert(not cancelled)

if sync:
self.inst.calling_sync_import = False
self.inst.async_waiting_tasks.notify_all()
else:
while self.inst.calling_sync_import:
await self.inst.async_waiting_tasks.wait()

return cancelled
return
assert(not self.inst.calling_sync_import)
self.inst.calling_sync_import = True
if await self.on_block(awaitable) == Cancelled.TRUE:
assert(self.state == Task.State.INITIAL)
self.state = Task.State.PENDING_CANCEL
assert(await self.on_block(awaitable) == Cancelled.FALSE)
self.inst.calling_sync_import = False
self.inst.async_waiting_tasks.notify_all()
```
If the given `awaitable` is already resolved (e.g., if between making an async
import call that blocked and calling `waitable-set.wait` the I/O operation
completed), the Component Model allows the runtime to nondeterministically
avoid calling `OnBlock` which, in component-to-component async calls, means
that control flow does not need to transfer to the calling component.

If `wait_on` is called with `sync` set to `True`, only tasks in *other*
component instances may execute; no code in the current component instance may
execute. This is achieved by setting and waiting on `calling_sync_import`
(using the `async_waiting_tasks` [`asyncio.Condition`]). `calling_sync_import`
is also checked by `may_enter` (to prevent reentrance by a new task) and set by
`call_sync` (defined next).

If `wait_on` is called with `cancellable` set to `True`, then the caller
expects and propagates the case where the supertask requests cancellation
(when `True` is returned). But if `cancellable` is `False`, then `wait_on` must
handle the cancellation request itself by setting the task's `state` to
`PENDING_CANCEL` (to be picked up by `wait_for_event`, `poll_for_event` or
`yield` in the future) and calling `OnBlock` a second time (noting that
`OnBlock` can only request cancellation *once*).
Since synchronous callers may not have enough context to know how or where
to propagate cancellation, if a supertask requests cancellation while
synchronously waiting, `wait_sync` remembers the request for cancellation and
then keeps waiting (which, since cancellation is delivered at most once, will
not return `Cancelled.TRUE` again). Cancellation will be delivered to core
wasm code at the next asynchronous call where cancellation is expected.
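
The "remember, then keep waiting" behavior can be illustrated with a minimal sketch. It is not the spec's `Task` class: `SketchTask`, its `cancel_requested` field, the `on_block_calls` counter and the simplified `on_block` are all hypothetical names introduced here, and the instance-wide `calling_sync_import` bookkeeping is deliberately left out.

```python
import asyncio
from enum import Enum

class State(Enum):
    INITIAL = 0
    PENDING_CANCEL = 1

class SketchTask:
    """Hypothetical stand-in for `Task`; models only cancellation bookkeeping."""

    def __init__(self, cancel_requested):
        self.state = State.INITIAL
        self.cancel_requested = cancel_requested  # assumption: set by a supertask
        self.on_block_calls = 0

    async def on_block(self, future):
        # Simplified OnBlock: report a cancellation request at most once,
        # otherwise wait for the future and report "not cancelled".
        self.on_block_calls += 1
        if self.cancel_requested:
            self.cancel_requested = False
            return True
        await future
        return False

    async def wait_sync(self, awaitable):
        future = asyncio.ensure_future(awaitable)
        if await self.on_block(future):
            # Remember the request instead of delivering it to the caller...
            assert self.state == State.INITIAL
            self.state = State.PENDING_CANCEL
            # ...and keep waiting; a second request cannot arrive.
            cancelled_again = await self.on_block(future)
            assert not cancelled_again

async def demo(cancel_requested):
    task = SketchTask(cancel_requested)
    await task.wait_sync(asyncio.sleep(0))
    return task.state, task.on_block_calls
```

Running `asyncio.run(demo(True))` leaves the sketch task in `PENDING_CANCEL` after two `on_block` calls, while `asyncio.run(demo(False))` leaves it in `INITIAL` after one.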

The `Task.wait_async` method blocks the current task until a given Python
[awaitable] is resolved. Unlike `wait_sync`, `wait_async` allows other tasks to
be started or resumed in the current component instance in the interim.
However, while asynchronously waiting, another task may start *synchronously*
waiting and thus `wait_async` takes care to wait until `calling_sync_import` is
cleared before returning control flow back to the calling task:
```python
  async def wait_async(self, awaitable) -> Cancelled:
    self.maybe_start_pending_task()
    awaitable = asyncio.ensure_future(awaitable)
    if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1):
      return Cancelled.FALSE
    cancelled = await self.on_block(awaitable)
    while self.inst.calling_sync_import:
      cancelled |= await self.on_block(self.inst.async_waiting_tasks.wait())
    return cancelled
```
The `maybe_start_pending_task()` call on the first line indicates that each
time a task asynchronously waits, it potentially unblocks a single pending task
that had previously hit backpressure and queued, but has since been unblocked.
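
The gating of asynchronous waiters on `calling_sync_import` can be sketched in isolation with a plain `asyncio.Condition`. This is only an illustration under stated assumptions: `SketchInstance`, `sync_waiter`, `async_waiter` and the `log` list are hypothetical, and the sketch acquires the condition's lock explicitly, which the spec code above does not show.

```python
import asyncio

class SketchInstance:
    """Hypothetical stand-in for the component instance state used above."""
    def __init__(self):
        self.calling_sync_import = False
        self.async_waiting_tasks = asyncio.Condition()

async def sync_waiter(inst, log):
    # Models a synchronous wait: set the flag, block, clear the flag, notify.
    async with inst.async_waiting_tasks:
        inst.calling_sync_import = True
    log.append("sync: start")
    await asyncio.sleep(0.01)
    async with inst.async_waiting_tasks:
        inst.calling_sync_import = False
        inst.async_waiting_tasks.notify_all()
    log.append("sync: done")

async def async_waiter(inst, log):
    # Models an asynchronous wait whose own awaitable resolves early...
    await asyncio.sleep(0)
    # ...but which may not resume while a synchronous wait is in progress.
    async with inst.async_waiting_tasks:
        while inst.calling_sync_import:
            await inst.async_waiting_tasks.wait()
    log.append("async: resumed")

async def demo():
    inst = SketchInstance()
    log = []
    await asyncio.gather(sync_waiter(inst, log), async_waiter(inst, log))
    return log
```

In this sketch the asynchronous waiter only logs its resumption after the synchronous waiter has cleared the flag, even though its own awaitable was ready much earlier.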

The `Task.call_sync` method defines how a task makes a synchronous call to an
imported `callee`. `call_sync` works just like `wait_on` when `sync` is `True`
and `cancellable` is `False` except that `call_sync` avoids unconditionally
blocking and instead only blocks if `callee` transitively blocks. This means
that N-deep synchronous callstacks avoid the overhead of async calls if none of
the calls in the stack actually block on external I/O.
imported `callee`. `call_sync` works just like `wait_sync` except that
`call_sync` avoids unconditionally blocking and instead only blocks if `callee`
transitively blocks. This means that N-deep synchronous callstacks avoid the
overhead of async calls if none of the calls in the stack actually block on
external I/O.
```python
async def call_sync(self, callee, on_start, on_return):
async def sync_on_block(awaitable):
@@ -900,21 +902,24 @@ when a `callback` is used, when the `callback` returns `WAIT` to the event
loop. `wait_for_event` waits until a `Waitable` in a given `WaitableSet` makes
progress:
```python
async def wait_for_event(self, waitable_set, sync) -> EventTuple:
async def wait_for_event(self, wset, sync) -> EventTuple:
if self.state == Task.State.PENDING_CANCEL:
self.state = Task.State.CANCEL_DELIVERED
return (EventCode.TASK_CANCELLED, 0, 0)
else:
waitable_set.num_waiting += 1
wset.num_waiting += 1
e = None
while not e:
maybe_event = waitable_set.maybe_has_pending_event.wait()
if await self.wait_on(maybe_event, sync, cancellable = True):
assert(self.state == Task.State.INITIAL)
self.state = Task.State.CANCEL_DELIVERED
return (EventCode.TASK_CANCELLED, 0, 0)
e = waitable_set.poll()
waitable_set.num_waiting -= 1
if sync:
await self.wait_sync(wset.maybe_has_pending_event.wait())
else:
if await self.wait_async(wset.maybe_has_pending_event.wait()) == Cancelled.TRUE:
assert(self.state == Task.State.INITIAL)
self.state = Task.State.CANCEL_DELIVERED
e = (EventCode.TASK_CANCELLED, 0, 0)
break
e = wset.poll()
wset.num_waiting -= 1
return e
```
As mentioned above with `WaitableSet`, `maybe_has_pending_event` is allowed to
@@ -923,10 +928,10 @@ there is actually an event. This looping as well as the number of iterations is
not semantically observable by the wasm code and so the host implementation can
loop or not using its own event delivery scheme.

If there is already a pending cancellation request (from a previous
non-cancellable `wait_on` or a `call_sync`), the cancellation request is
delivered to core wasm via the `TASK_CANCELLED` event code and task's `state`
is transitioned to `CANCEL_DELIVERED` so that `canon_task_cancel` can be called
If there is already a pending cancellation request (during a previous
`wait_sync` or a `call_sync`), the cancellation request is now delivered to
core wasm via the `TASK_CANCELLED` event code and the task's `state` is
transitioned to `CANCEL_DELIVERED` so that `canon_task_cancel` can be called
without trapping. If cancellation is requested *during* `wait_for_event`, there
is a direct transition to the `CANCEL_DELIVERED` state.
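
The two paths into `CANCEL_DELIVERED` described above can be summarized as a small pure function. This is a sketch, not part of the spec: `deliver_cancellation`, its `cancelled_during_wait` parameter and the string event code are hypothetical simplifications of what `wait_for_event` does with `Task.State` and `EventCode`.

```python
from enum import Enum

class State(Enum):
    INITIAL = 0
    PENDING_CANCEL = 1
    CANCEL_DELIVERED = 2

TASK_CANCELLED = "TASK_CANCELLED"  # stand-in for EventCode.TASK_CANCELLED

def deliver_cancellation(state, cancelled_during_wait):
    """Return (new_state, event_code or None) for one asynchronous wait."""
    if state == State.PENDING_CANCEL:
        # A request remembered by an earlier synchronous wait is delivered now.
        return State.CANCEL_DELIVERED, TASK_CANCELLED
    if cancelled_during_wait:
        # A request arriving during this wait skips PENDING_CANCEL entirely.
        assert state == State.INITIAL
        return State.CANCEL_DELIVERED, TASK_CANCELLED
    # No cancellation: the state is unchanged and a normal event would follow.
    return state, None
```

Either way the task ends up in `CANCEL_DELIVERED` before core wasm sees `TASK_CANCELLED`, which is what allows `canon_task_cancel` to be called without trapping.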

@@ -941,12 +946,16 @@ control flow to switch to other `asyncio.Task`s.
if self.state == Task.State.PENDING_CANCEL:
self.state = Task.State.CANCEL_DELIVERED
return (EventCode.TASK_CANCELLED, 0, 0)
elif await self.wait_on(asyncio.sleep(0), sync, cancellable = True):
assert(self.state == Task.State.INITIAL)
self.state = Task.State.CANCEL_DELIVERED
return (EventCode.TASK_CANCELLED, 0, 0)
else:
elif sync:
await self.wait_sync(asyncio.sleep(0))
return (EventCode.NONE, 0, 0)
else:
if await self.wait_async(asyncio.sleep(0)) == Cancelled.TRUE:
assert(self.state == Task.State.INITIAL)
self.state = Task.State.CANCEL_DELIVERED
return (EventCode.TASK_CANCELLED, 0, 0)
else:
return (EventCode.NONE, 0, 0)
```
Handling of cancellation requests in `yield_` mirrors `wait_for_event` above,
handling both the cases of pending cancellation and cancellation while
@@ -957,11 +966,11 @@ when a `callback` is used, when the `callback` returns `POLL` to the event
loop. Polling returns the `NONE` event code instead of blocking when there are
no pending events.
```python
async def poll_for_event(self, waitable_set, sync) -> Optional[EventTuple]:
async def poll_for_event(self, wset, sync) -> Optional[EventTuple]:
event_code,_,_ = e = await self.yield_(sync)
if event_code == EventCode.TASK_CANCELLED:
return e
elif (e := waitable_set.poll()):
elif (e := wset.poll()):
return e
else:
return (EventCode.NONE, 0, 0)
@@ -3579,10 +3588,8 @@ execute, however tasks in *other* component instances may execute. This allows
a long-running task in one component to avoid starving other components without
needing to support full reentrancy.

Because other tasks can execute, a subtask can be cancelled while executing
`yield`, in which case `yield` returns `1`. The language runtime and bindings
generators should handle cancellation the same way as when receiving the
`TASK_CANCELLED` event from `waitable-set.wait`.
If `async` is set, `yield` can return `1` to indicate that the caller
requested cancellation. If `async` is not set, `yield` will always return `0`.


### 🔀 `canon waitable-set.new`
@@ -3636,11 +3643,15 @@ in the same component instance, which can be useful for producer toolchains in
situations where interleaving is not supported. However, this is generally worse
for concurrency and thus producer toolchains should set `async` when possible.

`wait` can be called from a synchronously-lifted export so that even
synchronous code can make concurrent import calls. In these synchronous cases,
though, the automatic backpressure (applied by `Task.enter`) will ensure there
is only ever at most one synchronously-lifted task executing in a component
instance at a time.
If `async` is set, `waitable-set.wait` can return the `TASK_CANCELLED` event
indicating that the caller requested cancellation. If `async` is not set, this
notification will be remembered and delivered at the next `async` call.

`waitable-set.wait` can be called from a synchronously-lifted export so that
even synchronous code can make concurrent import calls. In these synchronous
cases, though, the automatic backpressure (applied by `Task.enter`) will ensure
there is only ever at most one synchronously-lifted task executing in a
component instance at a time.


### 🔀 `canon waitable-set.poll`
@@ -3667,6 +3678,10 @@ async def canon_waitable_set_poll(sync, mem, task, si, ptr):
When `async` is set, `poll_for_event` can yield to other tasks (in this or other
components) as part of polling for an event.

If `async` is set, `waitable-set.poll` can return the `TASK_CANCELLED` event
indicating that the caller requested cancellation. If `async` is not set, this
notification will be remembered and delivered at the next `async` call.


### 🔀 `canon waitable-set.drop`

@@ -3764,7 +3779,7 @@ async def canon_subtask_cancel(sync, task, i):
while not subtask.resolved():
if subtask.has_pending_event():
_ = subtask.get_event()
await task.wait_on(subtask.wait_for_pending_event(), sync = True)
await task.wait_sync(subtask.wait_for_pending_event())
else:
if not subtask.resolved():
return [BLOCKED]
@@ -3932,7 +3947,7 @@ instance, but allowing other tasks in other component instances to make
progress):
```python
if opts.sync and not e.has_pending_event():
await task.wait_on(e.wait_for_pending_event(), sync = True)
await task.wait_sync(e.wait_for_pending_event())
```
Finally, if there is a pending event on the stream end (which is necessarily a
`copy_event` closure), it is eagerly returned to the caller. Otherwise, the
@@ -4021,9 +4036,8 @@ in the high 28 bits; they're always zero.
The end of `future_copy` is the exact same as `stream_copy`: waiting if `sync`
and returning either the progress made or `BLOCKED`.
```python

if opts.sync and not e.has_pending_event():
await task.wait_on(e.wait_for_pending_event(), sync = True)
await task.wait_sync(e.wait_for_pending_event())

if e.has_pending_event():
code,index,payload = e.get_event()
@@ -4073,7 +4087,7 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
e.shared.cancel()
if not e.has_pending_event():
if sync:
await task.wait_on(e.wait_for_pending_event(), sync = True)
await task.wait_sync(e.wait_for_pending_event())
else:
return [BLOCKED]
code,index,payload = e.get_event()
@@ -4087,8 +4101,8 @@ callbacks (passed by `canon_{stream,future}_{read,write}` above) which will set
a pending event that is caught by the *second* check for
`e.has_pending_event()`.

If the copy hasn't been cancelled, the synchronous case uses `Task.wait_on` to
synchronously and uninterruptibly wait for one of the `on_*` callbacks to
If the copy hasn't been cancelled, the synchronous case uses `Task.wait_sync`
to synchronously and uninterruptibly wait for one of the `on_*` callbacks to
eventually be called (which will set the pending event).

The asynchronous case simply returns `BLOCKED` and the client code must wait
22 changes: 15 additions & 7 deletions design/mvp/Explainer.md
@@ -1615,14 +1615,17 @@ expecting a return value. (See also "[Cancellation]" in the async explainer and
| Canonical ABI signature | `[] -> [i32]` |

The `yield` built-in allows the runtime to switch to other tasks, enabling a
long-running computation to cooperatively interleave execution. `yield` returns
`true` (`1`) if the caller has requested [cancellation] of the [current task].
long-running computation to cooperatively interleave execution. If the `async`
immediate is present, the runtime can switch to other tasks in the *same*
component instance, which the calling core wasm must be prepared to handle. If
`async` is not present, only tasks in *other* component instances may be
switched to.

If the `async` immediate is present, the runtime can switch to other tasks in
the *same* component instance, which the calling core wasm must be prepared to
handle. If `async` is not present, only tasks in *other* component instances
may be switched to. (See also [`canon_yield`] in the Canonical ABI explainer
for details.)
If `async` is set, `yield` can return `true` (`1`) if the caller has
requested [cancellation] of the [current task]. If `async` is not set,
`yield` will always return `false` (`0`).

(See also [`canon_yield`] in the Canonical ABI explainer for details.)

###### 🔀 `waitable-set.new`

@@ -1683,6 +1686,11 @@ The meanings of the `{stream,future}-{read,write}` events/payloads are given as
part of [`stream.read` and `stream.write`](#-streamread-and-streamwrite) and
[`future.read` and `future.write`](#-futureread-and-futurewrite) below.

If the `async` immediate is set, `waitable-set.wait` can return the
`task-cancelled` event to indicate that the caller requested [cancellation] of
the [current task]. If `async` is not set, `task-cancelled` will never be
delivered.

In the Canonical ABI, the `event-code` return value provides the `event`
discriminant and the case payloads are stored as two contiguous `i32`s at the
8-byte-aligned address `payload-addr`. (See also [`canon_waitable_set_wait`]