0
votes

I've been using .NET Reactive Extensions to observe log events as they come in. I'm currently using a class that derives from IObservable and uses a ReplaySubject to store the logs, that way I can filter and replay the logs (for example: Show me all the Error logs, or show me all the Verbose logs) without losing the logs I've buffered.

The problem is, even though I've set a buffer size on the subject:

this.subject = new ReplaySubject<LogEvent>(10);

The memory usage of my program goes through the roof when I use OnNext to add to the observable collection on an infinite loop:

internal void WatchForNewEvents()
        {
            Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        dynamic parameters = new ExpandoObject();
                        // TODO: Add parameters for getting specific log events

                        if (this.logEventRepository.GetManyHasNewResults(parameters))
                        {
                            foreach (var recentEvent in this.logEventRepository.GetMany(parameters))
                            {
                                this.subject.OnNext(recentEvent);
                            }
                        }

                        // Commented this out for now to really see the memory go up 
                        // Thread.Sleep(1000); 
                    }
                });
        }

Does the buffer size on ReplaySubject not work? It doesn't seem to be clearing the buffer when the buffer size is reached. Any help much appreciated!

UPDATE:

I add subscribers like this (Is this wrong?):

public IDisposable Subscribe(IObserver<LogEvent> observer)
        {
            return this.subject.Subscribe(observer);
        }

...which is called like:

// Inserts into UI ListView
    this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt));
1
What happens if you add a subscriber to the ReplaySubject? I think it shouldn't leak like this, but I'm still curious to see what happens - Ana Betts
Thanks @Paul, I've added more info about how I add subscribers. - Nick Ramirez
I have isolated the problem, although I don't know the solution yet. Subscribing to the subject BEFORE calling OnNext in a loop prevents the memory leak. Subscribing AFTER causes it. - Nick Ramirez

1 Answers

0
votes

I'm not sure if this is the definitive answer, but I suspect that you're hitting an issue because of concurrency around the scheduler you're using. The constructor you're calling on ReplaySubject looks like this:

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread)
{ }

The Scheduler.CurrentThread worries me. Try changing it to Scheduler.ThreadPool and see if that helps.

Also, as a side note, you seem to be mixing Rx with TPL and old fashioned thread sleeping. It's usually best to avoid doing that. You could change your WatchForNewEvents code to look like this:

dynamic parameters = new ExpandoObject();

var newEvents =
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0))
    where this.logEventRepository.GetManyHasNewResults(parameters)
    from recentEvent in
        this.logEventRepository.GetMany(parameters).ToObservable()
    select recentEvent;

newEvents.Subscribe(this.subject);

That's a nice compact Rx-y way of doing things.