diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index afc6e038e32..0c3251c978c 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -5605,6 +5605,7 @@ namespace Akka.Util public bool IsSuccess { get; } public Akka.Util.Option Success { get; } public static Akka.Util.Try From(System.Func func) { } + public static Akka.Util.Try FromTask(System.Threading.Tasks.Task task) { } public T Get() { } public Akka.Util.Try GetOrElse(System.Func fallback) { } public Akka.Util.Try OrElse(Akka.Util.Try @default) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 895d2bd5d76..cb56fe1c7c5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -5593,6 +5593,7 @@ namespace Akka.Util public bool IsSuccess { get; } public Akka.Util.Option Success { get; } public static Akka.Util.Try From(System.Func func) { } + public static Akka.Util.Try FromTask(System.Threading.Tasks.Task task) { } public T Get() { } public Akka.Util.Try GetOrElse(System.Func fallback) { } public Akka.Util.Try OrElse(Akka.Util.Try @default) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 4bd48e3ea33..f54419e8992 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -150,7 +150,7 @@ await this.AssertAllStagesStoppedAsync(async() => { sub.Request(10); var exception = await c.ExpectErrorAsync(); - exception.InnerException!.Message.Should().Be("err1"); + exception.Message.Should().Be("err1"); }, Materializer).ShouldCompleteWithin(RemainingOrDefault); } diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index c9820798b8d..085841f28de 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2545,27 +2545,27 @@ public sealed class SelectAsync : GraphStage> private sealed class Logic : InAndOutGraphStageLogic { - private sealed class Holder(object? message, Result element) + private sealed class Holder(object? message, Try element) { public object? Message { get; } = message; - public Result Element { get; private set; } = element; + public Try Element { get; private set; } = element; - public void SetElement(Result result) + public void SetElement(Try result) { - Element = result.IsSuccess && result.Value == null - ? Result.Failure(ReactiveStreamsCompliance.ElementMustNotBeNullException) + Element = result is { IsSuccess: true, Success.Value: null } + ? new Try(ReactiveStreamsCompliance.ElementMustNotBeNullException) : result; } } - private static readonly Result NotYetThere = Result.Failure(new Exception()); + private static readonly Try NotYetThere = new (new Exception()); private readonly SelectAsync _stage; private readonly Decider _decider; private IBuffer>? _buffer; - private readonly Action<(Holder holder, Result result)> _taskCallback; + private readonly Action<(Holder holder, Try result)> _taskCallback; public Logic(Attributes inheritedAttributes, SelectAsync stage) : base(stage.Shape) { @@ -2573,7 +2573,7 @@ public Logic(Attributes inheritedAttributes, SelectAsync stage) : bas var attr = inheritedAttributes.GetAttribute(); _decider = attr != null ? attr.Decider : Deciders.StoppingDecider; - _taskCallback = GetAsyncCallback<(Holder holder, Result result)>(t => HolderCompleted(t.holder, t.result)); + _taskCallback = GetAsyncCallback<(Holder holder, Try result)>(t => HolderCompleted(t.holder, t.result)); SetHandlers(stage.In, stage.Out, this); } @@ -2593,7 +2593,7 @@ public override void OnPush() // scheduling it to an execution context if (task.IsCompleted) { - HolderCompleted(holder, Result.FromTask(task)); + HolderCompleted(holder, Try.FromTask(task)); } else { @@ -2601,11 +2601,11 @@ async Task WaitForTask() { try { - var result = Result.Success(await task); + var result = new Try(await task); _taskCallback((holder, result)); } catch(Exception ex){ - var result = Result.Failure(ex); + var result = new Try(ex); _taskCallback((holder, result)); } } @@ -2673,11 +2673,11 @@ private void PushOne() { // this could happen if we are looping in PushOne and end up on a failed Task before the // HolderCompleted callback has run - var strategy = _decider(result.Exception); + var strategy = _decider(result.Failure.Value); switch (strategy) { case Directive.Stop: - FailStage(result.Exception); + FailStage(result.Failure.Value); return; case Directive.Resume: @@ -2685,12 +2685,12 @@ private void PushOne() break; default: - throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception); + throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Failure.Value); } continue; } - Push(_stage.Out!, result.Value); + Push(_stage.Out, result.Success.Value); PullIfNeeded(); } @@ -2707,7 +2707,7 @@ private void PullIfNeeded() // else already pulled and waiting for next element } - private void HolderCompleted(Holder holder, Result result) + private void HolderCompleted(Holder holder, Try result) { // we may not be at the front of the line right now, so save the result for later holder.SetElement(result); @@ -2718,7 +2718,7 @@ private void HolderCompleted(Holder holder, Result result) return; } - var exception = result.Exception; + var exception = result.Failure.Value; var strategy = _decider(exception); Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy); switch (strategy) diff --git a/src/core/Akka/Util/Try.cs b/src/core/Akka/Util/Try.cs index 5208683d6b3..43689fdd8aa 100644 --- a/src/core/Akka/Util/Try.cs +++ b/src/core/Akka/Util/Try.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading.Tasks; namespace Akka.Util { @@ -21,6 +22,7 @@ public class Try public Try(T success) { Success = success; + IsSuccess = true; } /// @@ -39,7 +41,7 @@ public static implicit operator Try(T value) /// /// Shows if this is Success /// - public bool IsSuccess => Success.HasValue; + public bool IsSuccess { get; } /// /// If set, contains successfull execution result @@ -106,6 +108,20 @@ public static Try From(Func func) } } + public static Try FromTask(Task task) + { + if(!task.IsCompleted) + throw new ArgumentException("Task is not completed. Try.FromTask only accepts completed tasks.", nameof(task)); + try + { + return new Try(task.GetAwaiter().GetResult()); + } + catch (Exception ex) + { + return new Try(ex); + } + } + /// /// Returns this Try if it's a Success or the given default argument if this is a Failure. ///