
I'm currently playing with ReactiveUI (I have some prior experience with Rx). What I'm trying to do is handle some sort of fire/forget/notify workflow.

Basically, I want to perform an action and then be notified when it succeeds or fails. However, I don't want to wait for that action to complete before starting the next one, so I've implemented the following snippet of code:

    private ReactiveList<VerifiableString> _inputData = new ReactiveList<VerifiableString>();
    private ReactiveList<VerifiableString> _savedData = new ReactiveList<VerifiableString>();

    private Subject<VerifiableString> _stringSubject = new Subject<VerifiableString>();

    private ReactiveCommand<Unit> _addCommand;

    public MainViewModel()
    {
        for (int i = 0; i < 10; i++)
            _inputData.Add(new VerifiableString{Value = Guid.NewGuid().ToString()});

        var canExecute = this.WhenAny(x => x.InputData.Count, x => x.Value != 0);

        AddCommand = ReactiveCommand.CreateAsyncTask(canExecute, x => SendStringForProcessingAsync());
        AddCommand.ThrownExceptions.Subscribe(ex => MessageBox.Show(ex.ToString()));

        _stringSubject.ObserveOn(RxApp.MainThreadScheduler).Subscribe(AddString, e => MessageBox.Show(e.Message));
    }

    private async Task<Unit> SendStringForProcessingAsync()
    {
        var item = InputData.First();
        //intentionally not awaiting on this
        return new Unit();
    }

    private async Task PostNewItemAsync(VerifiableString item)
    {
        await Task.Delay(1000);
        item.Verified = true;
    }

This code works as I expect. I can invoke the command as many times as I like, get instant notification that the command was invoked, and then, 1s later, notification that the command completed.

I have a feeling, though, that by using the ReactiveCommand and Subject I may be missing the point of ReactiveUI. Also, by using the Subject, I don't get that lovely ThrownExceptions observable that you get with ReactiveCommands directly.

For context, the UI contains two lists and a button. Clicking the button moves a string from one list to the other, and a second later that string is updated with a "Verified" flag.

EDIT 20141023

So now I have this:

       .SelectMany(_ => Observable.FromAsync(SendStringForProcessingAsync))
       .Catch(Observable.Return(new VerifiableString{Value = "What the hell !"}))

    AddCommand.ThrownExceptions.Subscribe(ex => Debug.WriteLine(ex));

    private async Task<VerifiableString> PostNewItemAsync(VerifiableString param, CancellationToken cancellationToken)
    {
        await Task.Delay(_random.Next(1000, 5000), cancellationToken);
        param.Verified = VerifyState.Verified;
        return param;
    }

    private async Task<VerifiableString> SendStringForProcessingAsync(CancellationToken t)
    {
        var item = InputData.First();
        return await PostNewItemAsync(item, t);
    }

If I throw an exception in "SendStringForProcessingAsync", my "error message" appears in my list (though nothing appears in the Debug logs). However, at that point I can no longer continue executing commands.

Also, I used Observable.FromAsync so I can pass in the cancellation token and cancel items in flight. For the life of me, though, I can't figure out how to access the CancellationTokenSource so I can cancel these things ...

Have I missed something obvious?


1 Answer


If you want to opt-out of RxCmd's single-item'ing, you do something like this (proper generics left out because coding-via-TextArea):

    AddCommand = ReactiveCommand.Create();

    AddCommand
        .SelectMany(_ => DoSomethingAsync()
            .Catch(ex => { log.write("Crap."); return Observable.Empty(); }))
        .Subscribe(x => log.write("It worked!"));
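The important detail in the snippet above is that the Catch sits inside the SelectMany, on the inner sequence, so a failure ends only that one item rather than the whole pipeline. Here is a rough, self-contained sketch of the same idea with generics filled in, using plain System.Reactive rather than ReactiveUI. The names `InnerCatchSketch` and `Run`, the failing `DoSomethingAsync` stand-in, and the use of `Observable.Range` in place of real command invocations are all assumptions made for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class InnerCatchSketch
{
    // Hypothetical stand-in for the real async work: fails for even inputs.
    private static async Task<string> DoSomethingAsync(int n)
    {
        await Task.Delay(10).ConfigureAwait(false);
        if (n % 2 == 0)
            throw new InvalidOperationException("item " + n + " failed");
        return "item " + n;
    }

    // Runs four simulated invocations and returns a log of what happened.
    public static List<string> Run()
    {
        var log = new List<string>();
        var gate = new object(); // error handlers may run on different threads

        // Observable.Range stands in for four invocations of the command (assumption).
        Observable.Range(1, 4)
            .SelectMany(n => Observable
                .FromAsync(() => DoSomethingAsync(n))
                // Catch is applied to the inner sequence, so an exception
                // terminates only this item and the outer stream keeps going.
                .Catch((Exception ex) =>
                {
                    lock (gate) log.Add("error: " + ex.Message);
                    return Observable.Empty<string>();
                }))
            .Do(x => { lock (gate) log.Add("ok: " + x); })
            .LastOrDefaultAsync()
            .Wait(); // block until every inner sequence has finished

        return log;
    }
}
```

Calling `InnerCatchSketch.Run()` should give two "ok" entries and two "error" entries, in no guaranteed order since the items run concurrently. If the Catch were moved outside the SelectMany, as in the question's edit, the first failure would replace the whole stream with the fallback and later invocations would go nowhere.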