Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 47 additions & 74 deletions AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive
Expand All @@ -27,10 +28,11 @@ public async ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observ
private sealed class AutoDetachAsyncObserver : AsyncObserverBase<T>, IAsyncDisposable
{
private readonly IAsyncObserver<T> _observer;
private TaskCompletionSource<object> _pendingOnSomethingCallsTcs;
private readonly AsyncLocal<bool> _reentrancyFlag = new(); // If any On* method, calls OnDisposeAsync, this will be true
private readonly object _gate = new();

private IAsyncDisposable _subscription;
private ValueTask _task;
private bool _disposing;

public AutoDetachAsyncObserver(IAsyncObserver<T> observer)
Expand Down Expand Up @@ -62,74 +64,25 @@ public async ValueTask AssignAsync(IAsyncDisposable subscription)

public async ValueTask DisposeAsync()
{
ValueTask task;
var subscription = default(IAsyncDisposable);

lock (_gate)
{
//
// NB: The postcondition of awaiting the first DisposeAsync call to complete is that all message
// processing has ceased, i.e. no further On*AsyncCore calls will be made. This is achieved
// here by setting _disposing to true, which is checked by the On*AsyncCore calls upon
// entry, and by awaiting the task of any in-flight On*AsyncCore calls.
//
// Timing of the disposal of the subscription is less deterministic due to the intersection
// with the AssignAsync code path. However, the auto-detach observer can only be returned
// from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so
// either AssignAsync triggers the disposal and an already disposed instance is returned, or
// the user calling DisposeAsync will either encounter a busy observer which will be stopped
// in its tracks (as described above) or it will trigger a disposal of the subscription. In
// both these cases the result of awaiting DisposeAsync guarantees no further message flow.
//

if (!_disposing)
{
_disposing = true;

task = _task;
subscription = _subscription;
}
}

try
{
//
// BUGBUG: This causes grief when an outgoing On*Async call reenters the DisposeAsync method and
// results in the task returned from the On*Async call to be awaited to serialize the
// call to subscription.DisposeAsync after it's done. We need to either detect reentrancy
// and queue up the call to DisposeAsync or follow an when we trigger the disposal without
// awaiting outstanding work (thus allowing for concurrency).
//
// if (task != null)
// {
// await task.ConfigureAwait(false);
// }
//
}
finally
{
if (subscription != null)
{
await subscription.DisposeAsync().ConfigureAwait(false);
}
}
await FinishAsync().ConfigureAwait(false);
}

protected override async ValueTask OnCompletedAsyncCore()
{
ValueTask task;
lock (_gate)
{
if (_disposing)
{
return;
}

_task = _observer.OnCompletedAsync();
task = WithReentrancyFlagOn(static (@this, _) => @this._observer.OnCompletedAsync(), (object)null);
}

try
{
await _task.ConfigureAwait(false);
await task.ConfigureAwait(false);
}
finally
{
Expand All @@ -139,19 +92,20 @@ protected override async ValueTask OnCompletedAsyncCore()

protected override async ValueTask OnErrorAsyncCore(Exception error)
{
ValueTask task;
lock (_gate)
{
if (_disposing)
{
return;
}

_task = _observer.OnErrorAsync(error);
task = WithReentrancyFlagOn(static (@this, error) => @this._observer.OnErrorAsync(error), error);
}

try
{
await _task.ConfigureAwait(false);
await task.ConfigureAwait(false);
}
finally
{
Expand All @@ -161,50 +115,69 @@ protected override async ValueTask OnErrorAsyncCore(Exception error)

protected override async ValueTask OnNextAsyncCore(T value)
{
ValueTask task;
lock (_gate)
{
if (_disposing)
{
return;
}

_task = _observer.OnNextAsync(value);
task = WithReentrancyFlagOn(static (@this, value) => @this._observer.OnNextAsync(value), value);
}

try
{
await _task.ConfigureAwait(false);
}
finally
{
lock (_gate)
{
_task = default;
}
}
await task.ConfigureAwait(false);
}

private async ValueTask FinishAsync()
{
var subscription = default(IAsyncDisposable);
// On synchronous Rx, if Dispose is called while we're in the middle of an OnNext/OnError/OnCompleted,
// we immediately execute the Dispose() method.
// So it's possible that the On* method finishes after the Dispose() method has completed.
// What it's impossible is that another On* method STARTS AFTER Dispose() has completed.

Task onSomethingCall;
IAsyncDisposable subscription;

lock (_gate)
{
if (!_disposing)
if (_disposing)
{
_disposing = true;

subscription = _subscription;
return;
}

_task = default;
_disposing = true;
subscription = _subscription;
onSomethingCall = _reentrancyFlag.Value ? null : _pendingOnSomethingCallsTcs?.Task;
}

if (onSomethingCall != null)
{
await onSomethingCall.ConfigureAwait(false);
}

if (subscription != null)
{
await subscription.DisposeAsync().ConfigureAwait(false);
}
}

private async ValueTask WithReentrancyFlagOn<TState>(Func<AutoDetachAsyncObserver, TState, ValueTask> asyncAction, TState state)
{
var runningMethod = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingOnSomethingCallsTcs = runningMethod;
_reentrancyFlag.Value = true;
try
{
await asyncAction(this, state).ConfigureAwait(false);
}
finally
{
_reentrancyFlag.Value = false;
_pendingOnSomethingCallsTcs = null;
runningMethod.SetResult(null!);
}
}
}
}
}
Loading