diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index 08d0a62a14..f75c896951 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Threading; using System.Threading.Tasks; namespace System.Reactive @@ -27,10 +28,11 @@ public async ValueTask SubscribeAsync(IAsyncObserver observ private sealed class AutoDetachAsyncObserver : AsyncObserverBase, IAsyncDisposable { private readonly IAsyncObserver _observer; + private TaskCompletionSource _pendingOnSomethingCallsTcs; + private readonly AsyncLocal _reentrancyFlag = new(); // If any On* method, calls OnDisposeAsync, this will be true private readonly object _gate = new(); private IAsyncDisposable _subscription; - private ValueTask _task; private bool _disposing; public AutoDetachAsyncObserver(IAsyncObserver observer) @@ -62,61 +64,12 @@ public async ValueTask AssignAsync(IAsyncDisposable subscription) public async ValueTask DisposeAsync() { - ValueTask task; - var subscription = default(IAsyncDisposable); - - lock (_gate) - { - // - // NB: The postcondition of awaiting the first DisposeAsync call to complete is that all message - // processing has ceased, i.e. no further On*AsyncCore calls will be made. This is achieved - // here by setting _disposing to true, which is checked by the On*AsyncCore calls upon - // entry, and by awaiting the task of any in-flight On*AsyncCore calls. - // - // Timing of the disposal of the subscription is less deterministic due to the intersection - // with the AssignAsync code path. However, the auto-detach observer can only be returned - // from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so - // either AssignAsync triggers the disposal and an already disposed instance is returned, or - // the user calling DisposeAsync will either encounter a busy observer which will be stopped - // in its tracks (as described above) or it will trigger a disposal of the subscription. In - // both these cases the result of awaiting DisposeAsync guarantees no further message flow. - // - - if (!_disposing) - { - _disposing = true; - - task = _task; - subscription = _subscription; - } - } - - try - { - // - // BUGBUG: This causes grief when an outgoing On*Async call reenters the DisposeAsync method and - // results in the task returned from the On*Async call to be awaited to serialize the - // call to subscription.DisposeAsync after it's done. We need to either detect reentrancy - // and queue up the call to DisposeAsync or follow an when we trigger the disposal without - // awaiting outstanding work (thus allowing for concurrency). - // - // if (task != null) - // { - // await task.ConfigureAwait(false); - // } - // - } - finally - { - if (subscription != null) - { - await subscription.DisposeAsync().ConfigureAwait(false); - } - } + await FinishAsync().ConfigureAwait(false); } protected override async ValueTask OnCompletedAsyncCore() { + ValueTask task; lock (_gate) { if (_disposing) @@ -124,12 +77,12 @@ protected override async ValueTask OnCompletedAsyncCore() return; } - _task = _observer.OnCompletedAsync(); + task = WithReentrancyFlagOn(static (@this, _) => @this._observer.OnCompletedAsync(), (object)null); } try { - await _task.ConfigureAwait(false); + await task.ConfigureAwait(false); } finally { @@ -139,6 +92,7 @@ protected override async ValueTask OnCompletedAsyncCore() protected override async ValueTask OnErrorAsyncCore(Exception error) { + ValueTask task; lock (_gate) { if (_disposing) @@ -146,12 +100,12 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) return; } - _task = _observer.OnErrorAsync(error); + task = WithReentrancyFlagOn(static (@this, error) => @this._observer.OnErrorAsync(error), error); } try { - await _task.ConfigureAwait(false); + await task.ConfigureAwait(false); } finally { @@ -161,6 +115,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) protected override async ValueTask OnNextAsyncCore(T value) { + ValueTask task; lock (_gate) { if (_disposing) @@ -168,36 +123,37 @@ protected override async ValueTask OnNextAsyncCore(T value) return; } - _task = _observer.OnNextAsync(value); + task = WithReentrancyFlagOn(static (@this, value) => @this._observer.OnNextAsync(value), value); } - try - { - await _task.ConfigureAwait(false); - } - finally - { - lock (_gate) - { - _task = default; - } - } + await task.ConfigureAwait(false); } private async ValueTask FinishAsync() { - var subscription = default(IAsyncDisposable); + // On synchronous Rx, if Dispose is called while we're in the middle of an OnNext/OnError/OnCompleted, + // we immediately execute the Dispose() method. + // So it's possible that the On* method finishes after the Dispose() method has completed. + // What it's impossible is that another On* method STARTS AFTER Dispose() has completed. + + Task onSomethingCall; + IAsyncDisposable subscription; lock (_gate) { - if (!_disposing) + if (_disposing) { - _disposing = true; - - subscription = _subscription; + return; } - _task = default; + _disposing = true; + subscription = _subscription; + onSomethingCall = _reentrancyFlag.Value ? null : _pendingOnSomethingCallsTcs?.Task; + } + + if (onSomethingCall != null) + { + await onSomethingCall.ConfigureAwait(false); } if (subscription != null) @@ -205,6 +161,23 @@ private async ValueTask FinishAsync() await subscription.DisposeAsync().ConfigureAwait(false); } } + + private async ValueTask WithReentrancyFlagOn(Func asyncAction, TState state) + { + var runningMethod = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _pendingOnSomethingCallsTcs = runningMethod; + _reentrancyFlag.Value = true; + try + { + await asyncAction(this, state).ConfigureAwait(false); + } + finally + { + _reentrancyFlag.Value = false; + _pendingOnSomethingCallsTcs = null; + runningMethod.SetResult(null!); + } + } } } }