1
votes

This is how my existing system works.

I have batch written using spring batch which writes messages to queues ASYNCHRONOUSLY. The writers once send certain number of messages to queue, starts listening to LINKED_BLOCKING_QUEUE for same number of messages.

I have spring amqp listeners which consumes messages and process them. Once processed, consumer replies back on reply queue. There are listeners which listens to reply queue to check whether messages are successfully processed or not. The reply listener retrives response and add it to LINKED_BLOCKING_QUEUE which is then fetched by writer. Once writer fetch all responses finishes batch. If there is exception, it stops the batch.

This is my job configurations

<beans:bean id="computeListener" class="com.st.symfony.Foundation"
    p:symfony-ref="symfony" p:replyTimeout="${compute.reply.timeout}" />

<rabbit:queue name="${compute.queue}" />
<rabbit:queue name="${compute.reply.queue}" />

<rabbit:direct-exchange name="${compute.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${compute.queue}" key="${compute.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${compute.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${compute.queue}" ref="computeListener"
        method="run" />
</rabbit:listener-container>


<beans:beans profile="master">

    <beans:bean id="computeLbq" class="java.util.concurrent.LinkedBlockingQueue" />

    <beans:bean id="computeReplyHandler" p:blockingQueue-ref="computeLbq"
        class="com.st.batch.foundation.ReplyHandler" />

    <rabbit:listener-container
        connection-factory="rabbitConnectionFactory" concurrency="1"
        requeue-rejected="false">
        <rabbit:listener queues="${compute.reply.queue}" ref="computeReplyHandler"
            method="onMessage" />
    </rabbit:listener-container>


    <beans:bean id="computeItemWriter"
        class="com.st.batch.foundation.AmqpAsynchItemWriter"
        p:template-ref="amqpTemplate" p:queue="${compute.queue}"
        p:replyQueue="${compute.reply.queue}" p:exchange="${compute.exchange}"
        p:replyTimeout="${compute.reply.timeout}" p:routingKey="${compute.routing.key}"
        p:blockingQueue-ref="computeLbq"
        p:logFilePath="${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/log.txt"
        p:admin-ref="rabbitmqAdmin" scope="step" />


    <job id="computeJob" restartable="true">
        <step id="computeStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="computeFileItemReader" processor="computeItemProcessor"
                    writer="computeItemWriter" commit-interval="${compute.commit.interval}" />
            </tasklet>
        </step>
    </job>      


</beans:beans>

This is my writer code,

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;
    protected AmqpAdmin admin;
    BlockingQueue<Object> blockingQueue;
    String logFilePath;
    long replyTimeout;


    // Getters and Setters

    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {


            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);
        }

        for (T item : items) {

            Object msg = blockingQueue
                    .poll(this.replyTimeout, TimeUnit.MILLISECONDS);

            if (msg instanceof Exception) {

                admin.purgeQueue(this.queue, true);
                throw (Exception) msg;

            } else if (msg == null) {
                throw new Exception("reply timeout...");
            } 

        }

        System.out.println("All items are processed.. Command completed.  ");

    }

}

Listener pojo

public class Foundation {

    Symfony symfony;

    long replyTimeout;

    //Getters Setters

    public Object run(String command) {

        System.out.println("Running:" + command);

        try {
            symfony.run(command, this.replyTimeout);
        } catch (Exception e) {
            return e;
        }

        return "Completed : " + command;
    }
}

This is reply handler

public class ReplyHandler {

    BlockingQueue<Object> blockingQueue;

    public void onMessage(Object msgContent) {

        try {

            blockingQueue.put(msgContent);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }
    }

}

Now, the problem is, I want to run multiple batches with unique batch id simultaneously which will process different data (of same type) for different batches.

As the number of batches are going to be increased in future, I don't want to keep adding separate queues and reply queues for each batch.

And also, to process messages simultaneously, I have multiple listeners (set with listener concurrency ) listening to queue. If I add different queue for different batches, number of listeners running will be increased which may overload servers (CPU/Memory usage goes high).

So I don't want to replicate same infrastructure for each type of batch I am going to add. I want to use same infrastructure just writers of specific batch should get only its responses not the responses of other batches running simultaneously.

Can we use same instances of item writers which use same blocking queue instances for multiple instances of batches running parallel ?

2

2 Answers

0
votes

You may want to look into JMS Message Selectors.

As per Docs

The createConsumer and createDurableSubscriber methods allow you to specify a message selector as an argument when you create a message consumer.

The message consumer then receives only messages whose headers and properties match the selector.

0
votes

There is no equivalent of a JMS message selector expression in the AMQP (RabbitMQ) world.

Each consumer has to have his own queue and you use an exchange to route to the appropriate queue, using a routing key set by the sender.

It is not as burdensome as you might think; you don't have to statically configure the broker; the consumers can use a RabbitAdmin to declare/delete exchanges, queues, bindings on demand.

See Configuring the Broker in the Spring AMQP documentation.