
I would like to implement the divide-and-conquer pattern http://zguide.zeromq.org/page:all#Divide-and-Conquer but with the difference that the sink should receive the results in a specific order. In more detail, the ventilator receives timestamped events and dispatches them to workers. The sink should queue the events processed by the workers in its output in exactly the order in which the ventilator received them. I don't mind the additional latency this incurs, since multiple events arrive at the ventilator almost concurrently and the processing time in the workers is practically fixed. I plan to distribute the events to the workers either randomly or using back-pressure to keep the message queue of each worker short.

I thought of publishing the pair timestamp/assigned_worker_id from the ventilator directly to the sink, and then having the sink loop until the pair it is waiting for is ready to be pulled. Similarly, I could set up a REQ/REP exchange between the workers and the sink (though I am not yet sure which side would issue the REQ). Is there a simpler idea that I am missing? Many thanks in advance, filimon


1 Answer


A general solution to this can be found in TCP, which also has to restore the original order of data that arrives out of order. The approach there uses a sliding window to store and sort the data. In addition, it handles missing data.
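To make the sliding-window idea concrete, here is a minimal sketch of a bounded reorder buffer. It is a simplification and not what TCP actually does: TCP asks for missing segments to be retransmitted, whereas this sketch assumes a lost item never shows up and simply skips it once more than `window` later items are waiting. The class name `WindowedReorderBuffer`, the string payload and the skip policy are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of ZeroMQ: re-orders (sequence, payload)
// pairs and gives up on a missing sequence number once more than
// `window` later items are waiting behind it.
class WindowedReorderBuffer {
public:
    explicit WindowedReorderBuffer(std::size_t window) : window_(window) {
        assert(window_ > 0);
    }

    // Accepts one item and returns every payload that can now be
    // released, in sequence order.
    std::vector<std::string> push(std::uint64_t seq, std::string payload) {
        std::vector<std::string> ready;
        if (seq < next_) return ready;  // late arrival or duplicate: drop it
        pending_.emplace(seq, std::move(payload));  // keeps the first copy of a key

        // Too many items are waiting: treat the gap as lost and skip
        // ahead to the oldest item that did arrive.
        if (pending_.size() > window_) next_ = pending_.begin()->first;

        // Release the contiguous run that starts at next_.
        auto it = pending_.begin();
        while (it != pending_.end() && it->first == next_) {
            ready.push_back(std::move(it->second));
            it = pending_.erase(it);
            ++next_;
        }
        return ready;
    }

    // Sequence number the buffer expects to release next.
    std::uint64_t next() const { return next_; }

private:
    std::size_t window_;
    std::uint64_t next_ = 0;  // assumes numbering starts at 0
    std::map<std::uint64_t, std::string> pending_;
};
```

With a window of 2, pushing sequence numbers 1 and then 0 releases both in order; if 2 never arrives, the buffer holds 3 and 4 and releases 3, 4 and 5 together once 5 comes in. A timeout would be a reasonable alternative to the size limit when the arrival rate varies a lot.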

If you don't need to handle missing input data, you can simply number the requests in the ventilator and queue the results in the sink until the next one in sequence arrives, then send it out. In C++/ZeroMQ pseudo code, this would look like:

class Ventilator {
public:
    void newRequest(...) {
         // create multi-part message [counter, message]
         // and send it on the push socket
         out.send( counter++, ZMQ_SNDMORE ); // first frame: sequence number
         out.send( ... );                    // last frame: the request itself
    }
private:
    zmq::socket_t out;
    int counter = 0;
};

// the workers receive the multi-part message, produce a result
// and send it with the counter to the sink
class Worker {
public:
    void run() {
        auto counter = in.recv();
        auto request = in.recv();
        // process and create result message
        auto result = process(request);
        // pass the counter through unchanged so the sink can re-order
        out.send( counter, ZMQ_SNDMORE);
        out.send( result );
    }
private:
    zmq::socket_t in;
    zmq::socket_t out;
};

class Sink {
public:
    void run() {
        auto counter = in.recv();
        auto data = in.recv();
        // copy the sequence number out of the first frame
        auto const c = *static_cast<int*>(counter.data());

        // the std heap functions build a max-heap, so compare with >
        // to keep the smallest counter at the front
        auto cmp = [](auto const& x, auto const& y) {return x.first > y.first;};
        q.push_back( std::make_pair( c, std::move(data) ) );
        std::push_heap( q.begin(), q.end(), cmp );

        // send out every queued message that is next in order
        while (!q.empty() && q.front().first == lastCounter+1)
        {
            std::pop_heap( q.begin(), q.end(), cmp );
            out.send( q.back().second );
            lastCounter = q.back().first;
            q.pop_back();
        }
    }

private:
    zmq::socket_t in;
    zmq::socket_t out;
    std::vector< std::pair<int, zmq::message_t>> q;
    int lastCounter = -1; // last counter sent; the ventilator starts at 0
};
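
The pseudo code above passes the counter around as a raw int, which only works if all processes agree on its size and byte order. If the ventilator, workers and sink may run on different machines, one option is to put the counter on the wire in a fixed format. The following is a minimal sketch of that, assuming an 8-byte big-endian frame; `kCounterSize`, `encodeCounter` and `decodeCounter` are hypothetical names for illustration and not part of ZeroMQ.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed wire format: the counter travels as 8 bytes, most significant first.
constexpr std::size_t kCounterSize = 8;

// Turns a counter into the bytes of the first message frame.
std::array<unsigned char, kCounterSize> encodeCounter(std::uint64_t counter) {
    std::array<unsigned char, kCounterSize> frame{};
    for (std::size_t i = 0; i < kCounterSize; ++i) {
        // shift the wanted byte down and keep its low 8 bits
        frame[i] = static_cast<unsigned char>(counter >> (8 * (kCounterSize - 1 - i)));
    }
    return frame;
}

// Reads the counter back from a received frame.
std::uint64_t decodeCounter(const void* data, std::size_t size) {
    assert(size == kCounterSize);  // a frame of any other length is a protocol error
    const unsigned char* bytes = static_cast<const unsigned char*>(data);
    std::uint64_t counter = 0;
    for (std::size_t i = 0; i < kCounterSize; ++i) {
        counter = (counter << 8) | bytes[i];
    }
    return counter;
}
```

On the sending side the array's data() and size() can be copied into a zmq::message_t, and on the receiving side the message's data() and size() can be handed to decodeCounter in place of the static_cast in the sink.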