I am not able to get the processing times for my worker threads. I have a thread monitoring the processing times for my worker threads and reports the time taken and how many of the worker threads are in processing state.
Questions
My monitoring thread, always reports that there are no workers in processing state? I suspect my monitoring thread is not able to evaluate the worker threads appropriately.
I have synchronised some of the methods that fetch and record the processing times and processing state. Is that good enough for my monitor thread to keep track of processing times without thread overwrites.
Should the isProcessing boolean variable be a volatile variable?
MyWorker class
public class MyWorker implements Runnable {
private final int WAIT_INTERVAL = 200;
private MyService myService;
private MyProvider myProvider;
private boolean stopRequested = false;
private boolean isProcessing; // Flag to indicate the worker is currently processing a message
private long processingStartTime; //captures when the worker started processing a message
private Logger logger = new Logger();
public MyWorker(MyService myService, MyProvider myProvider){
this.myService = myService;
this.myProvider = myProvider;
this.isProcessing = false;
this.processingStartTime = -1;
}
public void setStopRequested() {
stopRequested = true;
}
private synchronized void recordProcessingStart(long start){
isProcessing = true;
processingStartTime = start;
}
private synchronized void recordProcessingStop(){
isProcessing = false;
processingStartTime = -1;
}
public synchronized boolean isProcessing(){
return isProcessing;
}
public synchronized long getProcessingStartTime(){
return processingStartTime;
}
@Override
public void run() {
while (!stopRequested) {
boolean processingMessage = false;
List<Message> messages = myProvider.getPendingMessages();
if (messages.size() != 0) {
logger.log("We have " + messages.size() + " messages");
recordProcessingStart(System.currentTimeMillis());
for (Message message : messages) {
processMessage(messages);
}
recordProcessingStop();
}
if (!(processingMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void processMessage(Message messages){
myService.process(messages);
}
}
WorkerManager class
public class WorkerManager implements Runnable {
private MyWorker[] workers;
private int workerCount;
private boolean stopRequested;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private Logger logger = new Logger();
public WorkerManager(int count) {
this.workerCount = count;
}
@Override
public void run() {
stopRequested = false;
boolean managerStarted = false;
while (!stopRequested) {
if (!managerStarted) {
workers = new MyWorker[workerCount];
for (int i = 0; i < workerCount; i++) {
final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
workerThread.start();
}
startProcessMonitoringThread();
managerStarted = true;
}
}
}
public void stop() {
stopRequested = true;
}
public void cleanUpOnExit() {
for (MyWorker w : workers) {
w.setStopRequested();
}
stopProcessMonitoringThread();
}
private void startProcessMonitoringThread(){
scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay( new WorkerMonitorThread(workers), 1, 1, TimeUnit.MINUTES);
logger.info("Monitoring of worker threads initialized ...");
}
private void stopProcessMonitoringThread(){
scheduledExecutorService.shutdown();
logger.info("Successfully shutdown worker monitoring thread ...");
}
private class WorkerMonitorThread implements Runnable{
private MyWorker[] workers;
WorkerMonitorThread(MyWorker workers[]){
this.workers = workers;
}
@Override
public void run() {
String trackingId = "["+ UUID.randomUUID().toString() + "] - ";
logger.info(trackingId + "Calculating processing times of worker threads ...");
StringBuilder sb = new StringBuilder();
int totalWorkersInProcess = 0;
for (int i = 0; i < workerCount; i++) {
MyWorker worker = workers[i];
if(worker.isProcessing()){
totalWorkersInProcess++;
long startTime = worker.getProcessingStartTime();
long currentTime = System.currentTimeMillis();
long duration = currentTime - startTime;
sb.append(trackingId + "Worker-" + (i + 1) + " has been processing for the last : " + duration + " ms");
}
}
sb.append(trackingId + "Total worker(s) in progress : " + totalWorkersInProcess);
logger.info(sb.toString());
}
}
}