ThreadPoolExecutor Constructor Parameters
The ThreadPoolExecutor constructor provides the core parameters for configuring the thread pool:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Key parameters include corePoolSize, maximumPoolSize, the BlockingQueue, keepAliveTime, ThreadFactory, and the rejection policy. Determining the optimal thread count depends on the specific workload; there is no universal formula.
The execute() Method
When submitting tasks via execute(), the thread pool follows a three-step process:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- If the current worker count is below
corePoolSize, a new core worker is created directly. - Otherwise, the task is offered to the blocking queue.
- If the queue is full, a new non-core worker is created; failure triggers the rejection policy.
The Worker Class
Internally, each worker is a wrapper thread that executes submitted tasks:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
A Worker holds a reference to its first task and encapsulates a Thread created by the ThreadFactory.
addWorker() : Creating Workers
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
After creation, the worker thread is started. This triggers the run() method, which calls runWorker().
runWorker() : Core Loop for Task Execution
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
This loop continuously fetches tasks from the queue via getTask(). Pool threads are reused; after completing one task, the worker immediately retrieves the next one. When no tasks remain, the worker is cleaned up.
The critical line task.run() calls the user's Runnable directly, not start(). The actual thread is the worker itself.
BlockingQueue Implementations
The choice of BlockingQueue affects how tasks are buffered. Below are common implementations:
ArrayBlockingQueue
A bounded, array-based queue with a single lock.
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
offer(E e): Returnsfalseif the queue is full; otherwise, appends the element using a circular array.poll(long timeout, TimeUnit unit): Waits up to the timeout for an element.take(): Blocks indefinitely until an element is available.
LinkedBlockingQueue
An optionally bounded linked-list queue with separate locks for puts and takes.
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
Because it uses two locks (putLock and takeLock), enqueue and dequeue operations can proceed concurrently.
PriorityBlockingQueue
An unbounded priority queue based on a binary heap. Elements must implement Comparable or a Comparator must be provided.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
- The queue grows dynamically when full.
poll(),take(): Removes the element with the highest priority (smallest according to ordering).
SynchronousQueue
A zero-capacity queue where each insert must wait for a corresponding remove, and vice versa. It does not store elements internally.
public SynchronousQueue() {
this(false);
}
transfer()method handles both put and take operations.- If a producer arrives and no consumer is waiting, it blocks (or returns
falseforoffer()). - Commonly used for handoff scenarios, for example when each submitted task should be immediately executed by a free thread.
TransferQueue vs TransferStack:
TransferQueue: Uses a FIFO queue of nodes. A put either matches with a waiting consumer or creates a new node.TransferStack: Uses a LIFO stack; default mode forSynchronousQueue(false).
DelayQueue
A queue where elements are only taken when thier delay has expired. Internally uses PriorityQueue.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
Elements must implement Delayed, which extends Comparable<Delayed>. The queue uses a lock and a Condition to block until the earliest delay expires.
Summary
- Thread reuse: The core mechanism is the
runWorkerloop. Each worker thread repeatedly fetches and runs tasks from the queue, eliminating the need to create new threads for each task. - Queue comparison:
ArrayBlockingQueueandLinkedBlockingQueuestore tasks; the former uses a single lock, the latter uses separate locks.PriorityBlockingQueueorders tasks by priority.SynchronousQueuedoes not buffer; it hands off tasks directly to waiting workers.DelayQueuedelays task execution until a specified time elapses.
When creating a thread pool, it is recommended to explicitly tune the parameters instead of relying on Executors default factories, which may use unbounded queues or excessively large thread counts, leading to resource exhaustion.