I've accomplished something similar with SQS using Java's ExecutorService, Future, and ConcurrentLinkedQueue.
The ExecutorService manages a thread pool that runs classes implementing the Callable interface; each submission returns a Future. As the futures are created, I push them onto a ConcurrentLinkedQueue, and a separate thread drains that queue and processes each result once its future completes.
Implement checking SQS and starting the work asynchronously:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor _futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
// Start the thread that drains the result queue
_futureResultProcessor.start();
}
public void waitReceive() {
// Receive an SQS message
String sqsMessage = receiveSqsMessage(); // hypothetical helper wrapping the SQS receive call, not shown
// Start the work related to the SQS message
Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
// Send to the queue so the result can be processed when it completes
_futureResultProcessor.add(sqsFuture);
}
}
Class that does the work:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
// Do work relating to the SQS message and populate the result
return new MyWorkerResult();
}
}
Holds the results of the work:
public class MyWorkerResult {
// Results set in MyWorker call()
}
ConcurrentLinkedQueue to receive and process the future results:
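import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private static final long CHECK_SLEEP_MS = 300;
public FutureResultProcessor() {
}
@Override
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
// There's nothing to process; sleep briefly before polling again
try { Thread.sleep(CHECK_SLEEP_MS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
continue;
}
try {
// get() blocks until this particular future has completed
MyWorkerResult myWorkerResult = myFuture.get();
// Process result
} catch (ExecutionException e) {
// The worker threw an exception; e.getCause() holds the original error
} catch (InterruptedException e) {
// Restore the interrupt flag and stop the processing thread
Thread.currentThread().interrupt();
return;
}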
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
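One caveat with the loop above: get() blocks on whichever future was queued first, so results are handled in submission order even if a later task finishes sooner. If handling results in completion order matters, the JDK's ExecutorCompletionService is one option. This is a different technique from the queue-and-poll thread above, and the following is only a minimal sketch: the class name, the method name, and the sleep-based stand-in tasks are invented for illustration and would be replaced by MyWorker instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderSketch {
    // Submits one task per delay and returns the delays in the order
    // the tasks finished. Assumes delaysMs is non-empty.
    public static List<Integer> runInCompletionOrder(int[] delaysMs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(delaysMs.length);
        ExecutorCompletionService<Integer> completionService =
                new ExecutorCompletionService<>(executor);
        try {
            for (int delayMs : delaysMs) {
                // Stand-in for MyWorker: sleep, then return the delay as the "result"
                completionService.submit(() -> {
                    Thread.sleep(delayMs);
                    return delayMs;
                });
            }
            List<Integer> finished = new ArrayList<>();
            for (int i = 0; i < delaysMs.length; i++) {
                // take() blocks until the next task completes, whichever one it is
                finished.add(completionService.take().get());
            }
            return finished;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The shortest task should normally be reported first
        System.out.println(runInCompletionOrder(new int[] {600, 50, 300}));
    }
}
```

With this approach the completion service owns the internal queue, so a separate polling thread with a sleep interval is not needed; a single consumer thread can simply loop on take().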
Alternatively, you could collect a group of futures and wait for them all to finish before processing the results.
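A rough sketch of that batch approach, using ExecutorService.invokeAll, which blocks until every submitted task has finished. The class name, the pool size of 4, and the upper-casing stand-in for the real work are assumptions made for the example, not part of the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchSketch {
    // Runs one task per message and returns the results once every task has finished.
    public static List<String> processBatch(List<String> messages)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String message : messages) {
                // Stand-in for MyWorker: "processing" just upper-cases the message
                tasks.add(() -> message.toUpperCase());
            }
            // invokeAll blocks until all tasks are done; futures come back in task order
            List<Future<String>> futures = executor.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // Does not block here: every task has already completed
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processBatch(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```

The trade-off is latency: no result is handled until the slowest task in the batch is done, so this suits cases where the results are only useful together.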
Akka could also be a good fit. I haven't used it directly, but it provides a framework for running asynchronous tasks with error handling, and it can even distribute the tasks to remote instances.