2
votes

I have a routine which reads data from a db and publishes that data via an IObservable.

After the data has been published, I want to update all those rows which have just been published to stop them being published again.

I am not sure of the "reactive" way of doing this. (Each time I revisit Rx I seem to have this problem!)

I think I need to do 2 things

1) Cache the data as it is published since it'll have the IDs I need to update subsequently - I had wondered whether to use a subject to cache the data being published or whether to have some other routine which wraps what I currently have, subscribing to it, caching it and then republishing

2) Update the data after it has been published. I am really not sure how to build that into the pipeline at all!

I've built this up from various things I've found on line (esp. Lee Campbell for the db polling - thx Lee!) but the other bits I've added on are mine and I may have got it badly wrong. I'm open to suggestions if some parts would be better implemented non-reactively. For example, I've made the db update routine observable, but I don't know it that is really necessary - or whether it is just easier to include it in the pipeline if it is implemented that way..

Here's the relevant bits of code ...

private IObservable<INotification> Poller() =>
    Observable
        .Timer(_pollingPeriod, _scheduler)
        .SelectMany(_ => NewNotifications(_cx))                
        .Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler) 
        .Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err))) 
        .Repeat();  

private IObservable<INotification> NewNotifications(string cx)
{
    try
    {                
        return SqlRead<INotification>(cx, NewNotificationsSql(),sdr => EventBuilder(sdr), Empty.Notification());
    }
    catch (Exception ex)
    {
        throw ex;
    }
}

internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
    Observable.Create<T>(o =>
    {                
        using (var conn = new SqlConnection(cx))
        {
            conn.Open();
            using (var cmd = new SqlCommand(sql, conn))
            {                        
                using (var rdr = cmd.ExecuteReader())
                {
                    if (!rdr.HasRows)
                    {
                        o.OnNext(noRows);
                    }
                    else
                    {
                        while (rdr.Read())
                        {             
                            o.OnNext(mapper(rdr));                                    
                        }
                    }
                }
            }
        }
        o.OnCompleted();                
        return Disposable.Empty;
    });

internal static IObservable<int> SqlWrite(string cx, string sql) =>
    Observable.Create<int>(o =>
    {
        using (var conn = new SqlConnection(cx))
        {
            conn.Open();
            using (var cmd = new SqlCommand(sql, conn))
            {
                o.OnNext(cmd.ExecuteNonQuery());
            }                    
        }
        o.OnCompleted();
        return Disposable.Empty;
    });
1

1 Answers

2
votes

Lets assume you have a UI with list of current notifications

public class NotificationListViewModel
{
    ObservableCollection<INotification> Items {get;}
}

To maintain this collection you need to know how notifications are changing. Lets have a class to show changes

enum ChangeType
{
   Add,
   Remove,
   Update
}

class Change<T>
{
    ChangeType Type {get;}
    T Value {get;}
}

Now you can expose notifications via changes

INotificationProvider
{
   public IObservable<Change<INotification>> Notifications {get;}
}

Lets add method Update in INotification for convinience

public class NotificationListViewModel
{
    public NotificationListViewModel(INotificationProvider provider)
    {
        provider.Notifications.Subscribe(change => 
        {
          if(change.Type == ChangeType.Add)
          {
            Items.Add(change.Value);
          }
          if(change.Type == ChangeType.Update)
          {
            Items.First(x => x.Id = change.Value.Id).Update(change.Value);
          }
          if(change.Type == ChangeType.Remove)
          {
            Items.Remove(change.Value);
          }
        });
    }
}

And to understand the type of change you receive, you need to maintain the list of existing notifications where you read them from database.

All this code i wrote to show the example of "how can you think about this things". There is a great library DynamicData where all this ideas are implemented in convinient and optimised way.