I am building a producer/consumer pattern using Reactive Extensions for .NET (Rx). The producer reads messages from a Kafka message bus. Once a message is read, it needs to be handed over to a consumer for processing.
I got this working with Rx, but I noticed that the consumer processes each message on the same thread as the producer. My objective is to have a single producer thread that reads messages from the bus and then hands them over to consumers, which process them on a separate thread. The code I have is:
// Producer Code
private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();
private IObserver<LGMessage> messageBusObserver;

protected IObservable<LGMessage> _onMessageObservable
{
    get
    {
        return _onMessageSubject.AsObservable();
    }
}

public void AddObserver(IObserver<LGMessage> observer)
{
    _onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);
}

// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
{
    // add the message to the observable
    _onMessageSubject.OnNext(msg.Value);
    // The method is declared to return bool; returning true here is an assumption
    return true;
}

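// Consumer Code
// Parameter type changed from string to LGMessage to match IObserver<LGMessage>
// and the use of value.Key (assumes LGMessage exposes a Key property)
public virtual void OnNext(LGMessage value)
{
    Console.WriteLine("Thread {0} Consuming",
        Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("{1}: Message I got from bus: {0}", value.Key,
        this.Name);
    // Take Action
}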
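
For comparison, here is a minimal standalone sketch of the hand-off described above: one thread pushes messages into a Subject, and ObserveOn(NewThreadScheduler.Default) delivers them to the observer on a different thread. It assumes the System.Reactive NuGet package is referenced. HandOffDemo and Run are hypothetical names used only for illustration, and plain strings stand in for LGMessage. The observer must be subscribed through the ObserveOn pipeline (as AddObserver does); an observer subscribed directly to the subject would presumably still run on the producer thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; not part of the code above.
public static class HandOffDemo
{
    // Pushes each message from the calling (producer) thread and returns the
    // producer's thread id plus the id of the thread that consumed each message.
    public static (int ProducerThread, List<int> ConsumerThreads) Run(params string[] messages)
    {
        var subject = new Subject<string>();
        var consumerThreads = new List<int>();
        var done = new TaskCompletionSource<bool>();

        // ObserveOn queues notifications and delivers them on the scheduler's thread.
        IDisposable subscription = subject
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(
                value =>
                {
                    consumerThreads.Add(Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine("Thread {0} consuming: {1}",
                        Thread.CurrentThread.ManagedThreadId, value);
                },
                () => done.SetResult(true));

        int producerThread = Thread.CurrentThread.ManagedThreadId;
        foreach (var message in messages)
        {
            Console.WriteLine("Thread {0} producing: {1}", producerThread, message);
            subject.OnNext(message);
        }

        subject.OnCompleted();
        done.Task.Wait(); // block until the consumer has seen OnCompleted
        subscription.Dispose();
        return (producerThread, consumerThreads);
    }

    public static void Main()
    {
        var result = Run("a", "b", "c");
        Console.WriteLine("Producer thread: {0}", result.ProducerThread);
    }
}
```

If the printed consumer thread id differs from the producer thread id here but not in the real application, the difference is likely in how the consumer gets subscribed rather than in ObserveOn itself.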