3
votes

I'm using Twisted to receive messages from internet-connected sensors and store them in a database.
I want to check these messages without interfering with that process: I need to compare every message with some base values in the database and, if any of them match, trigger an alert. The idea is not to block any process.

My idea is to create a new process to check and alert. After the first process stores a message, it would send that message to the new process, which would check it and raise an alert if required.

I need IPC for this, and I was thinking of using ZeroMQ, but Twisted also has its own approach to IPC. I could use ZeroMQ, but maybe that would be self-defeating...

What do you think about my approach? Maybe I'm completely wrong?

Any advice is welcome. Thanks.

PS: This process will run on a dedicated server, with an expected load of 6000 msg/hour of 1 Kb each.

2
I think your question isn't right for StackOverflow. That kind of decision depends too much on your current infrastructure, expected load, etc. to be answerable in a concise and correct way. - pcalcao
@pcalcao Thanks, I'm using the architecture tag for this question, and I added some useful info about it. - Goku
Have you checked Pyro? pypi.python.org/pypi/Pyro4 - chespinoza
Have you considered using or creating a simple load balancer that distributes your sensor messages to a cluster of machines to process? It may delay the need for a complex architecture with a message bus. - Francis Avila
Is your load really only 6000 msg/hour, i.e. less than 2 msg/second? If so, this is a very low load and you don't even need to be non-blocking to handle it, let alone use a complex message bus! - Francis Avila

2 Answers

3
votes

All of these approaches are possible. I can only speak abstractly because I don't know the precise contours of your application.

If you already have a working application but it just isn't fast enough to handle the number of messages you throw at it, then identify the bottleneck. The two likely causes of your holdup are DB access and alert triggering, because either one is probably a synchronous I/O operation.

How you deal with this depends on your workload:

  1. If your message rate is high and constant, then you need to make sure your database can handle this rate. If your DB can't handle it, then no amount of non-blocking message passing will help you! In this order:
    1. Try tuning your database.
    2. Try putting your database on a bigger machine with more memory.
    3. Try sharding your database across multiple machines to distribute the workload.
    Once you know your DB can handle the message rate, you can deal with other bottlenecks using other forms of parallelism.
  2. If your message rate is bursty then you can use queueing to handle the bursts. In this order:
    1. Put a load balancer in front of a cluster of message processors. All this balancer should do is redistribute sensor messages to different machines for check-and-alert processing. The advantage of this approach is that you will probably not need to change your existing application, just run it on more machines. This works best if your load balancer does not need to wait for a response, just forward the message.
    2. If your communication needs are more complex or are bidirectional, you can use a message bus (such as ZeroMQ) as the communication layer between message-processors, alert-senders, and database-checkers. The idea is to increase parallelism by having non-blocking communication occur through the bus and having each node on the bus do one thing only. You can then alter the ratio of node types depending on how long each stage of message processing takes (i.e. to keep the queue depth roughly equal across the whole processing pipeline).
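
To make the "one job per node, tune the ratio" idea concrete, here is a minimal sketch. It is not ZeroMQ: it uses in-process threads and `queue.Queue` from the standard library as a stand-in for a bus, and the names (`run_pipeline`, `n_checkers`, `n_savers`), the message shape, and the "value above a stored limit" alert rule are all assumptions made for illustration.

```python
import queue
import threading


def run_pipeline(messages, base_values, n_checkers=2, n_savers=1):
    """Push messages through a check stage and a save stage joined by queues."""
    check_q = queue.Queue()   # incoming sensor messages
    save_q = queue.Queue()    # messages that have been checked
    alerts, stored = [], []
    lock = threading.Lock()   # protects the two result lists

    def checker():
        while True:
            msg = check_q.get()
            if msg is None:           # sentinel: no more work
                break
            # Assumed rule: alert when the value exceeds the stored limit.
            limit = base_values.get(msg["sensor"])
            if limit is not None and msg["value"] > limit:
                with lock:
                    alerts.append(msg)
            save_q.put(msg)

    def saver():
        while True:
            msg = save_q.get()
            if msg is None:
                break
            with lock:
                stored.append(msg)    # stands in for a database insert

    checkers = [threading.Thread(target=checker) for _ in range(n_checkers)]
    savers = [threading.Thread(target=saver) for _ in range(n_savers)]
    for t in checkers + savers:
        t.start()
    for msg in messages:
        check_q.put(msg)
    for _ in checkers:                # one sentinel per checker
        check_q.put(None)
    for t in checkers:
        t.join()
    for _ in savers:                  # savers stop once checkers are done
        save_q.put(None)
    for t in savers:
        t.join()
    return alerts, stored
```

If one stage turns out to be slower, you would raise its worker count (`n_checkers` or `n_savers` here) until neither queue keeps growing. With a real bus the workers would be separate processes or machines rather than threads.
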
1
votes

When you get a message, do two things:

  • Check to see if it should trigger an alert (and send the alert if necessary, presumably)
  • Insert it into the database

You don't need a message queue, multiple processes, IPC, or any of those things. For example:

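from twisted.python import log

def messageReceived(self, message):
    # Both helpers are assumed to return Deferreds, so neither call blocks.
    self.checkForAlerts(message).addCallbacks(self.maybeAlert, log.err)
    self.saveMessageToDatabase(message).addErrback(log.err)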
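
For anyone who wants to run the same shape without installing anything, here is a rough equivalent using the standard library's `asyncio` instead of Twisted Deferreds. It is only a sketch: the function names, the dict-based message format, and the "value above a stored limit" rule are assumptions, and the `asyncio.sleep(0)` calls merely stand in for non-blocking database calls.

```python
import asyncio


async def check_for_alerts(message, base_values):
    # Stand-in for a non-blocking lookup of the base values.
    await asyncio.sleep(0)
    limit = base_values.get(message["sensor"])
    # Assumed rule: alert when the value exceeds the stored limit.
    return limit is not None and message["value"] > limit


async def save_message(message, store):
    # Stand-in for a non-blocking database insert.
    await asyncio.sleep(0)
    store.append(message)


async def message_received(message, base_values, store, alerts):
    # Start the check and the save together; neither waits on the other.
    should_alert, _ = await asyncio.gather(
        check_for_alerts(message, base_values),
        save_message(message, store),
    )
    if should_alert:
        alerts.append(message)        # stands in for sending the alert


def handle(message, base_values, store, alerts):
    """Synchronous entry point so the sketch can be tried from a script."""
    asyncio.run(message_received(message, base_values, store, alerts))
```

The point is the same as in the Twisted version: one process, two independent asynchronous operations per message, and no queue or IPC in between.
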