I am trying to write a Batch Mail Service which has two methods:

addMail(Mail mail): queues a mail for sending. It is called by producers.

flushMailService(): flushes the service. Consumers should take a List of mails and pass it to another (expensive) method. Normally the expensive method should only be called once the batch size has been reached.

This is somewhat similar to this question: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

This is possible with poll(), which accepts a timeout. However, a producer that does not want to wait for the timeout should be able to flush the mail service, causing the consumers to send any mails that are in the queue.
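As a point of reference, timeout-based batching with poll() could be sketched roughly as follows. This is only an illustration, not the code from the question: BatchCollector and collectBatch are hypothetical names, and the timeout here is applied to the whole batch rather than to each poll() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the question's code.
class BatchCollector {

    /**
     * Collects up to batchSize elements from the queue, waiting at most
     * the given timeout in total. Returns whatever was gathered, which
     * may be fewer than batchSize elements or none at all.
     */
    static <T> List<T> collectBatch(BlockingQueue<T> queue, int batchSize,
                                    long timeout, TimeUnit unit)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // overall timeout used up
            }
            // Block until an element arrives or the remaining time elapses.
            T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // timed out while waiting
            }
            batch.add(item);
            // Take whatever else is already queued, without blocking.
            queue.drainTo(batch, batchSize - batch.size());
        }
        return batch;
    }
}
```

A consumer loop would call collectBatch repeatedly and pass each non-empty result to the expensive send method. The sketch does not cover the flush requirement, which is what the rest of the question is about.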

poll(20, TimeUnit.SECONDS) can be interrupted. If it is interrupted, all mails in the queue should be sent, regardless of whether the batch size has been reached, until the queue is empty (using the no-argument poll(), which returns null immediately if the queue is empty). Once the queue is empty, the mails that were added by the interrupting producer have already been sent. The consumer should then call the blocking version of poll() again until it is interrupted by another producer, and so on.

This seems to work with the given implementation.

I tried to use an ExecutorService with Futures, but it seems that a Future can only be interrupted once, since it is considered cancelled after the first interrupt. Therefore, I resorted to plain Threads, which can be interrupted multiple times.
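The Future behaviour mentioned here can be checked with a small experiment. The following is a minimal sketch, with FutureCancelDemo and cancelTwice as made-up names and a sleeping task standing in for a consumer blocked in poll():

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
class FutureCancelDemo {

    /** Returns {result of first cancel, isCancelled, result of second cancel}. */
    static boolean[] cancelTwice() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Long-running task standing in for a consumer blocked in poll(timeout).
            Future<?> future = executor.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(30);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // First cancel succeeds and interrupts the worker if it is already running.
            boolean first = future.cancel(true);
            boolean cancelled = future.isCancelled();
            // Second cancel has no effect because the Future is already cancelled.
            boolean second = future.cancel(true);
            return new boolean[] {first, cancelled, second};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

The second cancel(true) returns false and delivers no further interrupt, which matches the observation that a Future cannot be used as a repeatable flush signal.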

Currently I have the following implementation which seems to work (but is using "raw" threads).

Is this a reasonable approach? Or maybe another approach can be used?

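public class BatchMailService {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

    private final LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
    private final CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
    // Not declared in the original snippet; assumed to be the service that exposes the expensive send method.
    private final MailService mailService = new MailService();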

   public void checkMails() {

        int batchSize = 100;
        int timeout = 20;
        int consumerCount = 5;

        Runnable runnable = () -> {
            boolean wasInterrupted = false;

            while (true) {
                List<Mail> buffer = new ArrayList<>();
                while (buffer.size() < batchSize) {
                    try {
                        Mail mail;
                        wasInterrupted |= Thread.interrupted();
                        if (wasInterrupted) {
                            mail = queue.poll(); // non-blocking call
                        } else {
                            mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
                        }
                        if (mail != null) {  // mail found immediately, or within timeout
                            buffer.add(mail);
                        } else { // no mail in queue, or timeout reached
                            LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
                            wasInterrupted = false;
                            break;
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("{} interrupted", Thread.currentThread());
                        wasInterrupted = true;
                        break;
                    }
                }
                if (!buffer.isEmpty()) {
                    LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
                    mailService.sendEmails(buffer);
                }
            }
        };

        LOGGER.info("starting {} threads", consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            Thread thread = new Thread(runnable);
            threads.add(thread);
            thread.start();
        }

    }

    public void addMail(Mail mail) {
        queue.add(mail);
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        for (Thread t : threads) {
            t.interrupt();
        }
    }
}

Another approach, which avoids interrupts, is the following variant of the poison pill (Mail POISON_PILL = new Mail()). It probably works best with a single consumer thread, because each poison pill is taken by only one consumer, so only that consumer flushes.

Runnable runnable = () -> {
      boolean flush = false;
      boolean shutdown = false;

      while (!shutdown) {
           List<Mail> buffer = new ArrayList<>();
           while (buffer.size() < batchSize && !shutdown) {
               try {
                   Mail mail;
                   if (flush){
                       mail = queue.poll();
                       if (mail == null) {
                           LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed");
                           flush = false;
                           break;
                       }
                   }else {
                      mail = queue.poll(5, TimeUnit.SECONDS); // blocking call
                   }

                   if (mail == POISON_PILL){  // flush
                       LOGGER.info(Thread.currentThread() + " got flush");
                       flush = true;
                   }
                   else if (mail != null){
                       buffer.add(mail);
                   }
               } catch (InterruptedException e) {
                   LOGGER.info(Thread.currentThread() + " interrupted");
                   shutdown = true;
               }
           }
           if (!buffer.isEmpty()) {
               LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
               mailService.sendEmails(buffer);
           }
       }
    };

public void flushMailService() {
    LOGGER.info("flushing BatchMailService");
    queue.add(POISON_PILL);
}

1 Answer

How about using signal and await instead of interrupt?

Producers add mail to the queue and signal when it needs to be flushed. A dispatcher waits for the signal or a timeout and then hands the queued emails to consumer threads for sending.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
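import java.util.concurrent.locks.ReentrantLock;

// Assuming SLF4J, as the LoggerFactory.getLogger call in the question suggests.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchMailService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

    private final LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();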

    public static final int BATCH_SIZE = 100;
    public static final int TIMEOUT = 20;
    public static final int CONSUMER_COUNT = 5;

    private final Lock flushLock = new ReentrantLock();
    private final Condition flushCondition = flushLock.newCondition();

    MailService mailService = new MailService();

    public void checkMails() {

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT);

        while (true) {

            // acquire the lock before entering try, so that finally never unlocks a lock that is not held
            flushLock.lock();
            try {
                // wait for timeout or for signal to come
                flushCondition.await(TIMEOUT, TimeUnit.SECONDS);

                // flush all present emails
                final List<Mail> toFlush = new ArrayList<>();
                queue.drainTo(toFlush);

                if (!toFlush.isEmpty()) {
                    consumerExecutor.submit(() -> {
                        LOGGER.info("{} sending {} mails", Thread.currentThread(), toFlush.size());
                        mailService.sendEmails(toFlush);
                    });
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // terminate execution in case of external interrupt
            } finally {
                flushLock.unlock();
            }
        }

    }

    public void addMail(Mail mail) {

        queue.add(mail);

        // check batch size and flush if necessary
        if (queue.size() >= BATCH_SIZE) {

            flushLock.lock();
            try {
                if (queue.size() >= BATCH_SIZE) {
                    flushMailService();
                }
            } finally {
                flushLock.unlock();
            }
        }
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        flushLock.lock();
        try {
            flushCondition.signal();
        } finally {
            flushLock.unlock();
        }
    }

}
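
One caveat with a bare signal(): if it is called while the dispatcher is not currently inside await() (for example between unlocking and re-locking at the end of a loop iteration), the signal is lost and the flush is delayed until the next timeout. A possible way around this, sketched here under the assumption that a small helper class is acceptable (FlushSignal, requestFlush and awaitFlush are hypothetical names), is to record the request in a flag guarded by the same lock:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of the answer's code.
class FlushSignal {

    private final Lock lock = new ReentrantLock();
    private final Condition flushCondition = lock.newCondition();
    private boolean flushRequested = false; // guarded by lock

    /** Called by producers. The request is remembered even if nobody is waiting yet. */
    void requestFlush() {
        lock.lock();
        try {
            flushRequested = true;
            flushCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until a flush has been requested or the timeout has elapsed.
     * Returns true if a flush was requested, false on timeout.
     */
    boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            // Loop to tolerate spurious wakeups; awaitNanos returns the time left.
            while (!flushRequested && nanos > 0L) {
                nanos = flushCondition.awaitNanos(nanos);
            }
            boolean requested = flushRequested;
            flushRequested = false; // consume the request
            return requested;
        } finally {
            lock.unlock();
        }
    }
}
```

The dispatcher loop would then call awaitFlush(TIMEOUT, TimeUnit.SECONDS) in place of the direct await() and drain the queue afterwards in either case, while flushMailService() would call requestFlush().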