9
votes

Let's say we have a thread-pool with a limited number of threads.

Executor executor = Executors.newFixedThreadPool(3);

Now let's say one of the active tasks must sleep for 3 seconds (for whatever reason).

executor.execute(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException ignore) {}
});

How can we implement such a thread-pool in a way that, when a task sleeps (or waits on a monitor/condition), the thread¹ can be used effectively to run another task?

¹ By "thread" I do not mean the physical Java thread, because that would be impossible while the thread is asleep. What I mean is a thread-pool abstraction that virtually appears to let a thread run another task while the first one sleeps. The key point is that there are always N simultaneously running (non-sleeping) tasks.

Somewhat similar to the way a monitor handles access to a critical region:

  • If a thread waits on a resource, the resource can be used by another thread.
  • If the thread is notified, it is placed into the waiting set to (re-)gain access to that resource.
2
Make it do some job instead of sleeping, I guess. – Pavel Smirnov
@PavelSmirnov Are you able to construct a working abstraction on that? One which accounts for cases with waiting on a resource? What if the executed job also sleeps (executes another job), then another, then another, etc... Then the original task would never get a chance to continue. – Snackoverflow
Well, if you have a job to do that requires a lock, a thread can try to acquire the lock and do the job only if it succeeded. Otherwise, it can switch to another job waiting in a queue and try to do that, or return to the original thread's task. The point I wanted to show is that you'd better do some job, if it's possible, instead of just sleeping. – Pavel Smirnov
This reminds me of coroutines in Lua - look it up. It may be some inspiration for you. – Felix
Interesting question. I don't believe you'll get a satisfying answer as you're basically trying to implement a thread scheduler on top of the OS. You might get by using locks and P > K background threads, with custom "sleep" methods. – ttzn
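
The idea in the last comment (K "running" slots, P > K background threads, a custom sleep method) could be sketched roughly as follows. This is only an illustration under assumptions: the class PermitPool and its methods sleep, shutdownAndWait and demo are hypothetical names, not part of any library, and a sleeping task still occupies an OS thread; only its running slot is handed to another task.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: K permits limit the running tasks, P > K threads carry them. */
public class PermitPool {
    private final Semaphore permits;
    private final ExecutorService workers;

    public PermitPool(int running, int threads) {
        this.permits = new Semaphore(running);
        this.workers = Executors.newFixedThreadPool(threads);
    }

    public void execute(Runnable task) {
        workers.execute(() -> {
            permits.acquireUninterruptibly();   // wait for a "running" slot
            try {
                task.run();
            } finally {
                permits.release();
            }
        });
    }

    /** Custom sleep: gives the slot away while asleep. Call only from inside a task. */
    public void sleep(long millis) throws InterruptedException {
        permits.release();
        try {
            Thread.sleep(millis);
        } finally {
            permits.acquireUninterruptibly();   // queue up for a slot again before resuming
        }
    }

    public void shutdownAndWait() {
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** One running slot, two threads: task B gets the slot while task A sleeps. */
    static List<String> demo() {
        PermitPool pool = new PermitPool(1, 2);
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch aStarted = new CountDownLatch(1);
        pool.execute(() -> {
            events.add("A start");
            aStarted.countDown();
            try {
                pool.sleep(300L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("A resumed");
        });
        try {
            aStarted.await();                   // submit B only once A holds the slot
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.execute(() -> events.add("B ran"));
        pool.shutdownAndWait();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The catch is the one the comment hints at: tasks have to call the pool's own sleep instead of Thread.sleep, and a woken task must wait for a free slot, so it may resume later than requested.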

2 Answers

2
votes

What you are asking for is essentially an implementation of coroutines/fibers on top of JVM/OS threads. Sanhong Li gave a nice talk on how Alibaba's engineers implemented such a construct: the idea is that instead of relying on the OS thread scheduler, you rely on your own Selector.

See also Project Loom for fibers (user-land green threads).
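
Without fibers, one way to approximate this on plain JDK threads is to split a task at the point where it would sleep and schedule the remainder as a continuation. The sketch below is not the Alibaba or Loom approach; it swaps in a standard ScheduledExecutorService, and the class name ContinuationSketch and its demo method are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: "sleeping" becomes scheduling the rest of the task for later. */
public class ContinuationSketch {

    static List<String> demo() {
        // A single worker thread makes the interleaving easy to see
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);

        // Task A: the first half runs now; instead of Thread.sleep(300),
        // the second half is scheduled and the thread is released
        pool.execute(() -> {
            events.add("A part 1");
            pool.schedule(() -> {
                events.add("A part 2");
                done.countDown();
            }, 300, TimeUnit.MILLISECONDS);
        });

        // Task B runs on the same thread while A is "asleep"
        pool.execute(() -> {
            events.add("B");
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A part 1, B, A part 2]
    }
}
```

The price is that the task has to be written in two pieces by hand, which is exactly the bookkeeping that coroutines or fibers would do for you.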

0
votes

I implemented a minimal working example which basically does what I think you want.

A Task interface (much like the Runnable interface, just with a Context passed in to perform waiting):

package io.medev.stackoverflow;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public interface Task {

    /**
     * Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
     * @param runnable The runnable to wrap
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable) {
        return wrap(runnable, Long.MAX_VALUE);
    }

    /**
     * Wraps the given runnable using the given guessedExecutionTimeMillis
     * @param runnable The runnable to wrap
     * @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) {
        return new Task() {
            @Override
            public long guessExecutionTimeMillis() {
                return guessedExecutionTimeMillis;
            }

            @Override
            public void run(Context context) {
                runnable.run();
            }
        };
    }

    /**
     * Should more or less guess how long this task will run
     * @return The execution time of this Task in milliseconds
     */
    long guessExecutionTimeMillis();

    void run(Context context);

    interface Context {

        /**
         * Block until the condition is met, giving other Tasks time to execute
         * @param condition the condition to check
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition) throws InterruptedException;

        /**
         * Blocks at least for the given duration, giving other Tasks time to execute
         * @param timeout how long to idle, in the given unit
         * @param timeUnit the unit of the timeout argument
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
         * @param condition the condition to check
         * @param timeout the maximum time to idle, in the given unit
         * @param timeUnit the unit of the timeout argument
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
    }
}

And a basic fixed thread-pool Executor. Note that to submit a Task (or a Runnable with a guessed execution time) you have to depend on the concrete implementation here, not on the Executor interface:

package io.medev.stackoverflow;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor {

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) {
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        execute(Task.wrap(runnable));
    }

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    }

    public void execute(Task task) {
        this.taskQueue.offer(task);
    }

    public void shutdown() {
        this.alive = false;
    }

    public void awaitShutdown() throws InterruptedException {
        this.latch.await();
    }

    public boolean awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        // true if all workers finished, false if the timeout elapsed first
        return this.latch.await(timeout, timeUnit);
    }

    private class TimeEfficientExecutorRunnable implements Runnable {

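        @Override
        public void run() {
            try {
                while (TimeEfficientExecutor.this.alive) {
                    // Wait briefly for work instead of spinning on an empty queue,
                    // and re-check the alive flag after each timeout
                    Task task = TimeEfficientExecutor.this.taskQueue.poll(100L, TimeUnit.MILLISECONDS);

                    if (task != null) {
                        try {
                            task.run(new IdleTaskContext());
                        } catch (Exception e) {
                            // TODO: logging
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the worker exit
                Thread.currentThread().interrupt();
            } finally {
                TimeEfficientExecutor.this.latch.countDown();
            }
        }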
    }

    private class IdleTaskContext implements Task.Context {

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException {
            idle(condition, Long.MAX_VALUE);
        }

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(() -> false, timeout, timeUnit);
        }

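        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
            long now = System.currentTimeMillis();
            long millis = timeUnit.toMillis(timeout);
            // Saturate instead of overflowing for very large timeouts
            long idleUntilTs = millis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + millis;
            idle(condition, idleUntilTs);
        }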

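        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) {
                // Cap the wait (10 ms is an arbitrary choice) so the condition
                // and the alive flag are re-checked even if no task arrives
                long waitMillis = Math.min(leftMillis, 10L);
                Task task = TimeEfficientExecutor.this.taskQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) {
                    if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) {
                        task.run(new IdleTaskContext());
                    } else {
                        // Does not fit: put it back and pause briefly so we do
                        // not spin on re-polling the same task
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                        if (leftMillis >= 1L) {
                            Thread.sleep(Math.min(leftMillis, 10L));
                        }
                    }
                    // Running or pausing took time, so refresh the remaining time
                    leftMillis = idleUntilTs - System.currentTimeMillis();
                }
            }
        }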
    }
}

Note that you can't just step down the stack, because the stack is bound to the executing thread. That means it is not possible to jump back into an underlying idling task while a "sub"-task started during its idle time is itself idling. You have to "trust" what each task returns from the guessExecutionTimeMillis method.

Thanks to the PriorityBlockingQueue used in the Executor, the queue will always return the task with the lowest guessed execution time first.
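
To see that ordering in isolation, here is a small sketch that uses the same queue type and comparator style as the executor above, with plain Long values standing in for the guessed execution times. The class name ShortestFirstDemo and the method pollOrder are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical demo: Long values stand in for Task.guessExecutionTimeMillis(). */
public class ShortestFirstDemo {

    /** Offers all guesses to a priority queue and returns them in poll order. */
    static List<Long> pollOrder(long... guessedMillis) {
        PriorityBlockingQueue<Long> queue =
                new PriorityBlockingQueue<>(10, Comparator.comparingLong(Long::longValue));
        for (long guess : guessedMillis) {
            queue.offer(guess);
        }
        List<Long> order = new ArrayList<>();
        Long next;
        while ((next = queue.poll()) != null) {
            order.add(next);
        }
        return order;
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE is the guess used for a Runnable wrapped without an estimate
        System.out.println(pollOrder(500L, Long.MAX_VALUE, 20L, 100L));
        // prints [20, 100, 500, 9223372036854775807]
    }
}
```

A Runnable wrapped without a guess therefore sorts last, so it is only reached once everything with a finite guess has been taken.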