ArrayBlockingQueue
ArrayBlockingQueue is a classic “bounded buffer” backed by a fixed-size array. It uses a single ReentrantLock and two Condition objects to manage access and blocking.
Implementation Mechanism #
ArrayBlockingQueue maintains its elements in a circular array. It uses takeIndex and putIndex to track the next element to remove and the next slot for insertion. Both producers and consumers must acquire the same lock to perform any operation.
```java
final ReentrantLock lock;
private final Condition notEmpty; // signalled after a successful put
private final Condition notFull;  // signalled after a successful take
int takeIndex; // index of next element to take
int putIndex;  // index of next slot to fill
int count;     // number of elements in the queue
```
Implementation Details #
Circular Array Mechanics #
The queue uses modular arithmetic for circular indexing:
```java
static final int inc(int i, int modulus) {
    if (++i >= modulus) i = 0;
    return i;
}

static final int dec(int i, int modulus) {
    if (--i < 0) i = modulus - 1;
    return i;
}
```
Core Operations #
Enqueue (put/offer):
```java
private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0; // Wrap around
    count++;
    notEmpty.signal(); // Wake up a waiting consumer
}
```
Dequeue (take/poll):
```java
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null; // Help GC
    if (++takeIndex == items.length) takeIndex = 0; // Wrap around
    count--;
    if (itrs != null) itrs.elementDequeued();
    notFull.signal(); // Wake up a waiting producer
    return e;
}
```
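The index wrap-around can be observed indirectly through FIFO behavior on a small queue: after one poll, the next offer reuses the freed slot at the front of the array (a sketch; the capacity of 2 is chosen purely for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class WrapAroundDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2);
        System.out.println(queue.poll()); // 1  (takeIndex advances)
        queue.offer(3);                   // putIndex wraps to slot 0
        System.out.println(queue.poll()); // 2
        System.out.println(queue.poll()); // 3  (read from the wrapped slot)
    }
}
```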
Blocking Operations #
Put (blocks when full):
```java
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); // Wait until not full
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
```
Take (blocks when empty):
```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // Wait until not empty
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```
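A minimal sketch of the blocking hand-off between put() and take(): with capacity 1, the producer's second put() blocks on notFull until the consumer takes an element (class and thread names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PutTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 1: the second put() must wait for a take().
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                queue.put("first");
                queue.put("second"); // blocks until "first" is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println(queue.take()); // "first"
        System.out.println(queue.take()); // "second" (unblocks the producer)
        producer.join();
    }
}
```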
Timed Operations #
Timed versions use awaitNanos() for timeout handling:
```java
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0L)
                return false; // timed out while the queue was still full
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
```
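The give-up-on-timeout behavior is visible with a full queue: the timed offer() returns false instead of blocking indefinitely (a sketch; the 50 ms timeout is an arbitrary choice for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // queue is now full

        // Waits up to 50 ms for a free slot, then gives up.
        boolean accepted = queue.offer(2, 50, TimeUnit.MILLISECONDS);
        System.out.println(accepted); // false

        queue.take(); // free a slot
        System.out.println(queue.offer(3, 50, TimeUnit.MILLISECONDS)); // true
    }
}
```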
Iterator Support #
The queue tracks active iterators with the Itrs class:
```java
transient Itrs itrs; // Shared state for currently active iterators

static final class Itrs {
    // ... iterator tracking logic

    void elementDequeued() {
        // Update iterator state when elements are removed
    }

    void removedAt(int removeIndex) {
        // Handle interior removals
    }
}
```
Removal Operations #
Interior removals require shifting elements:
```java
void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // Fast path: removing the front item
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null) itrs.elementDequeued();
    } else {
        // Interior remove: slide over all others
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            items[pred] = items[i]; // Shift elements toward the front
        }
        count--;
        if (itrs != null) itrs.removedAt(removeIndex);
    }
    notFull.signal();
}
```
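remove(Object) is one public entry point into removeAt(). Removing a middle element exercises the interior path, sliding later elements toward the front while preserving FIFO order (a sketch; the element names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class InteriorRemoveDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        queue.add("a");
        queue.add("b");
        queue.add("c");

        // "b" is interior, so removeAt() slides "c" left by one slot.
        queue.remove("b");
        System.out.println(queue); // [a, c]
    }
}
```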
Construction #
```java
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
```
The fairness parameter affects the ReentrantLock behavior:
- `fair = true`: FIFO ordering for waiting threads (lower throughput)
- `fair = false`: unspecified ordering (higher throughput)
Canonical Usage #
When to use: Use ArrayBlockingQueue when you have a fixed, known capacity requirement and want to minimize memory allocation overhead. It is best suited for scenarios where the producer and consumer are operating at similar speeds, as the single lock can become a bottleneck under high contention.
Common Patterns:
- Fairness Policy: You can construct it with `fair = true` to ensure FIFO ordering of waiting threads, although this significantly reduces throughput.
- Bounded Work Queues: It is the standard choice for `ThreadPoolExecutor` when a fixed capacity is needed to prevent memory exhaustion.
```java
// Fixed-size buffer with 10 slots
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

// Put operation (blocks if full)
queue.put(42);

// Take operation (blocks if empty)
Integer result = queue.take();
```
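The bounded-work-queue pattern mentioned above can be sketched as follows; the class name, pool sizes, and queue capacity are illustrative choices, not fixed conventions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // The work queue holds at most 100 tasks; submissions beyond
        // max pool size + queue capacity are rejected instead of
        // accumulating without bound in memory.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                 // core and maximum pool size
                60, TimeUnit.SECONDS, // keep-alive for idle extra threads
                new ArrayBlockingQueue<>(100));

        pool.execute(() -> System.out.println("task ran"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```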
Performance Trade-offs #
- Pros: Zero per-element object allocation (no `Node` objects). Predictable memory footprint.
- Cons: A single lock serves both put and take operations. This causes contention between producers and consumers, limiting scalability on multi-core systems.