In-Depth Java ThreadPoolExecutor Source Code Analysis

ThreadPoolExecutor Constructor Parameters

The ThreadPoolExecutor constructor provides the core parameters for configuring the thread pool:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Key parameters include corePoolSize, maximumPoolSize, the BlockingQueue, keepAliveTime, ThreadFactory, and the rejection policy. Determining the optimal thread count depends on the specific workload; there is no universal formula.

The execute() Method

When submitting tasks via execute(), the thread pool follows a three-step process:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  1. If the current worker count is below corePoolSize, a new core worker is created directly.
  2. Otherwise, the task is offered to the blocking queue.
  3. If the queue is full, a new non-core worker is created; failure triggers the rejection policy.

The Worker Class

Internally, each worker is a wrapper thread that executes submitted tasks:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

A Worker holds a reference to its first task and encapsulates a Thread created by the ThreadFactory.

addWorker() : Creating Workers

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

After creation, the worker thread is started. This triggers the run() method, which calls runWorker().

runWorker() : Core Loop for Task Execution

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

This loop continuously fetches tasks from the queue via getTask(). Pool threads are reused; after completing one task, the worker immediately retrieves the next one. When no tasks remain, the worker is cleaned up.

The critical line task.run() calls the user's Runnable directly, not start(). The actual thread is the worker itself.

BlockingQueue Implementations

The choice of BlockingQueue affects how tasks are buffered. Below are common implementations:

ArrayBlockingQueue

A bounded, array-based queue with a single lock.

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
  • offer(E e): Returns false if the queue is full; otherwise, appends the element using a circular array.
  • poll(long timeout, TimeUnit unit): Waits up to the timeout for an element.
  • take(): Blocks indefinitely until an element is available.

LinkedBlockingQueue

An optionally bounded linked-list queue with separate locks for puts and takes.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

Because it uses two locks (putLock and takeLock), enqueue and dequeue operations can proceed concurrently.

PriorityBlockingQueue

An unbounded priority queue based on a binary heap. Elements must implement Comparable or a Comparator must be provided.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
  • The queue grows dynamically when full.
  • poll(), take(): Removes the element with the highest priority (smallest according to ordering).

SynchronousQueue

A zero-capacity queue where each insert must wait for a corresponding remove, and vice versa. It does not store elements internally.

public SynchronousQueue() {
    this(false);
}
  • transfer() method handles both put and take operations.
  • If a producer arrives and no consumer is waiting, it blocks (or returns false for offer()).
  • Commonly used for handoff scenarios, for example when each submitted task should be immediately executed by a free thread.

TransferQueue vs TransferStack:

  • TransferQueue: Uses a FIFO queue of nodes. A put either matches with a waiting consumer or creates a new node.
  • TransferStack: Uses a LIFO stack; default mode for SynchronousQueue(false).

DelayQueue

A queue where elements are only taken when thier delay has expired. Internally uses PriorityQueue.

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

Elements must implement Delayed, which extends Comparable<Delayed>. The queue uses a lock and a Condition to block until the earliest delay expires.

Summary

  • Thread reuse: The core mechanism is the runWorker loop. Each worker thread repeatedly fetches and runs tasks from the queue, eliminating the need to create new threads for each task.
  • Queue comparison:
    • ArrayBlockingQueue and LinkedBlockingQueue store tasks; the former uses a single lock, the latter uses separate locks.
    • PriorityBlockingQueue orders tasks by priority.
    • SynchronousQueue does not buffer; it hands off tasks directly to waiting workers.
    • DelayQueue delays task execution until a specified time elapses.

When creating a thread pool, it is recommended to explicitly tune the parameters instead of relying on Executors default factories, which may use unbounded queues or excessively large thread counts, leading to resource exhaustion.

Tags: java ThreadPoolExecutor BlockingQueue Concurrency source code analysis

Posted on Sat, 27 Jun 2026 18:01:22 +0000 by zoidberg