Question:

I have a stream of independent events that are handled asynchronously with Reactive Extensions (Rx). The handler may fail for any reason, but the stream should carry on.

However, in Rx, as soon as an error occurs the subscription is automatically terminated. Is this configurable?

Example:

async Task<Unit> ActionAsync(int i)
{
    if (i > 1)
        throw new Exception();

    i.Dump();
    return Unit.Default;
}

void Main()
{
    var sb = new Subject<int>();

    sb.SelectMany(ActionAsync).Subscribe(
        _ => { },
        ex =>
        {
            ex.Dump();
        }
    );

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}

I'd like to get the following output:

  • 1
  • Exception
  • 3

Can I achieve this without a try/catch inside ActionAsync?

Answer:

Rx has a behavioural contract under which a stream can only be OnNext*(OnError|OnCompleted). In other words, zero or more OnNext notifications followed by a single OnError or OnCompleted at the end.

So no, you can't configure this in Rx. It would no longer be Rx if you could.
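
A minimal console sketch of that contract, assuming the System.Reactive NuGet package. The ContractDemo class, the Run helper and the "boom" message are made up for illustration, and a synchronous selector stands in for the async handler. Once the selector throws for 2, the observer receives OnError and the later value never arrives:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; assumes the System.Reactive NuGet package.
public static class ContractDemo
{
    // Pushes 1, 2, 3 through a selector that throws for 2 and
    // returns every notification the observer received.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        source
            .Select(i => i == 2 ? throw new InvalidOperationException("boom") : i)
            .Subscribe(
                i => log.Add($"OnNext: {i}"),
                ex => log.Add($"OnError: {ex.Message}"),
                () => log.Add("OnCompleted"));

        source.OnNext(1);
        source.OnNext(2); // the selector throws, so the subscription ends with OnError
        source.OnNext(3); // not observed: the sequence has already terminated

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The subject itself is still alive after the failure; it is the subscription downstream of the failing operator that has ended, which is why resubscribing (as in the Retry approach) picks up later values.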

What you can do, however, is write a query that can retry the source.

If you write your code like this:

async Task<int> ActionAsync(int i)
{
    if (i == 2)
        throw new Exception();

    return i;
}

void Main()
{
    var sb = new Subject<int>();

    sb
        .SelectMany(ActionAsync)
        .Do(_ => { }, ex => ex.Dump())
        .Retry()
        .Subscribe(_ => _.Dump());

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}

Then you do get:

1
Exception of type 'System.Exception' was thrown. 
3

As per your comment asking about performance issues, there aren't any performance issues in using .Retry(), but there is a behavioural issue.

If the source were cold - like var sb = new [] { 1, 2, 3 }.ToObservable(); - then the .Retry() would start with the entire observable sequence again and result in an infinite sequence of:

1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
...

In your code the source is a hot observable, so this doesn't happen.

If you wish to do this on a cold observable you would need to make it hot via .Publish(...). Like this:

var sb = new[] { 1, 2, 3 }.ToObservable();

sb
    .Publish(sbp =>
        sbp
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())
            .Retry())
    .Subscribe(_ => _.Dump());

Then the expected behaviour returns.
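
The cold-source restart described above can be observed without looping forever by bounding the retries. The sketch below is an illustration only: it assumes the System.Reactive NuGet package, swaps the async handler for a synchronous selector to keep the run deterministic, and the ColdRetryDemo class, the Run helper and the "boom" message are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class; assumes the System.Reactive NuGet package.
public static class ColdRetryDemo
{
    // Subscribes to a cold source with a bounded Retry and returns what was observed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var cold = new[] { 1, 2, 3 }.ToObservable();

        cold
            .Select(i => i == 2 ? throw new InvalidOperationException("boom") : i)
            .Retry(3) // bounded, unlike the parameterless Retry() used in the answer
            .Subscribe(
                i => log.Add(i.ToString()),
                ex => log.Add($"OnError: {ex.Message}"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Every attempt replays the array from the start, so 1 shows up once per attempt and 3 is never reached. When the attempts run out, the error is passed on to the subscriber.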

Answer:

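Use Materialize, which turns each inner sequence's OnNext, OnError and OnCompleted into ordinary Notification values, so a failed item does not terminate the outer stream: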

    async Task<Unit> ActionAsync(int i)
    {
        if (i > 1)
            throw new Exception();

        i.Dump();
        return Unit.Default;
    }

    void Main()
    {
        var sb = new Subject<int>();
        sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize())
            .Subscribe(item =>
            {
                if (item.Kind == NotificationKind.OnError)
                {
                    item.Exception.Dump();
                }
                //else if (item.Kind == NotificationKind.OnNext)
                //{
                //    var value = item.Value;
                //}
                //else if (item.Kind == NotificationKind.OnCompleted)
                //{
                //}
            }
        );
        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }

Note that if the core logic stays the same, you will never see "3" in the output, because the (i > 1) check also throws for 3. You probably want to change (i > 1) to (i == 2) to get the output specified in the question.
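
Putting that note together with the Materialize approach, a console version might look like the sketch below. It assumes the System.Reactive NuGet package and collects results in a list instead of calling LINQPad's Dump(). The MaterializeDemo class, the Run helper and the "boom" message are hypothetical, and ActionAsync returns the value rather than Unit so that the subscriber can report it:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class; assumes the System.Reactive NuGet package.
public static class MaterializeDemo
{
    // There is no await inside, so the returned task is already completed or
    // faulted (the compiler warns about the missing await; that is expected here).
    static async Task<int> ActionAsync(int i)
    {
        if (i == 2)
            throw new InvalidOperationException("boom");

        return i;
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var sb = new Subject<int>();

        sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize())
            .Subscribe(item =>
            {
                switch (item.Kind)
                {
                    case NotificationKind.OnNext:
                        log.Add($"Value: {item.Value}");
                        break;
                    case NotificationKind.OnError:
                        log.Add($"Error: {item.Exception?.Message}");
                        break;
                    // NotificationKind.OnCompleted only marks the end of one inner sequence.
                }
            });

        sb.OnNext(1);
        sb.OnNext(2); // reported as an OnError notification; the outer stream keeps going
        sb.OnNext(3);

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Because the failure arrives as a value rather than as the outer stream's OnError, there is a single subscription throughout and no resubscription is involved, so the hot/cold distinction that matters for Retry does not come up here.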
