I have a routine which reads data from a db and publishes that data via an IObservable.
After the data has been published, I want to update all those rows which have just been published to stop them being published again.
I am not sure of the "reactive" way of doing this. (Each time I revisit Rx I seem to have this problem!)
I think I need to do two things:
1) Cache the data as it is published, since it contains the IDs I need for the subsequent update. I had wondered whether to use a Subject to cache the published data, or whether to have another routine wrap what I currently have: subscribe to it, cache the data and then republish it.
2) Update the data after it has been published. I am really not sure how to build that into the pipeline at all!
I've built this up from various things I've found online (especially Lee Campbell's material for the db polling; thanks, Lee!), but the other bits I've added are my own and I may have got them badly wrong. I'm open to suggestions if some parts would be better implemented non-reactively. For example, I've made the db update routine observable, but I don't know if that is really necessary, or whether implementing it that way just makes it easier to include in the pipeline.
Here are the relevant bits of code:
    private IObservable<INotification> Poller() =>
        Observable
            .Timer(_pollingPeriod, _scheduler)
            .SelectMany(_ => NewNotifications(_cx))
            .Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler)
            .Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err)))
            .Repeat();
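    private IObservable<INotification> NewNotifications(string cx)
    {
        try
        {
            // SqlRead is lazy: the query only runs when the result is subscribed to.
            return SqlRead<INotification>(cx, NewNotificationsSql(), sdr => EventBuilder(sdr), Empty.Notification());
        }
        catch (Exception)
        {
            // Rethrow with `throw;` so the original stack trace is preserved (`throw ex;` resets it).
            throw;
        }
    }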
    internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
        Observable.Create<T>(o =>
        {
            using (var conn = new SqlConnection(cx))
            {
                conn.Open();
                using (var cmd = new SqlCommand(sql, conn))
                {
                    using (var rdr = cmd.ExecuteReader())
                    {
                        if (!rdr.HasRows)
                        {
                            o.OnNext(noRows);
                        }
                        else
                        {
                            while (rdr.Read())
                            {
                                o.OnNext(mapper(rdr));
                            }
                        }
                    }
                }
            }
            o.OnCompleted();
            return Disposable.Empty;
        });
    internal static IObservable<int> SqlWrite(string cx, string sql) =>
        Observable.Create<int>(o =>
        {
            using (var conn = new SqlConnection(cx))
            {
                conn.Open();
                using (var cmd = new SqlCommand(sql, conn))
                {
                    o.OnNext(cmd.ExecuteNonQuery());
                }
            }
            o.OnCompleted();
            return Disposable.Empty;
        });
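To make steps 1) and 2) concrete, here is a rough sketch of one shape the pipeline might take: publish each row with `Do`, cache the batch with `ToList`, and run the update once the read has completed via `SelectMany`. Everything named here (`Row`, `Unpublished`, `ReadUnpublished`, `MarkPublished`, `PollOnce`) is a hypothetical stand-in so that the example runs without a database; it assumes the System.Reactive package and C# 9 or later, and it is not the real `SqlRead`/`SqlWrite` code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class PublishThenMarkSketch
{
    // Hypothetical stand-in for a row; the real code maps rows to INotification.
    public sealed record Row(int Id, string Payload);

    // Hypothetical in-memory "table" standing in for the database.
    private static readonly List<Row> Unpublished = new()
    {
        new Row(1, "a"), new Row(2, "b"), new Row(3, "c")
    };

    // Stand-in for SqlRead: emits a snapshot of the unpublished rows, then completes.
    private static IObservable<Row> ReadUnpublished() =>
        Observable.Defer(() => Unpublished.ToList().ToObservable());

    // Stand-in for SqlWrite: removes the given IDs and emits the number of rows affected.
    private static IObservable<int> MarkPublished(IReadOnlyCollection<int> ids) =>
        Observable.Defer(() =>
            Observable.Return(Unpublished.RemoveAll(r => ids.Contains(r.Id))));

    // One poll: publish each row, cache the batch, then run the update once.
    public static IObservable<int> PollOnce(Action<Row> publish) =>
        ReadUnpublished()
            .Do(publish)                      // hand each row to the consumer as it is read
            .ToList()                         // step 1: cache the batch until the read completes
            .Where(batch => batch.Count > 0)  // nothing read, so nothing to update
            .SelectMany(batch =>              // step 2: update only after publishing
                MarkPublished(batch.Select(r => r.Id).ToList()));

    public static void Main()
    {
        var published = new List<int>();
        PollOnce(row => published.Add(row.Id))
            .Subscribe(n => Console.WriteLine($"marked {n} row(s) as published"));
        Console.WriteLine(string.Join(",", published));
    }
}
```

In this shape no Subject is needed: `ToList` does the caching, and the update is just another observable in the chain, so it would sit inside the `SelectMany` in `Poller()`. One caveat under these assumptions: if the update fails after the rows have been handed to `publish`, the same rows will be read and published again on the next poll.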