Understanding LinkedTransferQueue in J.U.C Collections Framework

LinkedTransferQueue Overview

LinkedTransferQueue is a specialized blocking queue introduced in JDK 1.7 within the J.U.C package. Beyond standard blocking queue functionality, it provides a distinctive transfer method that enables direct element passing between producer and consumer threads.

In traditional blocking queues, consumer threads calling take or poll block when the queue is empty, waiting for producers to add elements. However, the transfer method operates differently:

  1. When consumer threads are waiting, producers calling transfer pass elements directly to consumers instead of enqueueing them
  2. If no consumer is waiting, the producer enqueues the element and blocks until a consumer retrieves it

TransferQueue Interface

LinkedTransferQueue implements the TransferQueue interface, which defines the transfer semantics:

tryTransfer(E e) — If no consumer is waiting to receive an element, this method returns false immediately. Unlike transfer, it does not wait for consumers regardless of availability.

tryTransfer(E e, long timeout, TimeUnit unit) — This variant adds timeout functionality. It returns false if no consumer consumes the element within the specified time, otherwise returns true.

Key characteristics of LinkedTransferQueue:

  1. Unbounded blocking queue based on a singly linked list
  2. Two node types: data nodes and request nodes
  3. Lock-free algorithm implementation

LinkedTransferQueue Implementation

Internal Structure

LinkedTransferQueue provides only default constructors and does not support initial capacity configuration, making it an unbounded queue.

Node Definition

