What is the best way to check how long each of my worker threads has been running since it picked up a message for processing, and to log an error message if that time exceeds a threshold? I presume this needs to be managed in the WorkerManager class.

  1. My WorkerManager kick starts the worker threads
  2. If there are messages from the provider, then the worker thread processes them by calling a service class.
  3. If there are no messages then it goes to sleep for a brief period.

If a worker takes more than, say, 5 minutes to process a message, I want to log a warning but still let the worker thread continue processing.

Question

I want to continuously check whether any worker thread has spent more than 5 minutes processing a message. If one has, I want to log an error message but still let the worker thread continue as is.

WorkerManager Class

public class WorkerManager implements Runnable {

    private MyWorker[] workers;
    private int workerCount;
    private volatile boolean stopRequested; // volatile: stop() is called from another thread

    public WorkerManager(int count){
        this.workerCount = count;
    }

    @Override
    public void run(){
        stopRequested = false;
        boolean managerStarted = false;

        while (!stopRequested) {
            if(!managerStarted) {
                workers = new MyWorker[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                    workerThread.start();
                }
                managerStarted = true;
            }
        }
    }

    public void stop(){
        stopRequested = true;
    }

    // Call this on exit to ask every worker to stop
    public void cleanUpOnExit() {
        for(MyWorker w: workers){
            w.setStopRequested();
        }
    }
}

Worker Class

public class MyWorker implements Runnable {

    private final int WAIT_INTERVAL = 200;
    private MyService myService;
    private MyProvider myProvider;
    private volatile boolean stopRequested = false; // volatile: set by the manager thread

    public MyWorker(MyService myService, MyProvider myProvider){
        this.myService = myService;
        this.myProvider = myProvider;
    }

    public void setStopRequested() {
        stopRequested = true;
    }

    @Override
    public void run() {

        while (!stopRequested) {
            boolean processedMessage = false;
            List<Message> messages = myProvider.getPendingMessages();
            if (messages.size() != 0) {
                AdapterLog.debug("We have " + messages.size() + " messages");
                processedMessage = true;
                for (Message message : messages) {
                    // Pass the single message, not the whole list
                    processMessage(message);
                }
            }

            if (!(processedMessage || stopRequested)) {
                // this is to stop the thread from spinning when there are no messages
                try {
                    Thread.sleep(WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void processMessage(Message message){
        myService.process(message);
    }
}
1 Answer

Your WorkerManager needs a way to determine when each worker last processed a message, so each worker needs to keep track of the timestamp of its last processed message.
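One way a worker could expose that timing is sketched below. This is only an illustration under assumptions: the class `ProcessingTracker` and its methods are hypothetical names, not part of the question's code, and it records when the current message was picked up (rather than when the last one finished) so that an idle worker is not mistaken for a stuck one.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: remembers when a worker picked up its current message. */
final class ProcessingTracker {
    private static final long IDLE = -1L;

    // AtomicLong so the manager thread always sees the value written by the worker thread.
    private final AtomicLong startedAtMillis = new AtomicLong(IDLE);

    /** The worker calls this just before it starts processing a message. */
    void markStarted(long nowMillis) {
        startedAtMillis.set(nowMillis);
    }

    /** The worker calls this (ideally in a finally block) when processing ends. */
    void markFinished() {
        startedAtMillis.set(IDLE);
    }

    /** Milliseconds spent on the in-flight message, or -1 if the worker is idle. */
    long elapsedMillis(long nowMillis) {
        long startedAt = startedAtMillis.get();
        return startedAt == IDLE ? IDLE : nowMillis - startedAt;
    }
}
```

In the worker loop this would probably wrap the call to `processMessage`, with `markStarted(System.currentTimeMillis())` before it and `markFinished()` in a `finally` block after it.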

Then your WorkerManager can check each worker's timestamp and log a warning when needed. To run that check periodically, you could use a scheduled executor:

    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(this::checkTimeoutProcessingMessages, 5L, 5L, TimeUnit.SECONDS);

The check itself reads the timestamp from each worker:

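    public void checkTimeoutProcessingMessages() {
        // 5 minutes, the threshold mentioned in the question
        long thresholdMillis = TimeUnit.MINUTES.toMillis(5);
        long currentTimestamp = System.currentTimeMillis();
        for (MyWorker worker : workers) {
            long lastProcessed = worker.getLastProcessedMessageTimestamp();
            // Warn only when more than the threshold has elapsed since the timestamp
            if (currentTimestamp - lastProcessed > thresholdMillis) {
                //warn message
            }
        }
    }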
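
A variation worth considering: a "last processed" timestamp cannot tell an idle worker from one stuck on a long message, so it may be simpler to record when each worker picks up a message and clear that record when it finishes. The sketch below is a minimal, self-contained illustration of that idea. `InFlightRegistry`, `WatchdogDemo`, and the short demo threshold are all hypothetical and not taken from the question; the watchdog only reports and never interrupts a worker.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/** Hypothetical registry of in-flight work; none of these names come from the question. */
final class InFlightRegistry {
    // Worker name -> time (epoch millis) at which it picked up its current message.
    private final Map<String, Long> startedAt = new ConcurrentHashMap<>();
    private final long thresholdMillis;

    InFlightRegistry(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /** Called by a worker just before it processes a message. */
    void started(String workerName, long nowMillis) {
        startedAt.put(workerName, nowMillis);
    }

    /** Called by a worker (ideally in a finally block) once processing ends. */
    void finished(String workerName) {
        startedAt.remove(workerName);
    }

    /** Sorted names of workers whose current message has exceeded the threshold. */
    List<String> overdueWorkers(long nowMillis) {
        return startedAt.entrySet().stream()
                .filter(e -> nowMillis - e.getValue() > thresholdMillis)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}

final class WatchdogDemo {
    public static void main(String[] args) throws InterruptedException {
        InFlightRegistry registry = new InFlightRegistry(200L); // short threshold for the demo
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Check every 100 ms and log a warning for each overdue worker.
        scheduler.scheduleAtFixedRate(
                () -> registry.overdueWorkers(System.currentTimeMillis())
                        .forEach(name -> System.err.println("WARN: " + name + " is over the threshold")),
                100L, 100L, TimeUnit.MILLISECONDS);

        registry.started("Worker-1", System.currentTimeMillis());
        Thread.sleep(500L);            // simulate a slow message
        registry.finished("Worker-1"); // the worker completes normally
        scheduler.shutdownNow();
    }
}
```

As written, the warning repeats on every check while a worker stays over the threshold. If a single warning per message is preferred, the registry could also remember which workers it has already reported.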