I'm currently playing with ReactiveUI (I have some prior experience with Rx). What I'm trying to do is handle a fire/forget/notify workflow.
Basically, I want to perform an action and then be notified when it succeeds or fails. However, I don't want to wait for that action to complete before starting the next one, so I've implemented the following snippet of code:
    private ReactiveList<VerifiableString> _inputData = new ReactiveList<VerifiableString>();
    private ReactiveList<VerifiableString> _savedData = new ReactiveList<VerifiableString>();
    private Subject<VerifiableString> _stringSubject = new Subject<VerifiableString>();
    private ReactiveCommand<Unit> _addCommand;

    public MainViewModel()
    {
        for (int i = 0; i < 10; i++)
        {
            _inputData.Add(new VerifiableString { Value = Guid.NewGuid().ToString() });
        }

        var canExecute = this.WhenAny(x => x.InputData.Count, x => x.Value != 0);
        AddCommand = ReactiveCommand.CreateAsyncTask(canExecute, x => SendStringForProcessingAsync());
        AddCommand.ThrownExceptions.Subscribe(ex => MessageBox.Show(ex.ToString()));
        _stringSubject.ObserveOn(RxApp.MainThreadScheduler).Subscribe(AddString, e => MessageBox.Show(e.Message));
    }

    private async Task<Unit> SendStringForProcessingAsync()
    {
        var item = InputData.First();
        InputData.RemoveAt(0);
        //intentionally not awaiting on this
        PostNewItemAsync(item);
        return new Unit();
    }

    private async Task PostNewItemAsync(VerifiableString item)
    {
        _stringSubject.OnNext(item);
        await Task.Delay(1000);
        item.Verified = true;
        _stringSubject.OnNext(item);
    }
This code works as I expect. I can invoke the command as many times as I like, get instant notification that the command was invoked, and then, one second later, notification that the command completed.
I have a feeling, though, that by combining the ReactiveCommand with a Subject I may be missing the point of ReactiveUI. Also, by using the Subject, I lose that lovely ThrownExceptions observable that you get with ReactiveCommands directly: an exception thrown inside the un-awaited PostNewItemAsync never reaches it.
For context, the UI contains two lists and a button; clicking the button moves a string from one list to the other, and a second later that string is updated with a "Verified" flag.
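To make the error-handling concern concrete, here is a minimal stand-alone sketch using only the TPL (no ReactiveUI). The names `FailingPostAsync` and `FireAndReport` are hypothetical and exist only for illustration; `FailingPostAsync` stands in for a `PostNewItemAsync` that throws. It shows that a fault in an un-awaited task stays inside the ignored `Task`, unless a continuation is attached to report it.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetSketch
{
    // Hypothetical stand-in for PostNewItemAsync: fails after a short delay.
    public static async Task FailingPostAsync()
    {
        await Task.Delay(50);
        throw new InvalidOperationException("post failed");
    }

    // Starts the work without awaiting it, but attaches a continuation so that
    // a fault is handed to onError instead of being silently dropped.
    public static Task FireAndReport(Func<Task> work, Action<Exception> onError)
    {
        return work().ContinueWith(
            t => onError(t.Exception.GetBaseException()),
            TaskContinuationOptions.OnlyOnFaulted);
    }

    public static void Main()
    {
        // Plain fire-and-forget: the exception is stored in a Task that
        // nobody inspects, so the caller never hears about it.
        var ignored = FailingPostAsync();

        // Fire-and-report: the same failure reaches the callback.
        var reported = FireAndReport(
            FailingPostAsync,
            ex => Console.WriteLine("Error: " + ex.Message));

        // Block only so that the console demo does not exit early.
        reported.Wait();
    }
}
```

In my view model the callback would presumably push the error somewhere observable, which is the part I'm unsure how to do idiomatically.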
EDIT 20141023
So now I have this:
    {
        //...
        AddCommand
            .SelectMany(_ => Observable.FromAsync(SendStringForProcessingAsync))
            .Catch(Observable.Return(new VerifiableString { Value = "What the hell !" }))
            .ObserveOn(RxApp.MainThreadScheduler)
            .Subscribe(AddString);
        AddCommand.ThrownExceptions.Subscribe(ex => Debug.WriteLine(ex));
        //...
    }

    private async Task<VerifiableString> PostNewItemAsync(VerifiableString param, CancellationToken cancellationToken)
    {
        await Task.Delay(_random.Next(1000, 5000), cancellationToken);
        param.Verified = VerifyState.Verified;
        return param;
    }

    private async Task<VerifiableString> SendStringForProcessingAsync(CancellationToken t)
    {
        var item = InputData.First();
        InputData.RemoveAt(0);
        AddString(item);
        return await PostNewItemAsync(item, t);
    }
If I throw an exception in "SendStringForProcessingAsync", my "error message" item appears in my list (though nothing appears in the Debug output). However, from that point on, executing the command no longer has any effect: no further items are added.
Also, I used Observable.FromAsync so that I can pass in the cancellation token and cancel items in flight. I can't for the life of me, though, figure out how to access the CancellationTokenSource so that I can actually cancel these things...
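To be clear about what I'm after, this is roughly the plain-TPL equivalent, where I create the CancellationTokenSource myself and can therefore cancel the call in flight. It is only a sketch: `SlowPostAsync` and `StartThenCancel` are made-up names, with `SlowPostAsync` standing in for `PostNewItemAsync`. In the Observable.FromAsync version above I never create the source myself, which is exactly where I'm stuck.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for PostNewItemAsync: a cancellable delay.
    public static async Task<string> SlowPostAsync(string value, CancellationToken token)
    {
        await Task.Delay(5000, token);
        return value + " (verified)";
    }

    // Starts a call, cancels it immediately, and returns true if the
    // in-flight call ended in cancellation rather than completing.
    public static bool StartThenCancel()
    {
        using (var cts = new CancellationTokenSource())
        {
            var inFlight = SlowPostAsync("item", cts.Token);

            // Because this code owns the source, it can cancel at any time.
            cts.Cancel();

            try
            {
                inFlight.Wait();
                return false;
            }
            catch (AggregateException ex)
            {
                // Task.Delay surfaces cancellation as TaskCanceledException,
                // which derives from OperationCanceledException.
                return ex.InnerException is OperationCanceledException;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("Cancelled: " + StartThenCancel());
    }
}
```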
Have I missed something obvious?