In Apache Camel 2.19.0, I want to produce messages and consume the results asynchronously on a SEDA queue with concurrent consumers, while blocking the producer whenever the queue is full. The use case behind it: I need to process large files with many lines. I have to split them into batches, because one message per line is too much overhead, but the entire file does not fit into the heap. In the end, I need to know whether all the batches I triggered have completed successfully. So effectively, I need a back-pressure mechanism that throttles the producer when the queue is full, while still leveraging multi-threaded processing.
Here is a quick example in Camel and Spring. The route I configured:
package com.test;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class AsyncCamelRoute extends RouteBuilder {

    public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true";

    @Override
    public void configure() throws Exception {
        from(ENDPOINT)
            .process(exchange -> {
                System.out.println("Processing message " + (String)exchange.getIn().getBody());
                Thread.sleep(10_000);
            });
    }
}
The producer looks like this:
package com.test;

import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
public class AsyncProducer {

    public static final int MAX_MESSAGES = 100;

    @Autowired
    private ProducerTemplate producerTemplate;

    @EventListener
    public void handleContextRefresh(ContextRefreshedEvent event) throws Exception {
        new Thread(() -> {
            // Just wait a bit so everything is initialized
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // One future per message, so completion of all of them can be awaited below
            List<CompletableFuture<Object>> futures = new ArrayList<>();

            System.out.println("Producing messages");
            for (int i = 0; i < MAX_MESSAGES; i++) {
                CompletableFuture<Object> future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i));
                futures.add(future);
            }
            System.out.println("All messages produced");

            System.out.println("Waiting for subtasks to finish");
            futures.forEach(CompletableFuture::join);
            System.out.println("Subtasks finished");
        }).start();
    }
}
The output of this code looks like:
Producing messages
All messages produced
Waiting for subtasks to finish
Processing message 6
Processing message 1
Processing message 2
Processing message 5
Processing message 8
Processing message 7
Processing message 9
...
Subtasks finished
So it seems that blockWhenFull is ignored: all messages are created and put onto the queue before any processing starts.
Is there any way to create messages so that I can use async processing in Camel while making sure that putting elements onto the queue blocks if there are too many unprocessed elements?
requestBody(..) instead of asyncRequestBody(..)? It might be that you end up with lots of blocked threads in the pool used for the asynchronous message sending, instead of blocking your client thread. - Ralf
asyncRequestBody(..) won't be blocked unless the thread pool handling the async task is exhausted. But if threads get created as needed in the pool, then you will never see your looping thread being blocked. - Ralf
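To illustrate the point made in the comments: if the pool behind the async call creates threads as needed, the producing loop itself never blocks. One possible workaround, independent of any Camel option, is to bound the number of in-flight requests on the caller side with a java.util.concurrent.Semaphore. The following is only a minimal plain-Java sketch of that idea, not Camel's own mechanism. The class BoundedInFlightDemo and the methods sendAsync and runBatches are hypothetical names; sendAsync merely stands in for producerTemplate.asyncRequestBody(..) and simulates the route with a short sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedInFlightDemo {

    // Hypothetical stand-in for producerTemplate.asyncRequestBody(endpoint, body).
    // It runs on a pool that creates threads as needed, so submitting never blocks.
    static CompletableFuture<Object> sendAsync(ExecutorService pool, String body,
                                               AtomicInteger inFlight, AtomicInteger peak) {
        return CompletableFuture.supplyAsync(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
            try {
                Thread.sleep(50); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
            }
            return body;
        }, pool);
    }

    // Sends `total` messages while keeping at most `maxInFlight` of them outstanding.
    // Returns the highest number of concurrently running tasks that was observed.
    static int runBatches(int total, int maxInFlight) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Object>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producing thread once the limit is reached (the back pressure).
                permits.acquireUninterruptibly();
                CompletableFuture<Object> future =
                        sendAsync(pool, String.valueOf(i), inFlight, peak)
                                // Release the permit on success and on failure alike.
                                .whenComplete((result, error) -> permits.release());
                futures.add(future);
            }
            // Wait for every batch; join() rethrows if any of them failed.
            futures.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak in-flight: " + runBatches(20, 4));
    }
}
```

In the setup from the question, the sendAsync call would be replaced by the real producerTemplate.asyncRequestBody(..) call. A permit count somewhere around the queue size plus the number of concurrent consumers would be a plausible starting point, though that value is an assumption and would need tuning.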