BlockingQueue Interface
3 mins
BlockingQueue extends the standard Queue interface to handle cases where the queue is empty (blocking the consumer) or full (blocking the producer).
Source Code #
The “Four Sets of Methods” #
One of the most important aspects of BlockingQueue is how it handles operations that cannot be immediately satisfied. It provides four different behaviors for insertion, removal, and inspection:
| Operation | Throws Exception | Special Value | Blocks | Times Out |
|---|---|---|---|---|
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | N/A | N/A |
Implementation Details #
Core Interface Methods #
Insert Operations:
// Throws exception if full
boolean add(E e) throws IllegalStateException;
// Returns false if full
boolean offer(E e);
// Blocks until space available
void put(E e) throws InterruptedException;
// Blocks with timeout
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Remove Operations:
// Throws exception if empty
E remove() throws NoSuchElementException;
// Returns null if empty
E poll();
// Blocks until element available
E take() throws InterruptedException;
// Blocks with timeout
E poll(long timeout, TimeUnit unit) throws InterruptedException;
Examine Operations:
// Throws exception if empty
E element() throws NoSuchElementException;
// Returns null if empty
E peek();
Key Properties #
Null Handling:
// All implementations throw NullPointerException for null elements
// null is used as sentinel value for poll() operations
Capacity Management:
// Returns remaining capacity or Integer.MAX_VALUE for unbounded queues
int remainingCapacity();
Bulk Operations:
// Drain elements to collection
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
Memory Consistency #
The interface specification guarantees:
// Actions in producer thread prior to put()
// happen-before actions in consumer thread after take()
// This provides safe publication of objects
Usage Example from JDK #
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... }
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... }
}
void consume(Object x) { ... }
}
Important Notes #
- No Intrinsic Shutdown: No built-in “close” or “shutdown” mechanism
- Poison Pills: Common pattern uses special end-of-stream objects
- Bulk Operations:
addAll,containsAll, etc. are not atomic - Thread Safety: All implementations are thread-safe
- Multiple Producers/Consumers: All implementations support multiple producers and consumers
Exception Behavior #
Null Handling:
// All methods throw NullPointerException for null arguments
// This is consistent across all implementations
InterruptedException:
// put(), take(), and timed operations throw InterruptedException
// when thread is interrupted while waiting
IllegalStateException:
// add() throws IllegalStateException when queue is full
Canonical Usage #
When to use: Use BlockingQueue as the primary communication channel between threads in a producer-consumer architecture. It eliminates the need for manual wait() and notify() calls.
Key Properties:
- Thread-Safe: All implementations are thread-safe.
- No Nulls:
BlockingQueuedoes not acceptnullelements (throwsNullPointerException). - Capacity Bounded: Most implementations have an optional or mandatory capacity bound.
BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
// Producer thread
executor.execute(() -> {
try {
queue.put("Task Data"); // Blocks if queue is full
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
executor.execute(() -> {
try {
String data = queue.take(); // Blocks if queue is empty
process(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});