LinkedTransferQueue Overview
LinkedTransferQueue is a specialized blocking queue introduced in JDK 1.7 in the java.util.concurrent (J.U.C) package. Beyond standard blocking queue functionality, it provides a distinctive transfer method that lets a producer hand an element directly to a consumer thread.
In traditional blocking queues, a consumer thread calling take (or the timed poll) blocks when the queue is empty and waits for a producer to add an element. The transfer method works from the producer side instead:
- When a consumer thread is already waiting, a producer calling transfer passes the element directly to that consumer instead of enqueueing it
- If no consumer is waiting, the producer enqueues the element and blocks until a consumer retrieves it
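The second case can be observed from outside the queue. The following is a minimal sketch, not a benchmark: the class name TransferBlocksDemo, the element "job-1", and the 50 ms pause are illustrative assumptions. It starts a producer that calls transfer with no consumer present and checks that the producer is still blocked until take is called.

```java
import java.util.concurrent.LinkedTransferQueue;

class TransferBlocksDemo {
    /** Returns true if the producer stayed blocked in transfer() until the element was taken. */
    static boolean producerWaitsForConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.transfer("job-1"); // no consumer is waiting: enqueue, then block
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            while (queue.isEmpty()) {   // wait until the element is visible in the queue
                Thread.sleep(1);
            }
            Thread.sleep(50);           // illustrative pause: transfer() should still be blocked
            boolean stillBlocked = producer.isAlive();
            String taken = queue.take(); // receiving the element releases the producer
            producer.join();
            return stillBlocked && "job-1".equals(taken);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("producer blocked until take(): " + producerWaitsForConsumer());
    }
}
```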
TransferQueue Interface
LinkedTransferQueue implements the TransferQueue interface, which defines the transfer semantics. Besides transfer itself, the interface provides two variants that do not wait indefinitely:
tryTransfer(E e): If a consumer is already waiting, the element is handed over and the method returns true. Otherwise it returns false immediately without enqueueing the element. Unlike transfer, it never waits for a consumer.
tryTransfer(E e, long timeout, TimeUnit unit): The timed variant. It waits up to the specified time for a consumer, returning true if the element was received and false if the time elapsed first, in which case the element is not left in the queue.
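Both variants are easy to try on a queue that has no consumers. This is a small sketch under that assumption; the class name TryTransferDemo and the 50 ms timeout are made up for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class TryTransferDemo {
    /** Untimed tryTransfer with no waiting consumer: fails at once, nothing is enqueued. */
    static boolean untimedFailsFast() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean accepted = queue.tryTransfer("a");
        return !accepted && queue.isEmpty();
    }

    /** Timed tryTransfer with no consumer: gives up after the timeout, element is not left behind. */
    static boolean timedGivesUp() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        try {
            boolean accepted = queue.tryTransfer("b", 50, TimeUnit.MILLISECONDS);
            return !accepted && queue.isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("untimed fails fast: " + untimedFailsFast());
        System.out.println("timed gives up:     " + timedGivesUp());
    }
}
```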
Key characteristics of LinkedTransferQueue:
- Unbounded blocking queue based on a singly linked list
- Two node types: data nodes and request nodes
- Lock-free algorithm implementation
LinkedTransferQueue Implementation
Internal Structure
LinkedTransferQueue has two constructors: a no-argument constructor and one that copies the elements of a given collection. Neither accepts a capacity, so the queue is always unbounded.
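The lack of a capacity limit is visible through the public API. A quick sketch (the class name UnboundedDemo is an illustrative assumption); it also uses the constructor that copies an existing collection:

```java
import java.util.Arrays;
import java.util.concurrent.LinkedTransferQueue;

class UnboundedDemo {
    /** An unbounded queue always reports Integer.MAX_VALUE remaining capacity. */
    static int remaining() {
        return new LinkedTransferQueue<String>().remainingCapacity();
    }

    /** The collection constructor copies the elements; it still sets no capacity. */
    static int preloadedSize() {
        return new LinkedTransferQueue<String>(Arrays.asList("a", "b", "c")).size();
    }

    public static void main(String[] args) {
        System.out.println(remaining() == Integer.MAX_VALUE);
        System.out.println(preloadedSize());
    }
}
```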
Node Definition
static final class Node {
    final boolean isData;   // true: data node; false: request node
    volatile Object item;   // node value
    volatile Node next;     // next node pointer
    volatile Thread waiter; // waiting thread

    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    final boolean casItem(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item);
        this.isData = isData;
    }

    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }

    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }

    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    final boolean tryMatchData() {
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
Key observations about Node:
- Nodes have two types, distinguished by the isData field; only nodes of different types can match each other
- The item field stores the node value, which changes during matching
Node state transitions:
| State / Type | Data Node | Request Node |
|---|---|---|
| Before Match | isData=true, item=value | isData=false, item=null |
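| After Match | isData=true, item=null | isData=false, item=value received from the producer |
| After forgetContents or cancellation | isData=true, item=this | isData=false, item=this |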
A matched node satisfies: (item == this) || ((item == null) == isData).
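To see how this predicate behaves across a node's lifetime, here is a simplified stand-in for Node. It is only a sketch: SketchNode and NodeStateSketch are hypothetical names, and an AtomicReference replaces the Unsafe-based CAS of the real class.

```java
import java.util.concurrent.atomic.AtomicReference;

class NodeStateSketch {
    /** Simplified stand-in for the queue's Node; not the JDK class. */
    static final class SketchNode {
        final boolean isData;
        final AtomicReference<Object> item;

        SketchNode(Object item, boolean isData) {
            this.isData = isData;
            this.item = new AtomicReference<>(item);
        }

        boolean isMatched() {
            Object x = item.get();
            return x == this || (x == null) == isData;
        }
    }

    /** Data node: unmatched while it holds a value, matched once a consumer CASes it to null. */
    static boolean[] dataNodeLifecycle() {
        Object value = "9";
        SketchNode n = new SketchNode(value, true);
        boolean before = n.isMatched();
        n.item.compareAndSet(value, null);   // consumer takes the value
        boolean after = n.isMatched();
        return new boolean[] {before, after};
    }

    /** Request node: unmatched while null, matched once filled, still matched after self-linking. */
    static boolean[] requestNodeLifecycle() {
        SketchNode n = new SketchNode(null, false);
        boolean before = n.isMatched();
        n.item.compareAndSet(null, "9");     // producer supplies a value
        boolean afterMatch = n.isMatched();
        n.item.set(n);                       // what forgetContents() does to item
        boolean afterForget = n.isMatched();
        return new boolean[] {before, afterMatch, afterForget};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(dataNodeLifecycle()));
        System.out.println(java.util.Arrays.toString(requestNodeLifecycle()));
    }
}
```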
Class Fields
public class LinkedTransferQueue<E> extends AbstractQueue<E>
        implements TransferQueue<E>, java.io.Serializable {

    private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
    private static final int FRONT_SPINS = 1 << 7;
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
    static final int SWEEP_THRESHOLD = 32;

    transient volatile Node head;
    private transient volatile Node tail;
    private transient volatile int sweepVotes;

    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    private static final int NOW = 0;
    private static final int ASYNC = 1;
    private static final int SYNC = 2;
    private static final int TIMED = 3;

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long sweepVotesOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = LinkedTransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
            sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
The four mode constants control xfer method behavior:
NOW (0): immediate operations that may fail without blocking:
- poll: retrieves and removes the head; returns null if the queue is empty
- tryTransfer: attempts a direct transfer; returns false if no consumer is waiting
ASYNC (1): asynchronous operations that always succeed:
- offer: inserts at the tail and returns immediately (the queue is unbounded)
- put: inserts at the tail and returns immediately
- add: inserts at the tail and returns true
SYNC (2): synchronous operations that block the calling thread:
- transfer: blocks until a consumer arrives
- take: removes from the head, blocking if the queue is empty
TIMED (3): timeout-based synchronous operations:
- poll(long timeout, TimeUnit unit)
- tryTransfer(E e, long timeout, TimeUnit unit)
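The difference between the modes shows up in what the public methods return. The sketch below exercises the non-blocking, asynchronous, and timed operations from a single thread; ModeBehaviorDemo and the 20 ms timeout are illustrative assumptions.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class ModeBehaviorDemo {
    static String summary() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        String polledEmpty = q.poll();     // NOW: empty queue, returns null without blocking
        boolean offered = q.offer("a");    // ASYNC: always succeeds
        q.put("b");                        // ASYNC: never blocks on an unbounded queue
        boolean added = q.add("c");        // ASYNC: always true
        int size = q.size();
        String head = q.poll();            // NOW: returns the oldest element
        q.clear();
        String timed;
        try {
            timed = q.poll(20, TimeUnit.MILLISECONDS); // TIMED: null once the timeout elapses
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return polledEmpty + "," + offered + "," + added + "," + size + "," + head + "," + timed;
    }

    public static void main(String[] args) {
        System.out.println(summary()); // prints null,true,true,3,a,null
    }
}
```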
Transfer Method
The transfer method passes element e to a consumer (via take/poll). If a consumer is waiting, the element transfers directly; otherwise the element is enqueued and the producer blocks until a consumer arrives:
public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
The transfer method delegates to xfer with mode SYNC=2:
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node node = null;                          // the node to append, if needed
    retry:
    for (;;) {                                 // restart on append race
        for (Node h = head, p = h; p != null; ) { // find and match first unmatched node
            boolean isData = p.isData;
            Object item = p.item;
            if (item != p && (item != null) == isData) { // p is unmatched
                if (isData == haveData)        // same mode: cannot match
                    break;
                if (p.casItem(item, e)) {      // match succeeded
                    for (Node q = p; q != h; ) {
                        Node n = q.next;       // advance head past matched nodes
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;             // stop once slack is below 2
                    }
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);     // restart from head if p is off the list
        }
        if (how != NOW) {                      // no match available
            if (node == null)
                node = new Node(e, haveData);
            Node pred = tryAppend(node, haveData);
            if (pred == null)
                continue retry;                // lost a race against the opposite mode
            if (how != ASYNC)
                return awaitMatch(node, pred, e, (how == TIMED), nanos);
        }
        return e;                              // not waiting
    }
}
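The match-or-append decision at the heart of xfer can be modelled without any concurrency. The following is a deliberately simplified, single-threaded toy, not the JDK algorithm: DualQueueSketch, Entry, produce, and consume are hypothetical names, and there is no CAS, blocking, or slack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DualQueueSketch {
    /** An unmatched entry: either a producer's data or a consumer's request. */
    private static final class Entry {
        final boolean isData;
        final String item;
        Entry(boolean isData, String item) { this.isData = isData; this.item = item; }
    }

    private final Deque<Entry> unmatched = new ArrayDeque<>();

    /** Producer side: true on direct hand-off to a waiting request, false if enqueued as data. */
    boolean produce(String e) {
        Entry head = unmatched.peekFirst();
        if (head != null && !head.isData) {      // opposite mode at the head: match it
            unmatched.pollFirst();
            return true;
        }
        unmatched.addLast(new Entry(true, e));   // empty or same mode: append
        return false;
    }

    /** Consumer side: returns the oldest data item, or null after enqueueing a request. */
    String consume() {
        Entry head = unmatched.peekFirst();
        if (head != null && head.isData) {       // opposite mode at the head: match it
            unmatched.pollFirst();
            return head.item;
        }
        unmatched.addLast(new Entry(false, null));
        return null;
    }

    int pending() { return unmatched.size(); }

    public static void main(String[] args) {
        DualQueueSketch q = new DualQueueSketch();
        q.produce("9");
        q.produce("2");
        System.out.println(q.consume() + " " + q.consume() + " " + q.consume());
        System.out.println(q.produce("x") + " pending=" + q.pending());
    }
}
```

Because a node is only appended when nothing of the opposite mode is available to match, the unmatched entries in this model are always all data or all requests, which is the invariant the real queue maintains with CAS operations.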
TryAppend Method
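private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {           // move p to the last node and append
        Node n, u;                             // temporaries for reads of next and tail
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                      // queue was empty: s becomes head
        } else if (p.cannotPrecede(haveData))
            return null;                       // lost a race against the opposite mode
        else if ((n = p.next) != null)         // not the last node: keep traversing
            p = p != t && t != (u = tail) ? (t = u) : // tail was stale
                (p != n) ? n : null;           // restart if p is off the list
        else if (!p.casNext(null, s))
            p = p.next;                        // re-read on CAS failure
        else {
            if (p != t) {                      // update tail if slack is now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail) != null &&
                       (s = t.next) != null &&
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}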
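tryAppend returns:
- null on failure (the last node is an unmatched node of the opposite type, so the caller must retry matching)
- The node itself when the queue was empty and the node became the head
- The predecessor node when the queue already contained other nodes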
AwaitMatch Method
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1;                            // initialized after the first checks
    ThreadLocalRandom randomYields = null;     // bound only if spinning
    for (;;) {
        Object item = s.item;
        if (item != e) {                       // matched
            s.forgetContents();                // avoid retaining garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        if ((w.isInterrupted() || (timed && nanos <= 0))
                && s.casItem(e, s)) {          // cancel on interrupt or timeout
            unsplice(pred, s);
            return e;
        }
        if (spins < 0) {                       // decide how long to spin
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        } else if (spins > 0) {                // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();                // occasionally yield
        } else if (s.waiter == null) {
            s.waiter = w;                      // request unpark, then recheck
        } else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        } else {
            LockSupport.park(this);
        }
    }
}
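awaitMatch follows a common waiting strategy: spin → yield → block. Because parking a thread costs a context switch, a waiting thread does not block immediately; it first spins, occasionally calling Thread.yield, and parks only if no match appears during the spin phase. On interrupt or timeout it cancels the node by CASing its item to the node itself and unsplicing it from the list.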
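The same spin, yield, then park progression can be written in isolation. This is a rough sketch of the pattern, not the JDK code: SpinThenParkSketch, its slot and waiter fields, and the spin counts are assumptions, and interruption handling is omitted for brevity.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class SpinThenParkSketch {
    private final AtomicReference<String> slot = new AtomicReference<>();
    private volatile Thread waiter;

    /** Waits for a value: spin first, yield now and then, park only as a last resort. */
    String await(int maxSpins) {
        int spins = maxSpins;
        for (;;) {
            String v = slot.get();
            if (v != null)
                return v;                        // value arrived
            if (spins > 0) {
                --spins;
                if (ThreadLocalRandom.current().nextInt(64) == 0)
                    Thread.yield();              // occasionally give up the CPU
            } else if (waiter == null) {
                waiter = Thread.currentThread(); // publish ourselves, then loop to recheck
            } else {
                LockSupport.park(this);          // may wake spuriously; the loop rechecks
            }
        }
    }

    /** Stores the value, then wakes the waiter if one has registered. */
    void publish(String v) {
        slot.set(v);
        LockSupport.unpark(waiter);              // unpark(null) has no effect
    }

    static String demo() {
        SpinThenParkSketch s = new SpinThenParkSketch();
        String[] result = new String[1];
        Thread t = new Thread(() -> result[0] = s.await(128));
        t.start();
        try {
            Thread.sleep(20);                    // long enough for the waiter to reach park()
            s.publish("done");
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Registering the waiter and then rechecking the slot before parking is what prevents a lost wake-up: if publish ran before the waiter was registered, the recheck sees the value and the thread never parks.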
Take Method
The take method removes an element from the head, blocking if the queue is empty:
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
The consumer thread calls xfer with e=null and haveData=false, representing a request node.
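The interrupt path at the end of take can be exercised directly. In this sketch (the class name TakeInterruptDemo is an illustrative assumption) a consumer blocks in take on an empty queue and is then interrupted; the queue should report no waiting consumer afterwards because the request node was cancelled.

```java
import java.util.concurrent.LinkedTransferQueue;

class TakeInterruptDemo {
    /** Returns true if a blocked take() ended with InterruptedException and left no waiting consumer. */
    static boolean interruptedWhileWaiting() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean[] sawInterrupt = new boolean[1];
        Thread consumer = new Thread(() -> {
            try {
                queue.take();                    // empty queue: appends a request node and waits
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;
            }
        });
        consumer.start();
        try {
            while (!queue.hasWaitingConsumer()) { // wait until the request node is enqueued
                Thread.sleep(1);
            }
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0] && !queue.hasWaitingConsumer();
    }

    public static void main(String[] args) {
        System.out.println("take() interrupted cleanly: " + interruptedWhileWaiting());
    }
}
```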
Operation Flow
Data Flow Example
Initial state: Empty queue.
ThreadA calls transfer("9"): Enqueues a data node and blocks in awaitMatch. head points to the new node, while tail is still null: the first append only CASes head, and because of slack tail is not updated on every insertion.
ThreadB calls transfer("2"): Traverses from head looking for a request node to match. The head node is a data node, the same type as the caller, so no match is possible. Enqueues "2" at the tail and blocks.
ThreadC calls transfer("93"): Same as ThreadB; enqueues at the tail and blocks.
ThreadD calls take(): Finds the first unmatched data node, "9". Uses CAS to replace its item with null, unparks ThreadA, and returns "9".
ThreadA unparks: In awaitMatch, the item is now null (matched), so it calls forgetContents() and returns null, and transfer returns normally.
ThreadE calls take(): head still points to the matched node "9", so it skips ahead to the first unmatched node, "2". It CASes that item to null, advances head, unparks ThreadB, and returns "2".
ThreadB unparks: Same as ThreadA; returns null.
ThreadF calls take(): head now points directly at the unmatched node "93", so it matches immediately, unparks ThreadC, and returns "93".
ThreadC unparks: Returns null.
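The scenario can be replayed against the real queue. The sketch below is an approximation under stated assumptions: DataFlowDemo is a hypothetical name, and polling size() between thread starts is just a simple way to force the three producers to enqueue in a known order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

class DataFlowDemo {
    /** Three producers block in transfer(); three take() calls then release them in FIFO order. */
    static List<String> run() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] values = {"9", "2", "93"};
        List<Thread> producers = new ArrayList<>();
        List<String> taken = new ArrayList<>();
        try {
            for (int i = 0; i < values.length; i++) {
                String v = values[i];
                Thread t = new Thread(() -> {
                    try {
                        queue.transfer(v);       // blocks until a consumer takes v
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                t.start();
                producers.add(t);
                while (queue.size() < i + 1) {   // wait until v is enqueued before starting the next
                    Thread.sleep(1);
                }
            }
            for (int i = 0; i < values.length; i++) {
                taken.add(queue.take());         // each take matches the oldest data node
            }
            for (Thread t : producers) {
                t.join();                        // every producer has been released
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [9, 2, 93]
    }
}
```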
Request Node Flow
When a consumer calls take on an empty queue, xfer finds nothing to match, so it creates a request node and appends it with the same tryAppend method shown above. The consumer then waits in awaitMatch.
The request node has item=null and isData=false. It stays in the queue, with the consumer blocked, until a producer arrives, CASes the node's item from null to its element, and unparks the consumer.
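A waiting request node is visible through getWaitingConsumerCount, and it is exactly the situation in which tryTransfer succeeds. A minimal sketch, with WaitingConsumerDemo as an illustrative name:

```java
import java.util.concurrent.LinkedTransferQueue;

class WaitingConsumerDemo {
    /** Returns true if tryTransfer handed the element straight to a consumer blocked in take(). */
    static boolean directHandOff() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();      // empty queue: enqueue a request node and wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            while (queue.getWaitingConsumerCount() == 0) { // wait for the request node to appear
                Thread.sleep(1);
            }
            boolean handedOff = queue.tryTransfer("x");    // a consumer is waiting, so this succeeds
            consumer.join();
            return handedOff && "x".equals(received[0]) && queue.isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("direct hand-off: " + directHandOff());
    }
}
```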
Blocking Queue Summary
| Implementation | Bounded | Optionally bounded (default capacity Integer.MAX_VALUE) | Unbounded | Special |
|---|---|---|---|---|
| Lock-based | ArrayBlockingQueue | LinkedBlockingQueue, LinkedBlockingDeque | - | PriorityBlockingQueue, DelayQueue |
| Lock-free | - | - | LinkedTransferQueue | SynchronousQueue |
LinkedTransferQueue combines SynchronousQueue semantics with lock-free performance while supporting actual data storage. It implements a Dual Queue using two node types (data and request), distinguished by the isData boolean field.
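The "actual data storage" part is the practical difference from SynchronousQueue. In this small comparison (QueueComparisonDemo is a hypothetical name), offer with no consumer present fails on a SynchronousQueue but stores the element in a LinkedTransferQueue:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;

class QueueComparisonDemo {
    /** SynchronousQueue has no storage: offer fails unless a consumer is already waiting. */
    static boolean synchronousOffer() {
        return new SynchronousQueue<String>().offer("a");
    }

    /** LinkedTransferQueue stores the element: offer succeeds and the element stays queued. */
    static boolean transferOfferStores() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        return queue.offer("a") && queue.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println("SynchronousQueue.offer:    " + synchronousOffer());
        System.out.println("LinkedTransferQueue.offer: " + transferOfferStores());
    }
}
```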
The slack optimization reduces CAS overhead by not updating the head and tail pointers on every match or append. A pointer is advanced only once it has fallen at least two nodes behind its ideal position (the first unmatched node for head, the last node for tail). This threshold of 2 is built into the algorithm rather than being a configurable setting.