BlockingQueue Interface

BlockingQueue extends the standard Queue interface to handle cases where the queue is empty (blocking the consumer) or full (blocking the producer).

The “Four Sets of Methods” #

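One of the most important aspects of BlockingQueue is how it handles operations that cannot be satisfied immediately. Its methods come in four forms: one throws an exception, one returns a special value (null or false), one blocks until the operation can succeed, and one blocks only up to a given timeout. Insert and remove operations exist in all four forms; examine operations exist only in the first two.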

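| Operation | Throws Exception | Special Value | Blocks | Times Out |
|-----------|------------------|---------------|--------|-----------|
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | N/A | N/A |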
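
The difference between the insert forms is easiest to see against a queue that is already full. The following is a minimal sketch, assuming an ArrayBlockingQueue of capacity 1; the class and method names are illustrative only. put(e) is deliberately left out because it would block forever here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertBehaviorsDemo {
    // Tries the non-blocking and timed insert forms against a full queue
    // and reports what each one did.
    static String describeFullQueueInserts() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the queue is now full

        StringBuilder result = new StringBuilder();
        try {
            queue.add("second");
            result.append("add=ok");
        } catch (IllegalStateException e) {
            result.append("add=IllegalStateException");
        }
        // offer(e) gives up immediately and reports failure with false.
        result.append(", offer=").append(queue.offer("second"));
        // The timed offer waits up to 50 ms for space, then returns false.
        result.append(", timedOffer=")
              .append(queue.offer("second", 50, TimeUnit.MILLISECONDS));
        return result.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints: add=IllegalStateException, offer=false, timedOffer=false
        System.out.println(describeFullQueueInserts());
    }
}
```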

Implementation Details #

Core Interface Methods #

Insert Operations:

// Throws exception if full
boolean add(E e) throws IllegalStateException;

// Returns false if full
boolean offer(E e);

// Blocks until space available
void put(E e) throws InterruptedException;

// Blocks with timeout
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

Remove Operations:

// Throws exception if empty
E remove() throws NoSuchElementException;

// Returns null if empty
E poll();

// Blocks until element available
E take() throws InterruptedException;

// Blocks with timeout
E poll(long timeout, TimeUnit unit) throws InterruptedException;

Examine Operations:

// Throws exception if empty
E element() throws NoSuchElementException;

// Returns null if empty
E peek();

Key Properties #

Null Handling:

// Implementations throw NullPointerException on attempts to add, put, or offer a null
// null is reserved as the sentinel that poll() returns when no element is available
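
A short sketch of both sides of this rule, assuming a LinkedBlockingQueue as the implementation (the helper name rejectsNull is hypothetical):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class NullHandlingDemo {
    // Returns true if the queue refuses a null element with NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(rejectsNull(queue));   // true: null elements are refused
        System.out.println(queue.poll() == null); // true: null means "nothing available"
    }
}
```

Because null can never be a stored element, a null result from poll() is unambiguous.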

Capacity Management:

// Returns remaining capacity or Integer.MAX_VALUE for unbounded queues
int remainingCapacity();
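
As an illustration, the sketch below compares a bounded and an unbounded queue; the class name is an assumption made for this example. Note that the value is only a snapshot: another thread may insert or remove an element right after the call, so it cannot be used to predict whether a later insert will succeed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RemainingCapacityDemo {
    // Returns { remaining capacity of a bounded queue, of an unbounded queue }.
    static int[] capacities() {
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(3);
        bounded.offer(1); // one of three slots is now used

        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        return new int[] { bounded.remainingCapacity(), unbounded.remainingCapacity() };
    }

    public static void main(String[] args) {
        int[] caps = capacities();
        System.out.println(caps[0]);                      // 2
        System.out.println(caps[1] == Integer.MAX_VALUE); // true
    }
}
```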

Bulk Operations:

// Drain elements to collection
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
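
One common use of drainTo is batching: a consumer pulls up to N queued elements in a single call instead of polling repeatedly. A minimal sketch, with a hypothetical drainBatch helper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainToDemo {
    // Moves at most maxBatch elements from the queue into a new list.
    // drainTo never blocks; it takes whatever is available right now.
    static List<Integer> drainBatch(BlockingQueue<Integer> queue, int maxBatch) {
        List<Integer> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.offer(i);
        }
        System.out.println(drainBatch(queue, 3)); // [1, 2, 3]
        System.out.println(queue.size());         // 2
    }
}
```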

Memory Consistency #

The interface specification guarantees:

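// Actions in a thread prior to placing an object into the queue (e.g. put())
// happen-before actions in another thread after it accesses or removes that element (e.g. take())
// This provides safe publication of objects handed over through the queue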
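
In practice this means an object with plain, non-volatile fields can be handed to another thread through the queue without extra synchronization, as long as the producer stops modifying it after the hand-off. The sketch below illustrates the idea; the Message class and method names are assumptions for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SafePublicationDemo {
    // A plain mutable object: no volatile, no synchronized.
    static final class Message {
        int payload;
    }

    // The producer writes the field, then puts the object on the queue.
    // The consumer's read after take() is guaranteed to see that write.
    static int publishAndRead(int value) throws InterruptedException {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            Message m = new Message();
            m.payload = value; // written before put()
            try {
                queue.put(m);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Message received = queue.take(); // read after take()
        producer.join();
        return received.payload;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndRead(42)); // 42
    }
}
```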

Usage Example from JDK #

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... }
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... }
    }
    void consume(Object x) { ... }
}

Important Notes #

  1. No Intrinsic Shutdown: No built-in “close” or “shutdown” mechanism
  2. Poison Pills: Common pattern uses special end-of-stream objects
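  3. Bulk Operations: addAll, containsAll, retainAll, and removeAll are not necessarily atomic unless an implementation says otherwise; for example, addAll(c) may fail after adding only some of the elements in c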
  4. Thread Safety: All implementations are thread-safe
  5. Multiple Producers/Consumers: All implementations support multiple producers and consumers
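
The poison-pill pattern from note 2 can be sketched as follows. This is a minimal single-producer, single-consumer version; the POISON sentinel and the runPipeline method are hypothetical names, and with several consumers the producer would typically enqueue one pill per consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {
    // Hypothetical end-of-stream marker. new String(...) gives it a unique
    // identity, so it can never be confused with a real item via ==.
    private static final String POISON = new String("POISON");

    // Sends every item through a small bounded queue, then signals shutdown.
    static List<String> runPipeline(List<String> items) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON) {
                        break; // end of stream: stop consuming
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String item : items) {
            queue.put(item);
        }
        queue.put(POISON); // tell the consumer no more items are coming
        consumer.join();   // after join, the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```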

Exception Behavior #

Null Handling:

// add(), offer(), and put() throw NullPointerException for a null element
// drainTo() throws NullPointerException for a null target collection

InterruptedException:

// put(), take(), and timed operations throw InterruptedException
// when thread is interrupted while waiting
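
Interruption is the usual way to cancel a thread that is parked in a blocking call. The sketch below, with hypothetical names, interrupts a consumer that is waiting on an empty queue and checks that take() reported it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    // Returns true if take() threw InterruptedException in the consumer thread.
    static boolean interruptBlockedTake() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // nothing is ever enqueued, so this waits
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
                // Restore the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        consumer.interrupt(); // take() also throws if the flag is already set on entry
        consumer.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptBlockedTake()); // true
    }
}
```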

IllegalStateException:

// add() throws IllegalStateException when the element cannot be added because of capacity restrictions

Canonical Usage #

When to use: Use BlockingQueue as the primary communication channel between threads in a producer-consumer architecture. It eliminates the need for manual wait() and notify() calls.

Key Properties:

  • Thread-Safe: All implementations are thread-safe.
  • No Nulls: BlockingQueue does not accept null elements (throws NullPointerException).
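  • Capacity Bounded: A queue may be capacity bounded. ArrayBlockingQueue requires a bound, LinkedBlockingQueue accepts an optional one, and a queue without a bound reports Integer.MAX_VALUE as its remaining capacity.

// Assumes `executor` is an existing ExecutorService and `process(String)` is an application method
BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);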

// Producer thread
executor.execute(() -> {
    try {
        queue.put("Task Data"); // Blocks if queue is full
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Consumer thread
executor.execute(() -> {
    try {
        String data = queue.take(); // Blocks if queue is empty
        process(data);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
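
For reference, here is one way the snippet above could be made self-contained. It is a sketch under assumptions: the fixed pool of two threads, the handOff method name, the five-second timeout, and the use of toUpperCase() as a stand-in for process(data) are all choices made for this example, and the consumer is submitted as a Callable so its result can be returned.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CanonicalUsageDemo {
    // Hands one string from a producer task to a consumer task.
    static String handOff(String data) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Producer task: blocks only if the queue is full.
            executor.execute(() -> {
                try {
                    queue.put(data);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Consumer task: blocks until an element arrives.
            // toUpperCase() stands in for real processing.
            Future<String> result = executor.submit(() -> queue.take().toUpperCase());
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow(); // interrupts any task still blocked
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handOff("Task Data")); // TASK DATA
    }
}
```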