static final class Node {
    final boolean isData;   // true: data node; false: request node
    volatile Object item;   // node value
    volatile Node next;     // next node pointer
    volatile Thread waiter; // waiting thread

    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    final boolean casItem(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item);
        this.isData = isData;
    }

    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }

    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }

    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    final boolean tryMatchData() {
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Key observations about Node:

  1. Nodes have two types distinguished by isData field; only different types can match each other
  2. The item field stores the node value, which changes during matching

Node state transitions:

State / Type Data Node Request Node
Before Match isData=true, item=value isData=false, item=null
After Match isData=true, item=null isData=false, item=this

A matched node satisfies: (item == this) || ((item == null) == isData).

Class Fields

public class LinkedTransferQueue<E> extends AbstractQueue<E>
        implements TransferQueue<E>, java.io.Serializable {

    private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
    private static final int FRONT_SPINS = 1 << 7;
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
    static final int SWEEP_THRESHOLD = 32;

    transient volatile Node head;
    private transient volatile Node tail;
    private transient volatile int sweepVotes;

    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    private static final int NOW = 0;
    private static final int ASYNC = 1;
    private static final int SYNC = 2;
    private static final int TIMED = 3;

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long sweepVotesOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = LinkedTransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
            sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

The four mode constants control xfer method behavior:

NOW (0) — Immediate operations that may fail without blocking:

  • poll — retrieves and removes the head; returns null if empty
  • tryTransfer — attempts direct transfer; returns false if no waiting consumer

ASYNC (1) — Asynchronous operations that always succeed:

  • offer — inserts at tail, returns immediately (unbounded queue)
  • put — inserts at tail, returns immediately
  • add — inserts at tail, returns true

SYNC (2) — Synchronous operations that block the calling thread:

  • transfer — blocks until a consumer arrives
  • take — removes from head, blocks if empty

TIMED (3) — Timeout-based synchronous operations:

  • poll(long timeout, TimeUnit unit)
  • tryTransfer(E e, long timeout, TimeUnit unit)

Tarnsfer Method

The transfer method passes element e to a consumer (via take/poll). If a consumer is waiting, the element transfers directly; otherwise the element is enqueued and the producer blocks until a consumer arrives:

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

The transfer method delegates to xfer with mode SYNC=2:

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();

    Node node = null;

    retry:
    for (;;) {
        for (Node h = head, p = h; p != null; ) {
            boolean isData = p.isData;
            Object item = p.item;
            if (item != p && (item != null) == isData) {
                if (isData == haveData)
                    break;
                if (p.casItem(item, e)) {
                    for (Node q = p; q != h; ) {
                        Node n = q.next;
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;
                    }
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);
        }

        if (how != NOW) {
            if (node == null)
                node = new Node(e, haveData);
            Node pred = tryAppend(node, haveData);
            if (pred == null)
                continue retry;
            if (how != ASYNC)
                return awaitMatch(node, pred, e, (how == TIMED), nanos);
        }
        return e;
    }
}

TryAppend Method

private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;
        } else if (p.cannotPrecede(haveData))
            return null;
        else if ((n = p.next) != null)
            p = p != t && t != (u = tail) ? (t = u) :
                (p != n) ? n : null;
        else if (!p.casNext(null, s))
            p = p.next;
        else {
            if (p != t) {
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null &&
                    (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

tryAppend returns:

  • null on failure
  • The node itself when the queue has only one element
  • The predecessor node when the queue has multiple elements

AwaitMatch Method

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1;
    ThreadLocalRandom randomYields = null;

    for (;;) {
        Object item = s.item;
        if (item != e) {
            s.forgetContents();
            return LinkedTransferQueue.<E>cast(item);
        }
        if ((w.isInterrupted() || (timed && nanos <= 0))
            && s.casItem(e, s)) {
            unsplice(pred, s);
            return e;
        }

        if (spins < 0) {
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        } else if (spins > 0) {
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();
        } else if (s.waiter == null) {
            s.waiter = w;
        } else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        } else {
            LockSupport.park(this);
        }
    }
}

awaitMatch implements a lock optimization pattern: spin → yield → block. Threads don't block immediately due to context switching overhead; they spin first with occasional yield operations, then block if no match appears.

Take Method

The take method removes an element from the head, blocking if the queue is empty:

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

The consumer thread calls xfer with e=null and haveData=false, representing a request node.

Operation Flow

Data Flow Example

Initial state: Empty queue.

ThreadA calls transfer("9") — Enqueues a data node and blocks in awaitMatch. The head and tail both point to the new node. Due to slack, tail doesn't immediately follow for performance optimization.

ThreadB calls transfer("2") — Traverses from head looking for a matching request node. The head node is a data node with the same type, so matching fails. Enqueues "2" at the tail and blocks.

ThreadC calls transfer("93") — Same as ThreadB, enqueues at tail and blocks.

ThreadD calls take() — Finds the first unmatched data node "9". Uses CAS to replace its item with null, unpacks ThreadA, and returns "9".

ThreadA unparks — In awaitMatch, the item is already null (matched), so it calls forgetContents() and returns null.

ThreadE calls take() — Head now points to a matched node, so it skips ahead to find the first unmatched node "2". Matches it with null, unparks ThreadB, returns "2".

ThreadB unparks — Same as ThreadA, returns null.

ThreadF calls take() — Head directly points to unmatched node "93", matches it immediately, unparks ThreadC, returns "93".

ThreadC unparks — Returns null.

Request Node Flow

When a consumer calls take on an empty queue, it creates a request node and blocks:

private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;
        } else if (p.cannotPrecede(haveData))
            return null;
        else if ((n = p.next) != null)
            p = p != t && t != (u = tail) ? (t = u) :
                (p != n) ? n : null;
        else if (!p.casNext(null, s))
            p = p.next;
        else {
            if (p != t) {
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null &&
                    (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

This creates a request node with item=null and isData=false, then blocks waiting for a producer to arrive.

Blocking Queue Summary

Queue Type Bounded Near-Unbounded Unbounded Special
Lock-based ArrayBlockingQueue LinkedBlockingQueue, LinkedBlockingDeque - PriorityBlockingQueue, DelayQueue
Lock-free - - LinkedTransferQueue SynchronousQueue

LinkedTransferQueue combines SynchronousQueue semantics with lock-free performance while supporting actual data storage. It implements a Dual Queue using two node types (data and request), distinguished by the isData boolean field.

The slack optimization reduces CAS overhead by not immediately updating head/tail pointers when nodes are matched. The head and tail pointers update only when the distance to the nearest unmatched node exceeds the slack threshold (default: 2).

Tags: java Concurrency JUC Collections lock-free

Posted on Wed, 20 May 2026 07:39:34 +0000 by php4tric