Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5605,6 +5605,7 @@ namespace Akka.Util
public bool IsSuccess { get; }
public Akka.Util.Option<T> Success { get; }
public static Akka.Util.Try<T> From(System.Func<T> func) { }
public static Akka.Util.Try<T> FromTask(System.Threading.Tasks.Task<T> task) { }
public T Get() { }
public Akka.Util.Try<T> GetOrElse(System.Func<T> fallback) { }
public Akka.Util.Try<T> OrElse(Akka.Util.Try<T> @default) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5593,6 +5593,7 @@ namespace Akka.Util
public bool IsSuccess { get; }
public Akka.Util.Option<T> Success { get; }
public static Akka.Util.Try<T> From(System.Func<T> func) { }
public static Akka.Util.Try<T> FromTask(System.Threading.Tasks.Task<T> task) { }
public T Get() { }
public Akka.Util.Try<T> GetOrElse(System.Func<T> fallback) { }
public Akka.Util.Try<T> OrElse(Akka.Util.Try<T> @default) { }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ await this.AssertAllStagesStoppedAsync(async() => {
sub.Request(10);

var exception = await c.ExpectErrorAsync();
exception.InnerException!.Message.Should().Be("err1");
exception.Message.Should().Be("err1");
}, Materializer).ShouldCompleteWithin(RemainingOrDefault);
}

Expand Down
34 changes: 17 additions & 17 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2545,35 +2545,35 @@ public sealed class SelectAsync<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>

private sealed class Logic : InAndOutGraphStageLogic
{
private sealed class Holder<T>(object? message, Result<T> element)
private sealed class Holder<T>(object? message, Try<T> element)
{
public object? Message { get; } = message;

public Result<T> Element { get; private set; } = element;
public Try<T> Element { get; private set; } = element;

public void SetElement(Result<T> result)
public void SetElement(Try<T> result)
{
Element = result.IsSuccess && result.Value == null
? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
Element = result is { IsSuccess: true, Success.Value: null }
? new Try<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
: result;
}
}

private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());
private static readonly Try<TOut> NotYetThere = new (new Exception());

private readonly SelectAsync<TIn, TOut> _stage;
private readonly Decider _decider;

private IBuffer<Holder<TOut>>? _buffer;
private readonly Action<(Holder<TOut> holder, Result<TOut> result)> _taskCallback;
private readonly Action<(Holder<TOut> holder, Try<TOut> result)> _taskCallback;

public Logic(Attributes inheritedAttributes, SelectAsync<TIn, TOut> stage) : base(stage.Shape)
{
_stage = stage;
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>();
_decider = attr != null ? attr.Decider : Deciders.StoppingDecider;

_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Result<TOut> result)>(t => HolderCompleted(t.holder, t.result));
_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Try<TOut> result)>(t => HolderCompleted(t.holder, t.result));

SetHandlers(stage.In, stage.Out, this);
}
Expand All @@ -2593,19 +2593,19 @@ public override void OnPush()
// scheduling it to an execution context
if (task.IsCompleted)
{
HolderCompleted(holder, Result.FromTask(task));
HolderCompleted(holder, Try<TOut>.FromTask(task));
}
else
{
async Task WaitForTask()
{
try
{
var result = Result.Success(await task);
var result = new Try<TOut>(await task);
_taskCallback((holder, result));
}
catch(Exception ex){
var result = Result.Failure<TOut>(ex);
var result = new Try<TOut>(ex);
_taskCallback((holder, result));
}
}
Expand Down Expand Up @@ -2673,24 +2673,24 @@ private void PushOne()
{
// this could happen if we are looping in PushOne and end up on a failed Task before the
// HolderCompleted callback has run
var strategy = _decider(result.Exception);
var strategy = _decider(result.Failure.Value);
switch (strategy)
{
case Directive.Stop:
FailStage(result.Exception);
FailStage(result.Failure.Value);
return;

case Directive.Resume:
case Directive.Restart:
break;

default:
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Failure.Value);
}
continue;
}

Push(_stage.Out!, result.Value);
Push(_stage.Out, result.Success.Value);
PullIfNeeded();
}

Expand All @@ -2707,7 +2707,7 @@ private void PullIfNeeded()
// else already pulled and waiting for next element
}

private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
private void HolderCompleted(Holder<TOut> holder, Try<TOut> result)
{
// we may not be at the front of the line right now, so save the result for later
holder.SetElement(result);
Expand All @@ -2718,7 +2718,7 @@ private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
return;
}

var exception = result.Exception;
var exception = result.Failure.Value;
var strategy = _decider(exception);
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
switch (strategy)
Expand Down
18 changes: 17 additions & 1 deletion src/core/Akka/Util/Try.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;

namespace Akka.Util
{
Expand All @@ -21,6 +22,7 @@ public class Try<T>
public Try(T success)
{
Success = success;
IsSuccess = true;
}

/// <summary>
Expand All @@ -39,7 +41,7 @@ public static implicit operator Try<T>(T value)
/// <summary>
/// Shows if this is Success
/// </summary>
public bool IsSuccess => Success.HasValue;
public bool IsSuccess { get; }

/// <summary>
/// If set, contains successfull execution result
Expand Down Expand Up @@ -106,6 +108,20 @@ public static Try<T> From(Func<T> func)
}
}

public static Try<T> FromTask(Task<T> task)
{
if(!task.IsCompleted)
throw new ArgumentException("Task is not completed. Try.FromTask only accepts completed tasks.", nameof(task));
try
{
return new Try<T>(task.GetAwaiter().GetResult());
}
catch (Exception ex)
{
return new Try<T>(ex);
}
}

/// <summary>
/// Returns this Try if it's a Success or the given default argument if this is a Failure.
/// </summary>
Expand Down
Loading