This is how my existing system works.
I have a batch written using Spring Batch which writes messages to queues asynchronously. Once a writer has sent a certain number of messages to the queue, it starts listening on a LinkedBlockingQueue for the same number of responses.
I have Spring AMQP listeners which consume the messages and process them. Once a message is processed, the consumer replies on a reply queue. Other listeners listen on the reply queue to check whether the messages were processed successfully. The reply listener retrieves each response and adds it to the LinkedBlockingQueue, which the writer then reads from. Once the writer has fetched all the responses, it finishes the batch. If any response is an exception, it stops the batch.
This is my job configuration:
<beans:bean id="computeListener" class="com.st.symfony.Foundation"
p:symfony-ref="symfony" p:replyTimeout="${compute.reply.timeout}" />
<rabbit:queue name="${compute.queue}" />
<rabbit:queue name="${compute.reply.queue}" />
<rabbit:direct-exchange name="${compute.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${compute.queue}" key="${compute.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${compute.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${compute.queue}" ref="computeListener"
method="run" />
</rabbit:listener-container>
<beans:beans profile="master">
<beans:bean id="computeLbq" class="java.util.concurrent.LinkedBlockingQueue" />
<beans:bean id="computeReplyHandler" p:blockingQueue-ref="computeLbq"
class="com.st.batch.foundation.ReplyHandler" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="1"
requeue-rejected="false">
<rabbit:listener queues="${compute.reply.queue}" ref="computeReplyHandler"
method="onMessage" />
</rabbit:listener-container>
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.AmqpAsynchItemWriter"
p:template-ref="amqpTemplate" p:queue="${compute.queue}"
p:replyQueue="${compute.reply.queue}" p:exchange="${compute.exchange}"
p:replyTimeout="${compute.reply.timeout}" p:routingKey="${compute.routing.key}"
p:blockingQueue-ref="computeLbq"
p:logFilePath="${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/log.txt"
p:admin-ref="rabbitmqAdmin" scope="step" />
<job id="computeJob" restartable="true">
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" processor="computeItemProcessor"
writer="computeItemWriter" commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
</job>
</beans:beans>
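Each batch instance is started with its own batch_id job parameter (the logFilePath above refers to it via #{jobParameters[batch_id]}). A minimal launch sketch, assuming a standard JobLauncher bean and the computeJob defined above (class and field names here are only illustrative):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;

public class ComputeJobLauncher {

    // Assumed to be injected from the Spring context; names are illustrative.
    private JobLauncher jobLauncher;
    private Job computeJob;

    public void launch(String batchId) throws Exception {
        // batch_id is unique per batch instance, so each run gets its own
        // JobInstance and its own log directory.
        JobParameters params = new JobParametersBuilder()
                .addString("batch_id", batchId)
                .toJobParameters();
        jobLauncher.run(computeJob, params);
    }
}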
This is my writer code:
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.item.ItemWriter;

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;
    protected AmqpAdmin admin;
    BlockingQueue<Object> blockingQueue;
    String logFilePath;
    long replyTimeout;

    // Getters and Setters

    @Override
    public void write(List<? extends T> items) throws Exception {
        // Publish every item of the chunk to the compute queue.
        for (T item : items) {
            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes())
                    .build();
            template.send(this.exchange, this.routingKey, message);
        }

        // Wait for one reply per item on the shared blocking queue.
        for (T item : items) {
            Object msg = blockingQueue.poll(this.replyTimeout, TimeUnit.MILLISECONDS);
            if (msg instanceof Exception) {
                admin.purgeQueue(this.queue, true);
                throw (Exception) msg;
            } else if (msg == null) {
                throw new Exception("reply timeout...");
            }
        }
        System.out.println("All items are processed.. Command completed. ");
    }
}
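To summarize the writer's behavior: it first publishes every item in the chunk, then polls the shared blocking queue once per item; an Exception payload purges the request queue and fails the step, and a null poll result is treated as a reply timeout. Note that although a correlation id is set on each outgoing message, the replies are taken from the blocking queue in arrival order and are never matched back to a particular item (or batch).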
This is the listener POJO:
public class Foundation {

    Symfony symfony;
    long replyTimeout;

    // Getters and Setters

    public Object run(String command) {
        System.out.println("Running:" + command);
        try {
            symfony.run(command, this.replyTimeout);
        } catch (Exception e) {
            return e;
        }
        return "Completed : " + command;
    }
}
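Because this listener is a plain POJO wired in with ref/method, Spring AMQP wraps it in a MessageListenerAdapter, so whatever run() returns (the "Completed : ..." string or the caught exception) is sent back to the replyTo queue of the request, i.e. ${compute.reply.queue}.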
This is the reply handler:
public class ReplyHandler {

    BlockingQueue<Object> blockingQueue;

    public void onMessage(Object msgContent) {
        try {
            blockingQueue.put(msgContent);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
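Note that there is only one computeLbq bean: the reply listener puts every response into this single shared queue, and every writer instance (one per step execution, since the writer is step scoped) polls that same queue.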
Now, the problem is, I want to run multiple batches with unique batch ids simultaneously, each processing different data (of the same type).
As the number of batches is going to increase in the future, I don't want to keep adding a separate queue and reply queue for each batch.
Also, to process messages simultaneously, I have multiple listeners (configured with listener concurrency) consuming from the queue. If I add a different queue for each batch, the number of running listeners will grow with it, which may overload the servers (CPU/memory usage goes up).
So I don't want to replicate the same infrastructure for each new type of batch I add. I want to use the same infrastructure, but the writer of a specific batch should receive only its own responses, not the responses of other batches running simultaneously.
Can we use the same item writer instances, backed by the same blocking queue instances, for multiple batch instances running in parallel?
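Just to make the requirement concrete, the sketch below is only an illustration of what "each writer gets only its own responses" would mean; it assumes a hypothetical batch_id message header and per-batch queues, and it is not part of my current setup:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

// Illustration only (hypothetical "batch_id" header, hypothetical class): replies are
// routed to a per-batch blocking queue so a writer only ever sees its own responses.
public class BatchAwareReplyHandler implements MessageListener {

    private final ConcurrentMap<String, BlockingQueue<Object>> queuesByBatchId =
            new ConcurrentHashMap<String, BlockingQueue<Object>>();

    // The writer of a given batch would look up (or register) its own queue here.
    public BlockingQueue<Object> queueFor(String batchId) {
        queuesByBatchId.putIfAbsent(batchId, new LinkedBlockingQueue<Object>());
        return queuesByBatchId.get(batchId);
    }

    @Override
    public void onMessage(Message message) {
        String batchId = (String) message.getMessageProperties()
                .getHeaders().get("batch_id");
        try {
            queueFor(batchId).put(new String(message.getBody()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}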