I am looking for a "better" way of pausing and resuming execution of worker threads' tasks.

Usually one would use a boolean flag, such as an AtomicBoolean, which the worker threads check before each iteration of their work.

Something like the code snippet below:

    AtomicBoolean b = new AtomicBoolean(true);
    Runnable r = () -> {
        while (true) {
            // check for pause
            while (!b.get()) {
                // sleep thread then check again
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // do work
        }
    };
    Thread p = new Thread(r);
    p.start();

This is certainly not an elegant solution, and for this reason I have been experimenting with locks and conditions.

Before jumping into my attempt at using locks and conditions, let us first look at the examples and docs. I will be using a ReentrantLock and a Condition as the flag for pause/resume states in my ExecutorService worker tasks.

ReentrantLock

ReentrantLock.lock() blocks the current thread until the lock can be acquired. An alternative is ReentrantLock.tryLock(), which acquires the lock only if it is not held by another thread at the time of the call and returns immediately with a boolean result.
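
The difference between the two calls can be sketched as follows. This is only an illustration: the class name TryLockDemo and its helper methods are made up for this example, and a second thread is used to hold the lock so that tryLock() has something to fail against.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

    // tryLock() on a lock nobody holds: succeeds immediately.
    static boolean tryLockWhenFree() {
        ReentrantLock lock = new ReentrantLock();
        boolean acquired = lock.tryLock();
        if (acquired) {
            lock.unlock();
        }
        return acquired;
    }

    // tryLock() while a helper thread holds the same lock: returns
    // immediately with false instead of blocking like lock() would.
    static boolean tryLockWhileHeldElsewhere() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();   // tell the caller the lock is now held
                release.await();    // keep holding until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();

        try {
            held.await();
            boolean acquired = lock.tryLock();
            if (acquired) {
                lock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("free lock: " + tryLockWhenFree());
        System.out.println("held lock: " + tryLockWhileHeldElsewhere());
    }
}
```

Note that tryLock() also succeeds when the calling thread already holds the lock, because ReentrantLock is reentrant.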

Condition

Condition, by way of the example found here, lets a thread suspend itself with Condition.await() until another thread tells it to continue with Condition.signal() (or Condition.signalAll()).

Problem:

Reviewing the documentation for Condition.await(), specifically the four events that end a wait, leads me to believe that a Condition can only be used to resume a task, not to pause one.

The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:

  • Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or

This means I can call condition.signalAll() from my UI thread on a condition that a worker thread is currently await()ing on, thus scheduling that thread for execution, i.e. Resume.

However, I cannot see a way to arrive at this state (i.e. Pause) using conditions, since for a thread to enter the await state, the documentation says:

The current thread is assumed to hold the lock associated with this Condition when this method is called. It is up to the implementation to determine if this is the case and if not, how to respond. Typically, an exception will be thrown (such as IllegalMonitorStateException) and the implementation must document that fact.
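
The two quoted passages fit together as follows: the waiting thread takes the lock itself, await() gives the lock up for the duration of the wait, and the lock is held again once await() returns. Below is a minimal sketch of that behaviour. The class AwaitReleasesLockDemo and the go flag are hypothetical names introduced for this illustration, and hasWaiters() is used only to detect that the other thread is parked in await().

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReleasesLockDemo {

    // Returns {callerGotLockDuringAwait, waiterOwnedLockAfterAwait}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] go = {false};              // guarded by lock
        boolean[] ownedAfterAwait = {false}; // read only after join()

        Thread waiter = new Thread(() -> {
            lock.lock();                     // await() requires the lock to be held
            try {
                while (!go[0]) {             // loop guards against spurious wakeups
                    condition.await();       // releases the lock while waiting
                }
                ownedAfterAwait[0] = lock.isHeldByCurrentThread();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean gotLockDuringAwait = false;
        try {
            while (!gotLockDuringAwait) {
                lock.lock();
                try {
                    // True only if the waiter is parked in await(), which
                    // means we hold the lock that the waiter locked earlier.
                    if (lock.hasWaiters(condition)) {
                        gotLockDuringAwait = true;
                        go[0] = true;
                        condition.signal();
                    }
                } finally {
                    lock.unlock();
                }
                if (!gotLockDuringAwait) {
                    Thread.sleep(1);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {gotLockDuringAwait, ownedAfterAwait[0]};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("caller acquired lock during await: " + result[0]);
        System.out.println("waiter held lock after await:      " + result[1]);
    }
}
```

In other words, "pause" is something the worker does to itself by checking a flag under the lock and calling await(); the controlling thread only changes the flag and signals.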


My test example using Android

Logic

  1. User clicks start button
  2. Executor Thread Pool starts incrementing a counter (indefinitely) and displaying it on the UI (symbolizing a long running function)
  3. User clicks pause button
  4. Executor thread waits for the lock to be unlocked (where the condition comes in)
  5. User clicks resume button
  6. Condition signaled, executor thread continues incrementing till stop/exit

MainActivity.java

public class MainActivity extends AppCompatActivity {

    private TextView lblCounter;
    private Button btnStartStop, btnPauseResume;

    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition waitCondition = pauseLock.newCondition();
    private ExecutorService executorService = Executors.newWorkStealingPool(4);

    private AtomicBoolean stopStart = new AtomicBoolean(true);

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        lblCounter = findViewById(R.id.counter);
        btnStartStop = findViewById(R.id.startStop);
        btnPauseResume = findViewById(R.id.pauseResume);

        btnStartStop.setOnClickListener(v -> {
            if (stopStart.get()) {
                start();
            } else {
                stop();
            }
        });

        btnPauseResume.setOnClickListener(v -> {
            pauseResume();
        });
    }

    public void start() {
        btnStartStop.setText("Stop");
        AtomicInteger i = new AtomicInteger(0);
        executorService.execute(() -> {
            while (true) {

                // here we check if the lock is locked; if so, we should await until a signal is emitted
                while (pauseLock.isLocked()) {
                    try {
                        waitCondition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                int i1 = i.incrementAndGet();
                lblCounter.setText(String.valueOf(i1));
            }
        });
    }

    public void stop() {
        executorService.shutdownNow();
        btnStartStop.setText("Start");
    }

    public void pauseResume() {
        if (pauseLock.isLocked()) {
            pauseLock.unlock();
            waitCondition.signal();
            btnPauseResume.setText("Pause");
        } else {
            pauseLock.lock();
            btnPauseResume.setText("Resume");
        }
    }
}

Main Activity XML

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <TextView
        android:id="@+id/counter"
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_marginStart="96dp"
        android:layout_marginTop="8dp"
        android:layout_marginEnd="96dp"
        android:text="123"
        android:textColor="#000000"
        android:textSize="36sp"
        android:textStyle="bold"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@+id/textView" />

    <Button
        android:id="@+id/startStop"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginBottom="128dp"
        android:text="Start"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintEnd_toStartOf="@+id/pauseResume"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toStartOf="parent" />

    <Button
        android:id="@+id/pauseResume"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Pause"
        app:layout_constraintBottom_toBottomOf="@+id/startStop"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintHorizontal_bias="0.5"
        app:layout_constraintStart_toEndOf="@+id/startStop" />

    <TextView
        android:id="@+id/textView"
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_marginStart="96dp"
        android:layout_marginTop="128dp"
        android:layout_marginEnd="96dp"
        android:text="Counter"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

</androidx.constraintlayout.widget.ConstraintLayout>

A problem with the implementation above is that the worker never acquires pauseLock before the while loop that surrounds waitCondition.await(). So when the user clicks the pauseResume button and the UI thread acquires the lock in pauseResume(), the worker thread calls waitCondition.await() without holding the lock, which causes the exception below:

E/AndroidRuntime: FATAL EXCEPTION: ForkJoinPool-1-worker-1
Process: nmu.wrpv302.myapplication, PID: 11744
java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:156)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1291)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1752)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2064)
    at com.example.myapplication.MainActivity.lambda$start$2$MainActivity(MainActivity.java:61)

        ^ --------------------- PROBLEM -----------------------

    at com.example.myapplication.-$$Lambda$MainActivity$te94WnCx7dwprXfxnjJZuoEc1_8.run(Unknown Source:4)
    at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1411)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:285)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1155)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1993)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1941)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
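
The failure can be reproduced outside Android with a small sketch. It assumes the same field names as the activity above (pauseLock, waitCondition); the class IsLockedDemo and its method are made up for this illustration. The point is that isLocked() reports whether any thread holds the lock, while await() needs the calling thread to be the holder.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class IsLockedDemo {

    // Returns what a worker thread observes while the calling thread holds
    // the lock: {isLocked, isHeldByCurrentThread, awaitThrewIllegalMonitorState}.
    static boolean[] observeWhileHeldElsewhere() {
        ReentrantLock pauseLock = new ReentrantLock();
        Condition waitCondition = pauseLock.newCondition();
        boolean[] seen = new boolean[3]; // read only after join()

        pauseLock.lock(); // the caller plays the UI thread pressing "Pause"
        try {
            Thread worker = new Thread(() -> {
                seen[0] = pauseLock.isLocked();              // some thread holds it
                seen[1] = pauseLock.isHeldByCurrentThread(); // but not this one
                try {
                    waitCondition.await();                   // not the owner
                } catch (IllegalMonitorStateException e) {
                    seen[2] = true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pauseLock.unlock();
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observeWhileHeldElsewhere();
        System.out.println("isLocked():              " + seen[0]);
        System.out.println("isHeldByCurrentThread(): " + seen[1]);
        System.out.println("await() threw:           " + seen[2]);
    }
}
```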

Question:

Using Conditions & Locks, how can I improve on the standard boolean flag check + Thread.sleep() for worker threads to pause/resume execution?

1 Answer

I think that for your particular example of logic (to manage a counter) Conditions, Locks etc. seem to be overkill.

  1. If we see this process as one single long-lived process, good old Object.wait()/notify()/notifyAll() with an explicitly instantiated Thread should work perfectly well:

     final Object lock = new Object();
    
     AtomicBoolean b = new AtomicBoolean(true);
     Runnable r = () -> {
         _end:
         while (true) {
             while (b.get()) {
                 if (Thread.currentThread().isInterrupted()) {
                     break _end;
                 }
                 // increment the counter
             }
             synchronized (lock) {
                 while (!b.get()) {
                     try {
                         lock.wait();
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         break _end;
                     }
                 }
             }
         }
     };
     Thread p = new Thread(r);
     p.start();
    
     ...
     b.set(false); // disable
     ...
     synchronized (lock) {
         b.set(true); // enable
         lock.notify(); // to notify for enabled state change
     }
     ...
     b.set(false); // disable
    
  2. ReentrantLock + Condition work much like Object.wait()/notify()/notifyAll() (inside a synchronized block). All of a Condition's methods must be called while the current thread holds the lock, otherwise you get an IllegalMonitorStateException (see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html):

     final ReentrantLock lock = new ReentrantLock();
     final Condition enabled = lock.newCondition();
    
     AtomicBoolean b = new AtomicBoolean(true);
     Runnable r = () -> {
         _end:
         while (true) {
             while (b.get()) {
                 if (Thread.currentThread().isInterrupted()) {
                     break _end;
                 }
                 // increment the counter
             }
             lock.lock();
             try {
                 while (!b.get()) {
                     try {
                         enabled.await();
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         break _end;
                     }
                 }
             } finally {
                 lock.unlock();
             }
         }
     };
     Thread p = new Thread(r);
     p.start();
    
     ...        
     b.set(false); // disable
    
     ...
     lock.lock(); // if you don't lock before signal, you get IllegalMonitorStateException
     try {
         b.set(true); // enable
         enabled.signal(); // to notify for enabled state change
     } finally {
         lock.unlock();
     }
    
     ...
     b.set(false); // disable
    
  3. We can also view the process as a sequence of increment tasks. In this case an ExecutorService is a good choice. Just store the reference to your task so you can cancel it when required:

     // a single worker thread runs submitted tasks one after another, so they never run concurrently
     ExecutorService executor = Executors.newSingleThreadExecutor();
    
     Future<?> task;
    
     task = executor.submit(() -> { // start increment
         while (!Thread.currentThread().isInterrupted()) {
             // increment the counter
         }
     });
     ...
    
     task.cancel(true); // stop increment
    
     ...
    
     task = executor.submit(() -> { // start increment again
         while (!Thread.currentThread().isInterrupted()) {
             // increment the counter
         }
     });
     ...
     task.cancel(true); // stop increment
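
For use from several places (for example a pause button and a resume button), the lock, the condition and the flag from option 2 can be wrapped in one small class. The following is a minimal sketch under stated assumptions, not a definitive implementation: PauseGate, awaitIfPaused() and runDemo() are hypothetical names, and the flag is a plain boolean guarded by the lock instead of an AtomicBoolean.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class PauseGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused; // guarded by lock

    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll(); // wake every worker blocked in awaitIfPaused()
        } finally {
            lock.unlock();
        }
    }

    // Workers call this between units of work; it blocks while paused.
    void awaitIfPaused() throws InterruptedException {
        lock.lock();
        try {
            while (paused) {
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Demo: returns the counter {when paused, 50 ms later, after resume}.
    static long[] runDemo() {
        PauseGate gate = new PauseGate();
        AtomicLong counter = new AtomicLong();
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    gate.awaitIfPaused();
                    counter.incrementAndGet(); // stands in for one unit of work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treated as "stop"
            }
        });
        worker.start();
        try {
            gate.pause();
            // Wait until the worker is actually parked before sampling.
            while (worker.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            long atPause = counter.get();
            Thread.sleep(50);
            long stillPaused = counter.get();
            gate.resume();
            while (counter.get() == stillPaused) {
                Thread.sleep(1);
            }
            long afterResume = counter.get();
            worker.interrupt();
            worker.join();
            return new long[] {atPause, stillPaused, afterResume};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

While paused the worker is parked inside await() and uses no CPU, which is the main gain over the flag plus Thread.sleep() loop. A pause takes effect at the worker's next call to awaitIfPaused(), so a unit of work that is already running still completes.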
    

Additional notes:

  • If you need not just to increment a counter but to wait until a job is ready to be executed, I'd suggest using a BlockingQueue for communication between threads (Work/Job/Task Queue, Producer-Consumer patterns); see, for example, https://www.baeldung.com/java-blocking-queue.
  • If you make blocking IO calls (on a socket, for example), you are better off using an explicit thread and calling close() from another thread, as described in https://stackoverflow.com/a/4426050/3319725. Set a volatile flag isClosing beforehand so that you know the resulting SocketException is not an error but a side effect of the closing and can be ignored.
  • If you catch an InterruptedException, don't forget to restore the thread's interrupted flag with Thread.currentThread().interrupt() if there is a chance that someone else will check it later (https://www.ibm.com/developerworks/library/j-jtp05236/index.html).
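
The BlockingQueue suggestion from the first note can be sketched like this. It is an illustration only: QueueWorkerDemo, process() and the STOP sentinel are hypothetical names, and squaring a number stands in for real work. The consumer blocks inside take() whenever the queue is empty, so no pause flag is needed, and the catch block restores the interrupted flag as recommended in the last note.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorkerDemo {
    // Hypothetical sentinel ("poison pill") that tells the consumer to stop.
    private static final int STOP = -1;

    // Feeds the jobs to a consumer thread and returns the squared results.
    static List<Integer> process(int... jobs) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> results = new ArrayList<>(); // read only after join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int job = queue.take(); // blocks while the queue is empty
                    if (job == STOP) {
                        return;
                    }
                    results.add(job * job);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag
            }
        });
        consumer.start();

        try {
            for (int job : jobs) {
                queue.put(job);
            }
            queue.put(STOP);
            consumer.join(); // makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(process(1, 2, 3));
    }
}
```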