
What happens when the ExecutorService has no available threads


Think of a ticket counter with multiple windows: a limited number of service windows (the threads) serving a queue of waiting people (the tasks).

How is the ExecutorService created? #

Before we can understand what happens when the ExecutorService runs out of threads, we need to understand how the ExecutorService is created.

Typically, you create an ExecutorService using the Executors factory class. The Executors class provides several factory methods to create different types of ExecutorService instances. For example, you can create a single-threaded ExecutorService using the newSingleThreadExecutor method, or a fixed-size thread pool using the newFixedThreadPool method.

ExecutorService executorService = Executors.newFixedThreadPool(10);

But Executors is just a convenience class. If you examine its source code, you'll see that newFixedThreadPool simply constructs a ThreadPoolExecutor backed by a LinkedBlockingQueue.

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
    }

So the executor service is composed of a thread pool for executing tasks and a queue for holding tasks that are waiting to be executed.

Submitting tasks to the ExecutorService #

When you submit a task to an ExecutorService, it will be executed by one of the threads in the thread pool.

When you submit a task to the ExecutorService and there are available threads in the thread pool, the task will be executed immediately. If there are no available threads, the task will be added to the queue.

flowchart TD
    A[Client Code] -->|"submit(task)"| B[ExecutorService]
    B -->|Check| C{Is Thread Pool Full?}
    C -->|Yes| D[Place Task in Queue]
    C -->|No| E[Execute Task]
    D -->|Wait for| G{Thread Available?}
    G -->|Yes| E

The default queue is effectively unbounded, which can lead to out-of-memory errors.

Since the LinkedBlockingQueue is constructed without a capacity, it will default to Integer.MAX_VALUE, effectively making it an unbounded queue. This means that the queue can grow indefinitely, which can lead to out-of-memory errors if you submit too many tasks to the ExecutorService, and the tasks are not being processed fast enough.
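You can verify the default capacity yourself. This is a minimal check (not from the article): a LinkedBlockingQueue created with the no-argument constructor reports Integer.MAX_VALUE remaining slots, while one created with a capacity reports that capacity.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class QueueCapacityCheck {
    public static void main(String[] args) {
        // No capacity supplied: the queue is effectively unbounded
        LinkedBlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647 (Integer.MAX_VALUE)

        // Capacity supplied: the queue is bounded
        LinkedBlockingQueue<Runnable> bounded = new LinkedBlockingQueue<>(10);
        System.out.println(bounded.remainingCapacity()); // 10
    }
}
```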

So, your first approach would be to limit the size of the queue by using a bounded queue, i.e. supply a capacity to the LinkedBlockingQueue constructor.

The following code starts an ExecutorService with a fixed thread pool of 2 threads and a queue with a capacity of 10. It then submits 10 tasks to the ExecutorService. The tasks will be executed by the two threads in the thread pool, and any additional tasks will be added to the queue.

import java.util.concurrent.*;

public class ProcessorSubmitter {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
    private final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);

    public void start() {

        for (int i = 0; i < 10; i++) {
            final int id = i + 1;
            System.out.println("Submitting task: " + id + ", queue size: " + queue.size());
            executorService.submit(() -> {
                System.out.println("Thread: " + Thread.currentThread().getName() + ", id: " + id + " started");
                try {
                    Thread.sleep(500 * id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Thread: " + Thread.currentThread().getName() + ", id: " + id + " finished");
            });
        }

        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessorSubmitter executorBlocking = new ProcessorSubmitter();
        executorBlocking.start();
    }
}

Here’s the output of the code. Notice how the queue size grows as the tasks are submitted.

Submitting task: 1, queue size: 0
Submitting task: 2, queue size: 0
Submitting task: 3, queue size: 0
Submitting task: 4, queue size: 1
Submitting task: 5, queue size: 2
Submitting task: 6, queue size: 3
Submitting task: 7, queue size: 4
Submitting task: 8, queue size: 5
Submitting task: 9, queue size: 6
Submitting task: 10, queue size: 7
Thread: pool-1-thread-1, id: 1 started
Thread: pool-1-thread-2, id: 2 started
Thread: pool-1-thread-1, id: 1 finished
Thread: pool-1-thread-1, id: 3 started
Thread: pool-1-thread-2, id: 2 finished
Thread: pool-1-thread-2, id: 4 started
Thread: pool-1-thread-1, id: 3 finished
Thread: pool-1-thread-1, id: 5 started
Thread: pool-1-thread-2, id: 4 finished
Thread: pool-1-thread-2, id: 6 started
Thread: pool-1-thread-1, id: 5 finished
Thread: pool-1-thread-1, id: 7 started
Thread: pool-1-thread-2, id: 6 finished
Thread: pool-1-thread-2, id: 8 started
Thread: pool-1-thread-1, id: 7 finished
Thread: pool-1-thread-1, id: 9 started
Thread: pool-1-thread-2, id: 8 finished
Thread: pool-1-thread-2, id: 10 started
Thread: pool-1-thread-1, id: 9 finished
Thread: pool-1-thread-2, id: 10 finished

What if the ExecutorService Queue is Full? #

To simulate a full queue, reduce its capacity to 2. With two worker threads, tasks 1 and 2 start running immediately, and tasks 3 and 4 fill the queue.

private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);

The output will show that by the 5th task, the queue is full and a RejectedExecutionException is thrown.

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@4a574795[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@30f39991[Wrapped task = com.programmerpulse.blockingexecutor.ExecutorBlocking$$Lambda$15/0x00007810f4001218@452b3a41]] rejected from java.util.concurrent.ThreadPoolExecutor@f6f4d33[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:123)
	at com.programmerpulse.blockingexecutor.ExecutorBlocking.start(ExecutorBlocking.java:17)
	at com.programmerpulse.blockingexecutor.ExecutorBlocking.main(ExecutorBlocking.java:48)

The submit method does not block when the queue is full.

This may be unexpected: LinkedBlockingQueue is a blocking queue, and its put method blocks until space is available, so you might expect the submit method to block as well until there is space in the queue.
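The distinction between the queue's methods matters here. As a minimal illustration (assumed class and variable names), offer returns false immediately on a full queue, while put would block until space frees up:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferVsPut {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("first"); // succeeds, queue is now full

        // offer returns false immediately on a full queue
        System.out.println(queue.offer("second")); // false

        // a timed offer waits for the timeout, then gives up
        System.out.println(queue.offer("second", 100, TimeUnit.MILLISECONDS)); // false

        // queue.put("second") would block here indefinitely
        // until a consumer takes an element
    }
}
```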

To examine what is going on, replace the LinkedBlockingQueue with a logging subclass.

import java.util.concurrent.LinkedBlockingQueue;

public class LoggingQueue<E> extends LinkedBlockingQueue<E> {

    public LoggingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e) {
        System.out.println(">> offer() called");
        return super.offer(e);
    }

    @Override
    public int size() {
        System.out.println(">> size(): called");
        return super.size();
    }
}

Update the ProcessorSubmitter class to use the LoggingQueue.

private final BlockingQueue<Runnable> queue = new LoggingQueue<>(2);

Now, when you run the code, you'll see that the executor only ever calls the non-blocking offer method, and never calls put. (The size() calls in the log come from our own queue.size() in the print statement, not from the executor.)

>> size(): called
Submitting task: 1, queue size: 0
>> size(): called
Submitting task: 2, queue size: 0
>> size(): called
Submitting task: 3, queue size: 0
>> offer() called
>> size(): called
Submitting task: 4, queue size: 1
>> offer() called
>> size(): called

So the ThreadPoolExecutor enqueues tasks with the non-blocking offer method. When offer returns false because the queue is full, the task is handed to the rejection handler, and the default AbortPolicy throws a RejectedExecutionException.
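If you only want to avoid the exception, ThreadPoolExecutor also accepts a RejectedExecutionHandler in its constructor. As a sketch (class names assumed for illustration), CallerRunsPolicy runs a rejected task on the submitting thread itself, which throttles submission naturally:

```java
import java.util.concurrent.*;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(2),
                // Instead of throwing RejectedExecutionException,
                // run the rejected task on the caller's thread
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            final int id = i + 1;
            executor.submit(() -> {
                // Rejected tasks print "main" here instead of a pool thread name
                System.out.println(Thread.currentThread().getName() + " running task " + id);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The trade-off is that the submitting thread is occupied while it runs the rejected task, so it cannot submit anything else in the meantime.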

Implement Blocking with a Semaphore #

If you want blocking behaviour, overriding the queue is not a good option: since only the non-blocking offer and size methods are called, there is no natural place to hook in a lock or semaphore.

Instead, it is better to use a Semaphore at the client level. The blocking behaviour then depends not on the ExecutorService, but on the client code.

import java.util.concurrent.*;

public class ProcessorSubmitter {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);
    private final Semaphore semaphore = new Semaphore(2);

    private final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);

    public void start() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            final int id = i + 1;
            System.out.println("Submitting task: " + id + ", queue size: " + queue.size());
            System.out.println("Acquiring semaphore: " + id);
            semaphore.acquire();
            executorService.submit(() -> {
                System.out.println("Thread: " + Thread.currentThread().getName() + ", id: " + id + " started");
                try {
                    Thread.sleep(500 * id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                    System.out.println("Semaphore released: " + id);
                }


                System.out.println("Thread: " + Thread.currentThread().getName() + ", id: " + id + " finished");
            });
        }

        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessorSubmitter executorBlocking = new ProcessorSubmitter();
        executorBlocking.start();
    }
}

The output shows that acquire blocks before the submit call when no permits are available, and proceeds only once a running task calls release. Eventually all the tasks are executed.

Submitting task: 1, queue size: 0
Acquiring semaphore: 1
Submitting task: 2, queue size: 0
Acquiring semaphore: 2
Submitting task: 3, queue size: 0
Acquiring semaphore: 3
Thread: pool-1-thread-1, id: 1 started
Thread: pool-1-thread-2, id: 2 started
Submitting task: 4, queue size: 1
Semaphore released: 1
Acquiring semaphore: 4
Thread: pool-1-thread-1, id: 1 finished
Thread: pool-1-thread-1, id: 3 started
Semaphore released: 2
Thread: pool-1-thread-2, id: 2 finished
Submitting task: 5, queue size: 1
Acquiring semaphore: 5
Thread: pool-1-thread-2, id: 4 started
Semaphore released: 3
Thread: pool-1-thread-1, id: 3 finished
Submitting task: 6, queue size: 1
Thread: pool-1-thread-1, id: 5 started
Acquiring semaphore: 6
Semaphore released: 4
Thread: pool-1-thread-2, id: 4 finished
Thread: pool-1-thread-2, id: 6 started
Submitting task: 7, queue size: 0
Acquiring semaphore: 7
Semaphore released: 5
Thread: pool-1-thread-1, id: 5 finished
Submitting task: 8, queue size: 1
Thread: pool-1-thread-1, id: 7 started
Acquiring semaphore: 8
Semaphore released: 6
Thread: pool-1-thread-2, id: 6 finished
Submitting task: 9, queue size: 1
Thread: pool-1-thread-2, id: 8 started
Acquiring semaphore: 9
Semaphore released: 7
Thread: pool-1-thread-1, id: 7 finished
Submitting task: 10, queue size: 1
Thread: pool-1-thread-1, id: 9 started
Acquiring semaphore: 10
Semaphore released: 8
Thread: pool-1-thread-2, id: 8 finished
Thread: pool-1-thread-2, id: 10 started
Semaphore released: 9
Thread: pool-1-thread-1, id: 9 finished
Semaphore released: 10
Thread: pool-1-thread-2, id: 10 finished