I am trying to write a Batch Mail Service which has two methods:
addMail(Mail mail)
: Queues a mail to be sent; called by producers.
flushMailService()
: Flushes the service. Consumers take a list of queued mails and pass it to another (expensive) method. Normally the expensive method should only be called once the batch size has been reached.
This is somewhat similar to this question: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch
It is possible to do that with poll(), which supports timeouts. However, a producer should be able to flush the mail service if it does not want to wait for the timeout, causing the consumers to send any mails that are in the queue.
poll(20, TimeUnit.SECONDS) can be interrupted. If it is interrupted, all mails in the queue should be sent, regardless of whether the batch size has been reached, until the queue is empty (using poll(), which returns null immediately if the queue is empty). Once the queue is empty, the mails that were added by the interrupting producer have already been sent. The consumer should then call the blocking version of poll again until it is interrupted by any other producer, and so on.
This seems to work with the given implementation.
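As a side note, the non-blocking drain step described above (repeated poll() until it returns null or the batch size is reached) could probably also be written with BlockingQueue.drainTo. A minimal sketch, assuming a hypothetical helper class DrainSketch and using String in place of Mail:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainSketch {
    // Hypothetical helper (not part of the original code): moves up to
    // batchSize currently queued elements into a new list without blocking.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int batchSize) {
        List<T> buffer = new ArrayList<>(batchSize);
        queue.drainTo(buffer, batchSize);
        return buffer;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("mail-" + i);
        }
        List<String> batch = drainBatch(queue, 3);
        // prints: [mail-0, mail-1, mail-2], remaining: 2
        System.out.println(batch + ", remaining: " + queue.size());
    }
}
```

This only covers the flush phase; the blocking wait for the first mail would still need poll with a timeout.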
I tried to use an ExecutorService with Futures, but it seems that a Future can only be interrupted once, since it is considered cancelled after the first interrupt. Therefore, I resorted to Threads, which can be interrupted multiple times.
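To illustrate the Future limitation, here is a small sketch (the class name FutureCancelSketch and the sleeping task are made up for the demonstration). The first cancel(true) interrupts the worker and succeeds; a second call on the same Future returns false and does not interrupt again:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureCancelSketch {
    // Returns { result of first cancel(true), result of second cancel(true) }.
    static boolean[] cancelTwice() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for a blocking poll
                } catch (InterruptedException e) {
                    // the first cancel(true) interrupts the worker here
                }
            });
            started.await(); // make sure the task is actually running
            boolean first = future.cancel(true);
            boolean second = future.cancel(true);
            return new boolean[] {first, second};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = cancelTwice();
        // prints: first=true, second=false
        System.out.println("first=" + result[0] + ", second=" + result[1]);
    }
}
```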
Currently I have the following implementation which seems to work (but is using "raw" threads).
Is this a reasonable approach? Or maybe another approach can be used?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchMailService {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

    private final LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
    private final CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
    // NOTE: mailService (which provides the expensive sendMails(List<Mail>)) is
    // assumed to be a field supplied elsewhere; its declaration is not shown here.

    public void checkMails() {
        int batchSize = 100;
        int timeout = 20;
        int consumerCount = 5;
        Runnable runnable = () -> {
            // true while this consumer is draining the queue after a flush
            boolean wasInterrupted = false;
            while (true) {
                List<Mail> buffer = new ArrayList<>();
                while (buffer.size() < batchSize) {
                    try {
                        Mail mail;
                        // Thread.interrupted() also clears the interrupt flag
                        wasInterrupted |= Thread.interrupted();
                        if (wasInterrupted) {
                            mail = queue.poll(); // non-blocking call
                        } else {
                            mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
                        }
                        if (mail != null) { // mail found immediately, or within timeout
                            buffer.add(mail);
                        } else { // no mail in queue, or timeout reached
                            LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
                            wasInterrupted = false;
                            break;
                        }
                    } catch (InterruptedException e) {
                        // flush requested while blocked in poll: send what we have, then drain
                        LOGGER.info("{} interrupted", Thread.currentThread());
                        wasInterrupted = true;
                        break;
                    }
                }
                if (!buffer.isEmpty()) {
                    LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
                    mailService.sendMails(buffer);
                }
            }
        };
        LOGGER.info("starting {} threads", consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            Thread thread = new Thread(runnable);
            threads.add(thread);
            thread.start();
        }
    }

    public void addMail(Mail mail) {
        queue.add(mail);
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        for (Thread t : threads) {
            t.interrupt();
        }
    }
}
Another approach, which avoids interrupts and instead uses a variant of the poison pill (Mail POISON_PILL = new Mail()), might be the following. It probably works best when there is only one consumer thread, since each poison pill is taken by only one consumer, so only that consumer will flush.
Runnable runnable = () -> {
    boolean flush = false;
    boolean shutdown = false;
    while (!shutdown) {
        List<Mail> buffer = new ArrayList<>();
        while (buffer.size() < batchSize && !shutdown) {
            try {
                Mail mail;
                if (flush) {
                    mail = queue.poll(); // non-blocking call
                    if (mail == null) {
                        LOGGER.info("{} all mails currently in queue have been processed", Thread.currentThread());
                        flush = false;
                        break;
                    }
                } else {
                    // blocking call; on timeout mail is null and the loop simply
                    // polls again, so a partial batch is not sent on timeout here
                    mail = queue.poll(5, TimeUnit.SECONDS);
                }
                if (mail == POISON_PILL) { // flush requested (identity comparison)
                    LOGGER.info("{} got flush", Thread.currentThread());
                    flush = true;
                } else if (mail != null) {
                    buffer.add(mail);
                }
            } catch (InterruptedException e) {
                // here an interrupt means shutdown, not flush
                LOGGER.info("{} interrupted", Thread.currentThread());
                shutdown = true;
            }
        }
        if (!buffer.isEmpty()) {
            LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
            mailService.sendMails(buffer);
        }
    }
};

public void flushMailService() {
    LOGGER.info("flushing BatchMailService");
    queue.add(POISON_PILL);
}
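To try the poison-pill variant in isolation, a self-contained sketch with a single consumer might look like this. It is only an illustration under assumptions: String stands in for Mail, a Consumer<List<String>> stands in for the expensive send method, and the class and method names (PoisonPillFlushSketch, consume, demo) are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PoisonPillFlushSketch {
    // Compared by identity, so a distinct String instance is needed.
    static final String POISON_PILL = new String("POISON_PILL");

    // Single-consumer loop; returns once the calling thread is interrupted.
    static void consume(BlockingQueue<String> queue, int batchSize,
                        long timeoutMillis, Consumer<List<String>> sender) {
        boolean flush = false;
        boolean shutdown = false;
        while (!shutdown) {
            List<String> buffer = new ArrayList<>();
            while (buffer.size() < batchSize && !shutdown) {
                try {
                    String mail;
                    if (flush) {
                        mail = queue.poll(); // non-blocking
                        if (mail == null) { // queue drained, flush finished
                            flush = false;
                            break;
                        }
                    } else {
                        mail = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                    }
                    if (mail == POISON_PILL) {
                        flush = true;
                    } else if (mail != null) {
                        buffer.add(mail);
                    }
                } catch (InterruptedException e) {
                    shutdown = true; // interrupt means shutdown in this variant
                }
            }
            if (!buffer.isEmpty()) {
                sender.accept(buffer);
            }
        }
    }

    // Adds three mails, requests a flush, and returns what the consumer sent.
    static List<String> demo() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> sent = new ArrayList<>();
        CountDownLatch flushed = new CountDownLatch(1);
        Thread consumer = new Thread(() -> consume(queue, 100, 60_000, batch -> {
            sent.addAll(batch);
            flushed.countDown();
        }));
        consumer.start();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        queue.add(POISON_PILL); // flush although the batch size is not reached
        boolean done = flushed.await(10, TimeUnit.SECONDS);
        consumer.interrupt(); // shut the consumer down
        consumer.join();
        if (!done) {
            throw new IllegalStateException("flush did not complete in time");
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints: [a, b, c]
    }
}
```

With several consumers, only the thread that happens to take the pill would flush, which is the limitation mentioned above.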