I am not able to get the processing times for my worker threads. I have a monitoring thread that tracks the workers' processing times and reports the time taken and how many worker threads are in the processing state.

Questions

  1. My monitoring thread always reports that no workers are in the processing state. I suspect it is not evaluating the worker threads correctly. What is wrong?

  2. I have synchronised the methods that read and record the processing time and processing state. Is that enough for my monitor thread to track processing times without threads overwriting each other's values?

  3. Should the isProcessing boolean variable be a volatile variable?

MyWorker class

public class MyWorker implements Runnable {

    private final int WAIT_INTERVAL = 200;
    private MyService myService;
    private MyProvider myProvider;
    private boolean stopRequested = false;
    private boolean isProcessing; // Flag to indicate the worker is currently processing a message
    private long processingStartTime; //captures when the worker started processing a message
    private Logger logger = new Logger();

    public MyWorker(MyService myService, MyProvider myProvider) {
        this.myService = myService;
        this.myProvider = myProvider;
        this.isProcessing = false;
        this.processingStartTime = -1;
    }

    public void setStopRequested() {
        stopRequested = true;
    }


    private synchronized void recordProcessingStart(long start){
        isProcessing = true;
        processingStartTime = start;
    }

    private synchronized void recordProcessingStop(){
        isProcessing = false;
        processingStartTime = -1;
    }

    public synchronized boolean isProcessing(){
        return isProcessing;
    }

    public synchronized long getProcessingStartTime(){
        return processingStartTime;
    }

    @Override
    public void run() {

        while (!stopRequested) {
            boolean processingMessage = false;
            List<Message> messages = myProvider.getPendingMessages();
            if (messages.size() != 0) {
                logger.log("We have " + messages.size() + " messages");
                recordProcessingStart(System.currentTimeMillis());
                for (Message message : messages) {
                    processMessage(message); // pass the single message, not the whole list
                }
                recordProcessingStop();
            }

            if (!(processingMessage || stopRequested)) {
                // this is to stop the thread from spinning when there are no messages
                try {
                    Thread.sleep(WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void processMessage(Message message) {
        myService.process(message);
    }
}

WorkerManager class

public class WorkerManager implements Runnable {

    private MyWorker[] workers;
    private int workerCount;
    private boolean stopRequested;
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private Logger logger = new Logger();

    public WorkerManager(int count) {
        this.workerCount = count;
    }

    @Override
    public void run() {
        stopRequested = false;
        boolean managerStarted = false;

        while (!stopRequested) {
            if (!managerStarted) {
                workers = new MyWorker[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                    workerThread.start();
                }
                startProcessMonitoringThread();
                managerStarted = true;
            }
        }
    }

    public void stop() {
        stopRequested = true;
    }

    public void cleanUpOnExit() {
        for (MyWorker w : workers) {
            w.setStopRequested();
        }
        stopProcessMonitoringThread();
    }

    private void startProcessMonitoringThread(){

        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleWithFixedDelay( new WorkerMonitorThread(workers), 1, 1, TimeUnit.MINUTES);
        logger.info("Monitoring of worker threads initialized ...");
    }

    private void stopProcessMonitoringThread(){
        scheduledExecutorService.shutdown();
        logger.info("Successfully shutdown worker monitoring thread ...");
    }


    private class WorkerMonitorThread implements Runnable{

        private MyWorker[] workers;
        WorkerMonitorThread(MyWorker workers[]){
            this.workers = workers;

        }

        @Override
        public void run() {
            String trackingId = "["+ UUID.randomUUID().toString() + "] - ";
            logger.info(trackingId + "Calculating processing times of worker threads ...");

            StringBuilder sb = new StringBuilder();
            int totalWorkersInProcess = 0;
            for (int i = 0; i < workerCount; i++) {
                MyWorker worker = workers[i];
                if(worker.isProcessing()){
                    totalWorkersInProcess++;
                    long startTime = worker.getProcessingStartTime();
                    long currentTime = System.currentTimeMillis();
                    long duration = currentTime - startTime;
                    sb.append(trackingId + "Worker-" + (i + 1) + " has been processing for the last : " + duration + " ms");
                }
            }
            sb.append(trackingId + "Total worker(s) in progress : " + totalWorkersInProcess);
            logger.info(sb.toString());
        }
    }
}

1 Answer

You haven't created your workers.

Look at this part of your WorkerManager run method:

            workers = new MyWorker[workerCount]; // Creates an array of MyWorker, but every element is still null.
            for (int i = 0; i < workerCount; i++) {
                final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1)); // <-- Creates a thread with a null MyWorker as its target.
                workerThread.start(); // The thread has no target, so it runs nothing and exits; the null surfaces later in the monitor's worker.isProcessing() call.
            }
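
To see what that loop actually does, here is a small stand-alone check. It is only a sketch: the class name NullTargetDemo and its two helper methods are made up for illustration and are not part of your code. It shows that a freshly allocated object array holds only null, and that a Thread built with a null Runnable starts and then finishes without running anything.

```java
public class NullTargetDemo {

    // Returns true when every slot of the array is still null.
    static boolean allNull(Object[] array) {
        for (Object element : array) {
            if (element != null) {
                return false;
            }
        }
        return true;
    }

    // Starts a thread with a null target and waits for it to finish.
    // Returns true if the thread terminated within the timeout.
    static boolean nullTargetThreadFinishes() {
        Runnable target = null; // same situation as workers[i] before it is assigned
        Thread thread = new Thread(target, "Worker-1");
        thread.start();
        try {
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        Runnable[] workers = new Runnable[3];
        System.out.println("all slots null: " + allNull(workers));
        System.out.println("null-target thread finished: " + nullTargetThreadFinishes());
    }
}
```

Because nothing ever runs in those threads, no worker can enter the processing state. The monitor is also handed the same array of nulls.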

To fix the bug, your WorkerManager must either create the workers itself or receive them ready-made. For example, you could pass the dependencies needed to create the workers into the constructor:

    // Requires matching myService and myProvider fields in WorkerManager.
    public WorkerManager(int count, MyService myService, MyProvider myProvider) {
        this.workerCount = count;
        this.myService = myService;
        this.myProvider = myProvider;
    }

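Then create each worker before starting its thread:

            workers = new MyWorker[workerCount];
            for (int i = 0; i < workerCount; i++) {
                workers[i] = new MyWorker(myService, myProvider); // fill the slot first
                new Thread(workers[i], "Worker-" + (i + 1)).start(); // then start its thread, as in your original loop
            }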
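
On your second and third questions, here is a hedged sketch rather than a drop-in replacement. Every read and write of isProcessing already happens inside a synchronized method, so that field does not also need to be volatile. stopRequested, on the other hand, is read in the worker loop without any lock, so it does. There is one remaining gap: the monitor calls isProcessing() and getProcessingStartTime() as two separate steps, so a worker can finish in between and the monitor then computes a duration from -1. One way around that is to read both values under a single lock. The class name WorkerState and the method elapsedMillis below are hypothetical, chosen only for this illustration.

```java
public class WorkerState {

    // Read by the worker loop without a lock, so it must be volatile
    // for a write from another thread to become visible.
    private volatile boolean stopRequested = false;

    // Guarded by "this": only touched inside synchronized methods.
    private boolean processing = false;
    private long processingStartTime = -1;

    public void requestStop() {
        stopRequested = true;
    }

    public boolean isStopRequested() {
        return stopRequested;
    }

    public synchronized void recordProcessingStart(long start) {
        processing = true;
        processingStartTime = start;
    }

    public synchronized void recordProcessingStop() {
        processing = false;
        processingStartTime = -1;
    }

    // Returns how long the worker has been processing at time "now",
    // or -1 if it is idle. The flag and the start time are read under
    // one lock, so the caller never sees a half-updated pair.
    public synchronized long elapsedMillis(long now) {
        return processing ? now - processingStartTime : -1;
    }

    public static void main(String[] args) {
        WorkerState state = new WorkerState();
        System.out.println(state.elapsedMillis(1_000L)); // idle
        state.recordProcessingStart(1_000L);
        System.out.println(state.elapsedMillis(1_250L)); // processing
        state.recordProcessingStop();
        System.out.println(state.elapsedMillis(2_000L)); // idle again
    }
}
```

In the monitor loop, a single call like this followed by a check for -1 would take the place of the isProcessing() / getProcessingStartTime() pair.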