Java Concurrency Utilities: CountDownLatch, Semaphore, and CyclicBarrier

This article explores three essential synchronization aids in Java: CountDownLatch, Semaphore, and CyclicBarrier. Thece tools simplify thread coordination for common concurrency patterns.

CountDownLatch

CountDownLatch allows one or more threads to wait until a set of operations being performed in other threads completes. It is initialized with a count. The await() method blocks until the count reaches zero via countDown() calls.

Basic Example

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " working");
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + " done");
        }, "worker-1").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " working");
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + " done");
        }, "worker-2").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " working");
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + " done");
        }, "worker-3").start();

        latch.await();
        System.out.println("All workers finished");
    }
}

This behaves similarly to Thread.join() but offers more flexibility. The constructor takes an int initial count. Each countDown() decrements it.

Simulating High Concurrency

import java.util.concurrent.CountDownLatch;

public class ConcurrentStarter {
    private static final CountDownLatch startLatch = new CountDownLatch(1);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    startLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Thread: " + Thread.currentThread().getName());
            }).start();
        }
        startLatch.countDown(); // release all at once
    }
}

Any scenario requiring a task to wait for prerequisite operations can use CountDownLatch.

Under the Hood: AQS Shared Lock

CountDownLatch uses an internal Sync extending AbstractQueuedSynchronizer (AQS) in shared mode. The await() method calls acquireSharedInterruptibly; countDown() calls releaseShared.

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

If three threads call await() while state > 0, they join an AQS queue and block.

countDown() and Release

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0) return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

doReleaseShared propagates wake‑up to all waiting threads in a loop, handling head node changes.

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

Once woken, the thread executes setHeadAndPropagate to make itself the new head and propagate the signal to the next waiter.

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Semaphore

Semaphore (or "semaphore") controls the number of threads that can access a resource concurrently. Threads acquire permits via acquire() and release them via release(). It is commonly used for rate‑limiting.

Parking Lot Example

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ParkingDemo {
    public static void main(String[] args) {
        Semaphore spots = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new Car(i, spots).start();
        }
    }

    static class Car extends Thread {
        private final int id;
        private final Semaphore semaphore;

        Car(int id, Semaphore semaphore) {
            this.id = id;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("Car " + id + " parked");
                TimeUnit.SECONDS.sleep(2);
                semaphore.release();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Output shows that at most five cars park at a time.

Use Case

Most common is flow control (rate limiting).

Fair vs Non‑fair

Semaphore implements FairSync and NonfairSync based on AQS shared lock.

FairSync

static final class FairSync extends Sync {
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

NonfairSync

static final class NonfairSync extends Sync {
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

The only difference is hasQueuedPredecessors() check, which enforces fairness.

CyclicBarrier

CyclicBarrier allows a set of threads to wait for each other to reach a common barrier point. Unlike CountDownLatch, it is reusable (cyclic). Threads call await(); the barrier trips when the predefined number of parties have arrived.

Use Case

When all subtasks must complete before the main task can proceed.

Data Import Example

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class DataImporter extends Thread {
    private final CyclicBarrier barrier;
    private final String filePath;

    DataImporter(CyclicBarrier barrier, String filePath) {
        this.barrier = barrier;
        this.filePath = filePath;
    }

    @Override
    public void run() {
        System.out.println("Importing data from " + filePath);
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () ->
            System.out.println("All imports done. Start analysis.")
        );
        new DataImporter(barrier, "file1").start();
        new DataImporter(barrier, "file2").start();
        new DataImporter(barrier, "file3").start();
    }
}

Key Points

  • If fewer than the expected number of threads reach the barrier, all wait indefinitely (unless timeout).
  • Timeout variant: await(timeout, unit).
  • Calling reset() causes waiting threads to receive BrokenBarrierException.
  • The optional Runnable barrierAction is executed by the last thread to arrive.

Implementation

CyclicBarrier is built on top of ReentrantLock and Condition, making it simpler internally than CountDownLatch. It supports reuse after a barrier is tripped.

Tags: CountDownLatch Semaphore CyclicBarrier AQS Java Concurrency

Posted on Sat, 30 May 2026 22:15:49 +0000 by macpaul