ArrayBlockingQueue

ArrayBlockingQueue is a classic “bounded buffer” backed by a fixed-size array. It uses a single ReentrantLock and two Condition objects to manage access and blocking.

Implementation Mechanism #

ArrayBlockingQueue maintains its elements in a circular array. It uses takeIndex and putIndex to track the next element to remove and the next slot for insertion. Both producers and consumers must acquire the same lock to perform any operation.

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

int takeIndex; // Index for next take
int putIndex;  // Index for next put
int count;     // Number of elements

Implementation Details #

Circular Array Mechanics #

The queue wraps indices around the array with a compare-and-reset step, which has the same effect as modular arithmetic without using the % operator:

static final int inc(int i, int modulus) {
    if (++i >= modulus) i = 0;
    return i;
}

static final int dec(int i, int modulus) {
    if (--i < 0) i = modulus - 1;
    return i;
}
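
To make the wrap-around concrete, here is a small sketch that copies the two helpers into a hypothetical RingIndexDemo class (the class name and the advance method are illustrative, not part of the JDK) and walks an index around a four-slot ring:

```java
// Hypothetical demo class; inc and dec mirror the helpers shown above.
final class RingIndexDemo {
    static int inc(int i, int modulus) {
        if (++i >= modulus) i = 0;
        return i;
    }

    static int dec(int i, int modulus) {
        if (--i < 0) i = modulus - 1;
        return i;
    }

    // Illustrative helper: move `steps` slots forward from `start`.
    static int advance(int start, int steps, int capacity) {
        int i = start;
        for (int s = 0; s < steps; s++) i = inc(i, capacity);
        return i;
    }

    public static void main(String[] args) {
        int capacity = 4;
        System.out.println(inc(3, capacity));        // 0: the last slot wraps to the first
        System.out.println(dec(0, capacity));        // 3: the first slot wraps to the last
        System.out.println(advance(2, 5, capacity)); // 3: same as (2 + 5) % 4
    }
}
```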

Core Operations #

Enqueue (put/offer):

private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0; // Wrap around
    count++;
    notEmpty.signal(); // Wake one waiting consumer, if any
}

Dequeue (take/poll):

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null; // Help GC
    if (++takeIndex == items.length) takeIndex = 0; // Wrap around
    count--;
    if (itrs != null) itrs.elementDequeued();
    notFull.signal(); // Wake one waiting producer, if any
    return e;
}
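
The effect of the two index updates can be observed through the public API. The following sketch (WrapAroundDemo is a hypothetical class name) fills a three-slot queue, frees the head, and inserts again; the slot positions in the comments are inferred from the enqueue and dequeue code above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class for illustration only.
final class WrapAroundDemo {
    static String run() {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);
        q.offer(1);
        q.offer(2);
        q.offer(3);                             // putIndex has wrapped back to 0
        boolean acceptedWhenFull = q.offer(4);  // non-blocking offer fails when full
        Integer head = q.poll();                // removes 1; takeIndex moves to 1
        q.offer(4);                             // lands in the freed slot 0

        List<Integer> drained = new ArrayList<>();
        q.drainTo(drained);                     // FIFO order survives the wrap
        return acceptedWhenFull + " " + head + " " + drained;
    }

    public static void main(String[] args) {
        System.out.println(run()); // false 1 [2, 3, 4]
    }
}
```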

Blocking Operations #

Put (blocks when full):

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); // Wait until not full
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

Take (blocks when empty):

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // Wait until not empty
        return dequeue();
    } finally {
        lock.unlock();
    }
}
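
As a usage sketch of the blocking pair, the example below runs one producer thread against a consumer on a deliberately tiny queue, so both put() and take() are likely to block at some point. The class name PutTakeDemo and the POISON end-of-stream marker are assumptions made for this example:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class for illustration only.
final class PutTakeDemo {
    static final int POISON = -1; // assumed end-of-stream marker

    static long sumThroughQueue(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i); // blocks while the queue is full
                queue.put(POISON);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            long sum = 0;
            // take() blocks while the queue is empty
            for (int v = queue.take(); v != POISON; v = queue.take()) sum += v;
            producer.join();
            return sum;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(1000, 2)); // 500500
    }
}
```

Every value passes through the two-slot buffer exactly once, so the sum matches 1 + 2 + ... + 1000 regardless of how the threads interleave.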

Timed Operations #

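Timed versions use awaitNanos() for timeout handling. awaitNanos() returns an estimate of the time remaining, so the loop carries the shrinking budget across wakeups and a spurious wakeup does not extend the total wait: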

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
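
From the caller's side, the timed variants report a timeout through their return value instead of blocking forever. A minimal sketch, assuming a hypothetical TimedOfferDemo class and short illustrative timeouts:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
final class TimedOfferDemo {
    // Returns false: the single slot stays occupied for the whole timeout.
    static boolean offerToFullQueue(long timeoutMillis) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");
        try {
            return q.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns null: nothing arrives before the timeout elapses.
    static String pollEmptyQueue(long timeoutMillis) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        try {
            return q.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue(50)); // false
        System.out.println(pollEmptyQueue(50));   // null
    }
}
```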

Iterator Support #

The queue tracks active iterators with the Itrs class:

transient Itrs itrs; // Shared state for currently active iterators

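class Itrs { // Inner (non-static) class: it reads the queue's count and takeIndex
    // ... iterator tracking logic
    void elementDequeued() {
        // Called after each dequeue; updates iterator state when the
        // queue becomes empty or takeIndex wraps around to 0
    }
    void removedAt(int removeIndex) {
        // Notify tracked iterators of an interior removal
    }
}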
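
One visible consequence of this bookkeeping is that iterators are weakly consistent: they keep working when the queue changes underneath them instead of throwing ConcurrentModificationException. The sketch below (IteratorDemo is a hypothetical class name) lets one iterator remove an interior element while a second iterator is mid-traversal. Which elements the second iterator reports is deliberately not asserted, since a weakly consistent iterator may or may not reflect the removal:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class for illustration only.
final class IteratorDemo {
    static List<Integer> removeDuringIteration() {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(4);
        for (int i = 1; i <= 4; i++) q.add(i);

        Iterator<Integer> reader = q.iterator();
        Iterator<Integer> remover = q.iterator();
        reader.next(); // reader is now mid-traversal

        while (remover.hasNext()) {
            if (remover.next() == 2) remover.remove(); // interior removal
        }

        // The reader finishes its traversal without an exception.
        while (reader.hasNext()) reader.next();

        return new ArrayList<>(q); // remaining queue contents, head first
    }

    public static void main(String[] args) {
        System.out.println(removeDuringIteration()); // [1, 3, 4]
    }
}
```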

Removal Operations #

Interior removals require shifting elements, so removing an element that is not at the head costs O(n) in the worst case:

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // Fast path: removing front item
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null) itrs.elementDequeued();
    } else {
        // Interior remove: slide over all others
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            items[pred] = items[i]; // Shift elements left
        }
        count--;
        if (itrs != null) itrs.removedAt(removeIndex);
    }
    notFull.signal();
}
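
Through the public API, this path is reached by remove(Object) on an element that is not at the head. A short sketch, with RemoveDemo as a hypothetical class name, showing that order is preserved and that the freed slot becomes available to producers:

```java
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class for illustration only.
final class RemoveDemo {
    static String removeInterior() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(4);
        q.add("a");
        q.add("b");
        q.add("c");
        q.add("d");                        // queue is now full

        boolean removed = q.remove("c");   // interior element: later items shift down
        int free = q.remainingCapacity();  // one slot has been released
        q.offer("e");                      // succeeds because of the freed slot

        return removed + " " + free + " " + new ArrayList<>(q);
    }

    public static void main(String[] args) {
        System.out.println(removeInterior()); // true 1 [a, b, d, e]
    }
}
```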

Construction #

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

The fairness parameter affects the ReentrantLock behavior:

  • fair = true: FIFO ordering for waiting threads (lower throughput)
  • fair = false: Unspecified ordering (higher throughput)
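
The constructor contract can be checked directly. In this sketch (ConstructionDemo is a hypothetical class name) both fairness modes yield the same capacity, and a non-positive capacity is rejected. Fairness itself only changes the order in which blocked threads are served, which a single-threaded example cannot show:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class for illustration only.
final class ConstructionDemo {
    // Free slots in a freshly constructed queue equal its capacity.
    static int freeSlots(int capacity, boolean fair) {
        return new ArrayBlockingQueue<Integer>(capacity, fair).remainingCapacity();
    }

    // True if the constructor refuses the given capacity.
    static boolean rejectsCapacity(int capacity) {
        try {
            new ArrayBlockingQueue<Integer>(capacity);
            return false;
        } catch (IllegalArgumentException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(freeSlots(8, true));  // 8
        System.out.println(freeSlots(8, false)); // 8
        System.out.println(rejectsCapacity(0));  // true
    }
}
```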

Canonical Usage #

When to use: Use ArrayBlockingQueue when the capacity is fixed and known up front and you want to avoid per-element allocation. It works best when producers and consumers run at similar rates and contention is moderate, because put and take share a single lock that can become a bottleneck under heavy contention.

Common Patterns:

  • Fairness Policy: You can construct it with fair = true to ensure FIFO ordering of waiting threads, although this generally reduces throughput.
  • Bounded Work Queues: It is a common choice of work queue for ThreadPoolExecutor when a fixed capacity is needed to prevent memory exhaustion.

// Fixed-size buffer with 10 slots
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

// Put operation (blocks if full)
queue.put(42);

// Take operation (blocks if empty)
Integer result = queue.take();
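
The bounded work queue pattern can be sketched as follows. The pool sizes, the latch, and the BoundedPoolDemo class name are assumptions chosen to keep the example deterministic: with one worker, one queue slot, and the default rejection policy, a third long-running task has nowhere to go and is rejected instead of accumulating in memory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
final class BoundedPoolDemo {
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1)); // at most one task may wait

        Runnable blocked = () -> {
            try {
                release.await(); // keep the task alive until released
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            pool.execute(blocked); // handed to the single worker thread
            pool.execute(blocked); // waits in the queue
            pool.execute(blocked); // queue full and pool at maximum size
            return false;
        } catch (RejectedExecutionException ex) {
            return true;           // default AbortPolicy rejects the task
        } finally {
            release.countDown();   // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRejected()); // true
    }
}
```

In production code a different RejectedExecutionHandler, such as ThreadPoolExecutor.CallerRunsPolicy, is often used to apply back-pressure rather than fail the submission.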

Performance Trade-offs #

  • Pros: Zero per-element object allocation (no Node objects). Predictable memory footprint.
  • Cons: Single lock for both put and take operations. This causes contention between producers and consumers, limiting scalability on multi-core systems.