Pipe
The Pipe class provides a unidirectional inter-thread communication channel with two ends: a readable SourceChannel and a writable SinkChannel. It’s implemented using native OS pipes and supports both blocking and non-blocking I/O with selector integration.
Source Code #
Core Implementation #
Pipe is an abstract class that encapsulates two nested channel classes: SourceChannel (read end) and SinkChannel (write end). The concrete implementation is PipeImpl in the sun.nio.ch package.
/**
 * A pair of channels that together form a unidirectional pipe.
 *
 * <p>Data is written through the {@link SinkChannel} and read back through the
 * {@link SourceChannel}. Instances are obtained from {@link #open()}.
 */
public abstract class Pipe {

    /**
     * Opens a pipe by delegating to the default {@link SelectorProvider}.
     *
     * @return a new pipe
     * @throws IOException if the provider cannot create the pipe
     */
    public static Pipe open() throws IOException {
        SelectorProvider defaultProvider = SelectorProvider.provider();
        return defaultProvider.openPipe();
    }

    /** Returns the readable end of this pipe. */
    public abstract SourceChannel source();

    /** Returns the writable end of this pipe. */
    public abstract SinkChannel sink();

    /** The readable end of a {@link Pipe}. */
    public abstract static class SourceChannel
            extends AbstractSelectableChannel
            implements ReadableByteChannel, ScatteringByteChannel {

        /**
         * @param provider the selector provider that created this channel
         */
        protected SourceChannel(SelectorProvider provider) {
            super(provider);
        }

        /** Reading is the only operation a source channel can be selected for. */
        public final int validOps() {
            return SelectionKey.OP_READ;
        }
    }

    /** The writable end of a {@link Pipe}. */
    public abstract static class SinkChannel
            extends AbstractSelectableChannel
            implements WritableByteChannel, GatheringByteChannel {

        /**
         * @param provider the selector provider that created this channel
         */
        protected SinkChannel(SelectorProvider provider) {
            super(provider);
        }

        /** Writing is the only operation a sink channel can be selected for. */
        public final int validOps() {
            return SelectionKey.OP_WRITE;
        }
    }
}
PipeImpl Internal Structure #
The concrete implementation PipeImpl creates the underlying OS pipe and manages its two channels:
/**
 * Concrete pipe: creates the native pipe once in the constructor and wraps
 * each of its two descriptors in a channel implementation.
 */
class PipeImpl extends Pipe {
    private final SourceChannelImpl source;
    private final SinkChannelImpl sink;

    /**
     * @param sp provider handed to both channel implementations
     * @throws IOException if the native pipe cannot be created
     */
    PipeImpl(SelectorProvider sp) throws IOException {
        // Both descriptors come back packed into a single long.
        final long packedFds = IOUtil.makePipe(true);

        // Upper 32 bits carry the read end.
        final FileDescriptor readEnd = new FileDescriptor();
        IOUtil.setfdVal(readEnd, (int) (packedFds >>> 32));

        // Lower 32 bits carry the write end.
        final FileDescriptor writeEnd = new FileDescriptor();
        IOUtil.setfdVal(writeEnd, (int) packedFds);

        this.source = new SourceChannelImpl(sp, readEnd);
        this.sink = new SinkChannelImpl(sp, writeEnd);
    }

    /** Returns the read end created in the constructor. */
    public SourceChannel source() {
        return source;
    }

    /** Returns the write end created in the constructor. */
    public SinkChannel sink() {
        return sink;
    }
}
Channel Implementations #
SourceChannelImpl (Read End) #
/**
 * Field layout of the read-end implementation (abbreviated: constructor and
 * I/O methods are not shown in this excerpt).
 */
class SourceChannelImpl extends Pipe.SourceChannel implements SelChImpl {
    // Lifecycle states stored in 'state'
    private static final int ST_INUSE = 0;   // Channel open and in use
    private static final int ST_CLOSING = 1; // Channel closing in progress
    private static final int ST_CLOSED = 2;  // Channel closed

    private final FileDescriptor fd; // Read end file descriptor
    private final int fdVal;         // Integer FD for native calls

    // Locking strategy
    private final ReentrantLock readLock = new ReentrantLock(); // For read operations
    private final Object stateLock = new Object();              // For state changes

    private int state; // Current state (one of the ST_* values)

    // Thread tracking
    private long thread;                        // Native thread ID for blocking I/O
    private volatile boolean forcedNonBlocking; // Virtual thread integration

    // validOps() is deliberately NOT redeclared here: Pipe.SourceChannel already
    // defines it as a final method returning SelectionKey.OP_READ, so declaring
    // it again in this subclass would be rejected by the compiler.
}
SinkChannelImpl (Write End) #
/**
 * Field layout of the write-end implementation (abbreviated: constructor and
 * I/O methods are not shown in this excerpt).
 */
class SinkChannelImpl extends Pipe.SinkChannel implements SelChImpl {
    // Lifecycle states stored in 'state'
    private static final int ST_INUSE = 0;   // Channel open and in use
    private static final int ST_CLOSING = 1; // Channel closing in progress
    private static final int ST_CLOSED = 2;  // Channel closed

    private final FileDescriptor fd; // Write end file descriptor
    private final int fdVal;         // Integer FD for native calls

    // Locking strategy
    private final ReentrantLock writeLock = new ReentrantLock(); // For write operations
    private final Object stateLock = new Object();               // For state changes

    private int state; // Current state (one of the ST_* values)

    // Thread tracking
    private long thread;
    private volatile boolean forcedNonBlocking;

    // validOps() is deliberately NOT redeclared here: Pipe.SinkChannel already
    // defines it as a final method returning SelectionKey.OP_WRITE, so declaring
    // it again in this subclass would be rejected by the compiler.
}
Basic Pipe Usage #
Creating and Using Pipes #
// Basic pipe creation and usage
/**
 * Minimal producer/consumer demo: one thread writes a message into the sink,
 * another reads it from the source and prints it.
 */
public class BasicPipeExample {

    /**
     * Runs the demo and waits for both threads to finish.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be opened
     */
    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        Pipe.SinkChannel sink = pipe.sink();       // Write end
        Pipe.SourceChannel source = pipe.source(); // Read end

        Thread producer = new Thread(() -> {
            // try-with-resources closes the sink even when the write fails, so the
            // consumer is released by end-of-stream instead of blocking forever.
            try (Pipe.SinkChannel out = sink) {
                String message = "Hello from producer!";
                ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
                System.out.println("Producer sent: " + message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            // The source is closed on every exit path, including a failed read.
            try (Pipe.SourceChannel in = source) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesRead = in.read(buffer);
                if (bytesRead > 0) {
                    buffer.flip();
                    String message = StandardCharsets.UTF_8.decode(buffer).toString();
                    System.out.println("Consumer received: " + message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever called main.
            Thread.currentThread().interrupt();
        }
    }
}
// Using try-with-resources for automatic closing
/**
 * Writes a short message into a pipe and reads it back, closing both channel
 * ends automatically.
 *
 * <p>{@code Pipe} itself is not {@code AutoCloseable}, so only its two
 * channels are declared as try-with-resources resources.
 *
 * @throws IOException if opening, writing to, or reading from the pipe fails
 */
public static void pipeWithResources() throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        // Encode with the same charset that is used for decoding below.
        ByteBuffer writeBuffer = ByteBuffer.wrap("Test data".getBytes(StandardCharsets.UTF_8));
        while (writeBuffer.hasRemaining()) {
            sink.write(writeBuffer);
        }

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        source.read(readBuffer);
        readBuffer.flip();
        System.out.println("Received: " +
                StandardCharsets.UTF_8.decode(readBuffer).toString());
    }
}
Pipe for Inter-Thread Communication #
// Producer-Consumer pattern using Pipe
/**
 * Producer/consumer demo built on a single pipe: the producer writes ten
 * messages into the sink, the consumer prints whatever arrives on the source
 * until end-of-stream.
 */
public class ProducerConsumerPipe {
    private final Pipe pipe;
    private final Pipe.SinkChannel sink;
    private final Pipe.SourceChannel source;

    /**
     * @throws IOException if the pipe cannot be opened
     */
    public ProducerConsumerPipe() throws IOException {
        this.pipe = Pipe.open();
        this.sink = pipe.sink();
        this.source = pipe.source();
    }

    /**
     * Starts both threads and blocks until they finish (or the calling thread
     * is interrupted, in which case the interrupt flag is restored).
     */
    public void startProducerConsumer() {
        Thread producer = new Thread(() -> {
            // Closing the sink is what signals end-of-data; try-with-resources
            // guarantees it also happens when a write fails or the thread is
            // interrupted, so the consumer can never be left waiting.
            try (Pipe.SinkChannel out = sink) {
                for (int i = 0; i < 10; i++) {
                    String message = "Message " + i;
                    ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                    while (buffer.hasRemaining()) {
                        out.write(buffer);
                    }
                    System.out.println("Produced: " + message);
                    Thread.sleep(100); // Simulate work
                }
            } catch (InterruptedException e) {
                // Stop producing but keep the interrupt visible to the thread's owner.
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try (Pipe.SourceChannel in = source) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (true) {
                    buffer.clear();
                    int bytesRead = in.read(buffer);
                    if (bytesRead == -1) {
                        break; // Sink closed: end of data
                    }
                    if (bytesRead > 0) {
                        buffer.flip();
                        String message = StandardCharsets.UTF_8.decode(buffer).toString();
                        System.out.println("Consumed: " + message);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes both ends of the pipe. The source is closed even if closing the
     * sink throws.
     *
     * @throws IOException if closing either channel fails
     */
    public void close() throws IOException {
        try {
            sink.close();
        } finally {
            source.close();
        }
    }
}
Blocking vs Non-blocking Modes #
Blocking Mode (Default) #
// Blocking pipe operations
/**
 * Demonstrates default (blocking) pipe I/O with one writer and one reader
 * thread. The method returns as soon as both threads are started; each thread
 * closes its own end of the pipe when it finishes.
 *
 * @throws IOException if the pipe cannot be opened
 */
public static void blockingPipeExample() throws IOException {
    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sink = pipe.sink();
    Pipe.SourceChannel source = pipe.source();

    // Both channels start in blocking mode
    assert sink.isBlocking();
    assert source.isBlocking();

    Thread writer = new Thread(() -> {
        // The sink is closed on every exit path so the write descriptor is not leaked.
        try (Pipe.SinkChannel out = sink) {
            ByteBuffer buffer = ByteBuffer.wrap("Data".getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                out.write(buffer); // Blocks if pipe buffer full
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    Thread reader = new Thread(() -> {
        try (Pipe.SourceChannel in = source) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            in.read(buffer); // Blocks until data arrives
            buffer.flip();
            System.out.println("Read: " + StandardCharsets.UTF_8.decode(buffer));
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    reader.start();
}
Non-blocking Mode with Selector #
// Non-blocking pipe with selector
/**
 * Demonstrates non-blocking pipe I/O driven by a selector.
 *
 * <p>A writer thread tries to write immediately; whatever does not fit is
 * attached to the sink's selection key and finished by the selector thread
 * once the sink reports write readiness. The selector thread forwards every
 * chunk it reads to {@code processData} and stops when the source reaches
 * end-of-stream, closing the selector and both channels on the way out.
 *
 * @throws IOException if the pipe or selector cannot be opened or configured
 */
public static void nonBlockingPipeExample() throws IOException {
    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sink = pipe.sink();
    Pipe.SourceChannel source = pipe.source();

    sink.configureBlocking(false);
    source.configureBlocking(false);

    Selector selector = Selector.open();
    source.register(selector, SelectionKey.OP_READ);
    // No write interest up front: interest is switched on only while there is
    // data that could not be written immediately.
    SelectionKey sinkKey = sink.register(selector, 0);

    Thread writer = new Thread(() -> {
        try {
            String data = "Hello, Pipe!";
            ByteBuffer buffer = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
            sink.write(buffer);
            if (buffer.hasRemaining()) {
                // Would block - hand the remainder to the selector thread.
                sinkKey.attach(buffer);
                sinkKey.interestOps(SelectionKey.OP_WRITE);
                selector.wakeup();
            } else {
                sink.close(); // Everything written: signal end-of-stream to the reader
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    Thread selectorThread = new Thread(() -> {
        // Resources are released in reverse order when the loop ends or fails.
        try (Selector sel = selector;
             Pipe.SourceChannel in = source;
             Pipe.SinkChannel out = sink) {
            boolean endOfStream = false;
            while (!endOfStream) {
                sel.select();
                Set<SelectionKey> selectedKeys = sel.selectedKeys();
                for (SelectionKey key : selectedKeys) {
                    if (!key.isValid()) {
                        continue; // Channel was closed since it was selected
                    }
                    if (key.isReadable()) {
                        Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = channel.read(buffer);
                        if (bytesRead > 0) {
                            buffer.flip();
                            processData(buffer);
                        } else if (bytesRead == -1) {
                            // Writer closed the sink; without this the key would
                            // stay readable and the loop would spin.
                            channel.close();
                            endOfStream = true;
                        }
                    } else if (key.isWritable()) {
                        Pipe.SinkChannel channel = (Pipe.SinkChannel) key.channel();
                        ByteBuffer pending = (ByteBuffer) key.attachment();
                        if (pending != null) {
                            channel.write(pending);
                        }
                        if (pending == null || !pending.hasRemaining()) {
                            key.interestOps(0); // Remove write interest
                            channel.close();    // Nothing left to send
                        }
                    }
                }
                selectedKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    selectorThread.start();
}
Virtual Thread Integration #
// Pipes automatically adapt for virtual threads
/**
 * Runs a producer and a consumer on virtual threads that communicate through
 * a pipe, and waits for both tasks before closing the channels.
 *
 * <p>{@code Pipe} is not {@code AutoCloseable}, so only the two channels and
 * the executor are managed by try-with-resources. The executor is declared
 * last so that it is closed (and its tasks awaited) before the channels.
 *
 * @throws IOException if the pipe cannot be opened or a channel fails to close
 */
public static void pipeWithVirtualThreads() throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source();
         var executor = Executors.newVirtualThreadPerTaskExecutor()) {

        // Producer virtual thread
        executor.submit(() -> {
            try {
                String data = "Data from virtual thread";
                ByteBuffer buffer = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
                // write() yields virtual thread if would block
                while (buffer.hasRemaining()) {
                    sink.write(buffer);
                }
                System.out.println("Virtual thread produced data");
            } catch (IOException e) {
                // A Runnable cannot propagate checked exceptions; report it here.
                e.printStackTrace();
            }
        });

        // Consumer virtual thread
        executor.submit(() -> {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // read() yields virtual thread if no data
                source.read(buffer);
                buffer.flip();
                System.out.println("Virtual thread consumed: " +
                        StandardCharsets.UTF_8.decode(buffer));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}
// Checking virtual thread configuration
/**
 * Prints the source channel's blocking mode, then switches it to
 * non-blocking from a virtual thread and prints the value of the channel's
 * internal {@code forcedNonBlocking} flag as reported by
 * {@code isForcedNonBlocking}.
 *
 * <p>Both ends of the pipe are closed before the method returns.
 *
 * @throws IOException if the pipe cannot be opened or a channel fails to close
 */
public static void checkVirtualThreadConfiguration() throws IOException {
    Pipe pipe = Pipe.open();
    // The sink is opened together with the pipe, so it must be closed as well
    // even though this demo never writes to it.
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        // Initially blocking
        System.out.println("Initial blocking mode: " + source.isBlocking());

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                try {
                    // Explicit mode change. NOTE(review): the internal flag printed
                    // below appears to be set by the channel's own I/O path rather
                    // than by this call - confirm against the implementation.
                    source.configureBlocking(false);
                    System.out.println("In virtual thread, forced non-blocking: " +
                            isForcedNonBlocking(source));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
// Helper method to check forced non-blocking state (reflection)
/**
 * Reads the private {@code forcedNonBlocking} field of the given channel via
 * reflection (for demonstration only).
 *
 * @param channel channel whose implementation class is inspected
 * @return the field's value, or {@code false} if it cannot be read for any reason
 */
private static boolean isForcedNonBlocking(Pipe.SourceChannel channel) {
    boolean forced = false;
    try {
        Class<?> implClass = channel.getClass();
        Field flag = implClass.getDeclaredField("forcedNonBlocking");
        flag.setAccessible(true);
        forced = flag.getBoolean(channel);
    } catch (Exception e) {
        // Any failure (missing field, denied access, null channel) leaves the
        // default answer of false in place.
    }
    return forced;
}
Scatter/Gather Operations #
// Scatter read from pipe source
/**
 * Writes "HeaderBodyFooter" into a pipe and scatter-reads it back into three
 * buffers sized for the three parts, printing each part.
 *
 * <p>{@code Pipe} is not {@code AutoCloseable}, so only the channels are
 * try-with-resources resources.
 *
 * @throws IOException if any pipe operation fails
 */
public static void scatterReadExample() throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        ByteBuffer data = ByteBuffer.wrap("HeaderBodyFooter".getBytes(StandardCharsets.UTF_8));
        while (data.hasRemaining()) {
            sink.write(data);
        }
        sink.close(); // Close sink to signal end of data

        ByteBuffer header = ByteBuffer.allocate(6); // "Header"
        ByteBuffer body = ByteBuffer.allocate(4);   // "Body"
        ByteBuffer footer = ByteBuffer.allocate(6); // "Footer"
        ByteBuffer[] buffers = {header, body, footer};

        // A single scatter read may return before every buffer is full, so keep
        // reading until the last buffer is filled or the stream ends.
        long totalRead = 0;
        long count;
        while (footer.hasRemaining() && (count = source.read(buffers)) != -1) {
            totalRead += count;
        }
        System.out.println("Total bytes read: " + totalRead);

        header.flip();
        body.flip();
        footer.flip();
        System.out.println("Header: " + StandardCharsets.UTF_8.decode(header));
        System.out.println("Body: " + StandardCharsets.UTF_8.decode(body));
        System.out.println("Footer: " + StandardCharsets.UTF_8.decode(footer));
    }
}
// Gather write to pipe sink
/**
 * Gather-writes three buffers ("HEADER", "DATA", "END") into a pipe, then
 * reads everything back and prints the combined content.
 *
 * <p>{@code Pipe} is not {@code AutoCloseable}, so only the channels are
 * try-with-resources resources.
 *
 * @throws IOException if any pipe operation fails
 */
public static void gatherWriteExample() throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        ByteBuffer header = ByteBuffer.wrap("HEADER".getBytes(StandardCharsets.UTF_8));
        ByteBuffer payload = ByteBuffer.wrap("DATA".getBytes(StandardCharsets.UTF_8));
        ByteBuffer trailer = ByteBuffer.wrap("END".getBytes(StandardCharsets.UTF_8));
        ByteBuffer[] buffers = {header, payload, trailer};

        // A gather write may stop early; the last buffer being drained means
        // every buffer before it has been written too.
        long totalWritten = 0;
        while (trailer.hasRemaining()) {
            totalWritten += sink.write(buffers);
        }
        System.out.println("Total bytes written: " + totalWritten);
        sink.close(); // Close sink

        // Read back until end-of-stream (or until the buffer is full).
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        while (source.read(readBuffer) > 0) {
            // keep draining
        }
        readBuffer.flip();
        System.out.println("Combined data: " +
                StandardCharsets.UTF_8.decode(readBuffer));
    }
}
Performance Characteristics #
| Operation | Blocking Mode | Non-blocking Mode | Virtual Threads |
|---|---|---|---|
| read() | Blocks until data | Returns 0 if no data | Yields if no data |
| write() | Blocks until space | Returns 0 if no space | Yields if no space |
| close() | May block for pending I/O | Non-blocking | Non-blocking |
| Selector registration | N/A | O(1) | O(1) |
| Native pipe creation | O(1) | O(1) | O(1) |
Key Performance Insights:
- OS pipe buffers: Typically 64KB on Linux, can affect throughput
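- Copy overhead: Data still passes through the kernel's pipe buffer, so a pipe transfer is not truly zero-copy; using direct ByteBuffers only avoids the extra copy between the Java heap and native memory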
- Selector overhead: Minimal for pipe channels
- Virtual threads: Add minimal overhead with proper non-blocking configuration
Common Usage Patterns #
1. Thread Communication with Backpressure #
// Pipe with bounded capacity (simulated)
/**
 * A pipe whose number of in-flight bytes is capped by a semaphore: writers
 * acquire one permit per byte before writing and readers give the permits
 * back after reading.
 */
public class BoundedPipe {
    private final Pipe pipe;
    private final Semaphore availableCapacity;
    private final int capacity;

    /**
     * @param capacity maximum number of bytes that may be written but not yet read
     * @throws IllegalArgumentException if {@code capacity} is not positive
     * @throws IOException if the pipe cannot be opened
     */
    public BoundedPipe(int capacity) throws IOException {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.pipe = Pipe.open();
        this.capacity = capacity;
        this.availableCapacity = new Semaphore(capacity);
    }

    /**
     * Writes all remaining bytes of {@code data}, blocking while the pipe is
     * at capacity.
     *
     * <p>The payload is sent in chunks of at most {@code capacity} bytes, so a
     * payload larger than the capacity makes progress as the reader drains the
     * pipe instead of waiting for permits that can never become available.
     *
     * @param data buffer to drain; its limit is left unchanged
     * @throws IOException if the write fails; permits for bytes that never
     *         reached the pipe are returned first
     * @throws InterruptedException if interrupted while waiting for capacity
     */
    public void write(ByteBuffer data) throws IOException, InterruptedException {
        Pipe.SinkChannel sink = pipe.sink();
        while (data.hasRemaining()) {
            int chunk = Math.min(data.remaining(), capacity);
            availableCapacity.acquire(chunk);

            // Temporarily narrow the buffer to exactly the bytes we hold permits for.
            int savedLimit = data.limit();
            data.limit(data.position() + chunk);
            try {
                while (data.hasRemaining()) {
                    sink.write(data);
                }
            } catch (IOException e) {
                // Only the unwritten part of this chunk is handed back; bytes that
                // did reach the pipe are released by the reader.
                availableCapacity.release(data.remaining());
                throw e;
            } finally {
                data.limit(savedLimit);
            }
        }
    }

    /**
     * Reads up to {@code maxBytes} bytes and releases the matching capacity.
     *
     * @param maxBytes size of the buffer to read into
     * @return a flipped buffer holding the bytes read, or {@code null} if the
     *         read returned no data (zero bytes or end-of-stream)
     * @throws IOException if the read fails
     */
    public ByteBuffer read(int maxBytes) throws IOException {
        Pipe.SourceChannel source = pipe.source();
        ByteBuffer buffer = ByteBuffer.allocate(maxBytes);
        int bytesRead = source.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            availableCapacity.release(bytesRead);
            return buffer;
        }
        return null;
    }

    /** Returns the number of bytes that can currently be written without waiting. */
    public int availableCapacity() {
        return availableCapacity.availablePermits();
    }
}
2. Pipeline Processing #
// Multi-stage pipeline using pipes
/**
 * Multi-stage pipeline in which each stage runs on its own thread and hands
 * its output to the next stage through a pipe.
 *
 * <p>Stage {@code i} writes to pipe {@code i}; every stage except the first
 * reads from pipe {@code i - 1}. The first stage has no input pipe and obtains
 * its data from {@code generateData()} until {@code shouldStop()} is true.
 */
public class ProcessingPipeline {
    private final List<Pipe> pipes;
    private final List<Thread> processors;

    /**
     * @param stages number of stages (one pipe is opened per stage)
     * @throws IOException if a pipe cannot be opened
     */
    public ProcessingPipeline(int stages) throws IOException {
        this.pipes = new ArrayList<>(stages);
        this.processors = new ArrayList<>(stages);
        for (int i = 0; i < stages; i++) {
            pipes.add(Pipe.open());
        }
    }

    /** Starts one processor thread per stage. */
    public void start() {
        for (int i = 0; i < pipes.size(); i++) {
            final int stage = i;
            final Pipe inputPipe = (stage == 0) ? null : pipes.get(stage - 1);
            final Pipe outputPipe = pipes.get(stage);
            Thread processor = new Thread(() -> {
                try {
                    processStage(inputPipe, outputPipe, stage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            processors.add(processor);
            processor.start();
        }
    }

    /**
     * Runs one stage until its input ends (or, for the first stage, until
     * {@code shouldStop()} reports true).
     *
     * <p>The stage's sink is closed on every exit path, including failures, so
     * the next stage always observes end-of-stream instead of blocking forever.
     */
    private void processStage(Pipe inputPipe, Pipe outputPipe, int stage)
            throws IOException {
        // try-with-resources skips a null resource, which covers the first stage.
        try (Pipe.SinkChannel sink = outputPipe.sink();
             Pipe.SourceChannel source = (inputPipe != null) ? inputPipe.source() : null) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                if (source != null) {
                    // Read from previous stage
                    buffer.clear();
                    int bytesRead = source.read(buffer);
                    if (bytesRead == -1) {
                        break; // End of data
                    }
                    if (bytesRead > 0) {
                        buffer.flip();
                        processData(buffer, stage);
                        // Forward the processed chunk from its start.
                        buffer.rewind();
                        writeFully(sink, buffer);
                    }
                } else {
                    // First stage - generate data
                    writeFully(sink, generateData());
                    if (shouldStop()) {
                        break;
                    }
                }
            }
        }
    }

    /** Writes until the buffer is drained; a single write may be partial. */
    private static void writeFully(Pipe.SinkChannel sink, ByteBuffer data) throws IOException {
        while (data.hasRemaining()) {
            sink.write(data);
        }
    }

    /**
     * Writes {@code data} into the first stage's pipe.
     *
     * @throws IOException if the write fails
     */
    public void feedData(ByteBuffer data) throws IOException {
        writeFully(pipes.get(0).sink(), data);
    }

    /**
     * Performs one read on the last stage's pipe.
     *
     * @return a flipped buffer with whatever that read delivered
     * @throws IOException if the read fails
     */
    public ByteBuffer collectResult() throws IOException {
        Pipe.SourceChannel lastSource = pipes.get(pipes.size() - 1).source();
        ByteBuffer result = ByteBuffer.allocate(1024);
        lastSource.read(result);
        result.flip();
        return result;
    }
}
3. Selector-based Multiplexing #
// Multiple pipes multiplexed with single selector
/**
 * Reads from several pipes on one thread by registering every pipe's source
 * channel with a shared selector.
 */
public class PipeMultiplexer {
    private final Selector selector;
    private final Map<SelectionKey, PipeHandler> handlers;

    /**
     * @throws IOException if the selector cannot be opened
     */
    public PipeMultiplexer() throws IOException {
        this.selector = Selector.open();
        this.handlers = new HashMap<>();
    }

    /**
     * Switches the pipe's source to non-blocking mode and registers it for reads.
     *
     * @param pipe pipe whose source should be monitored
     * @param name label used in log output
     * @throws IOException if configuring or registering the channel fails
     */
    public void addPipe(Pipe pipe, String name) throws IOException {
        Pipe.SourceChannel source = pipe.source();
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, SelectionKey.OP_READ);
        handlers.put(key, new PipeHandler(pipe, name));
        System.out.println("Added pipe: " + name);
    }

    /**
     * Dispatches read events until every registered pipe has reached
     * end-of-stream. The first select always runs, so calling this with no
     * pipes registered blocks in the selector.
     *
     * @throws IOException if selecting or reading fails
     */
    public void run() throws IOException {
        System.out.println("Pipe multiplexer started");
        do {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            for (SelectionKey key : selectedKeys) {
                if (key.isValid() && key.isReadable()) {
                    handleReadable(key);
                }
            }
            selectedKeys.clear();
        } while (!handlers.isEmpty());
    }

    private void handleReadable(SelectionKey key) throws IOException {
        PipeHandler handler = handlers.get(key);
        if (handler == null) {
            return;
        }
        handler.handleRead();
        // The handler closes its source at end-of-stream, which invalidates the
        // key; drop the entry so closed pipes do not accumulate in the map.
        if (!key.isValid()) {
            handlers.remove(key);
        }
    }

    /** Reads from one pipe's source and logs what arrives. */
    private static class PipeHandler {
        private final Pipe pipe;
        private final String name;

        PipeHandler(Pipe pipe, String name) {
            this.pipe = pipe;
            this.name = name;
        }

        /** Performs one read; closes the source when end-of-stream is reported. */
        void handleRead() throws IOException {
            Pipe.SourceChannel source = pipe.source();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = source.read(buffer);
            if (bytesRead > 0) {
                buffer.flip();
                String data = StandardCharsets.UTF_8.decode(buffer).toString();
                System.out.println("[" + name + "] Received: " + data);
            } else if (bytesRead == -1) {
                System.out.println("[" + name + "] Pipe closed");
                source.close();
            }
        }
    }
}
Best Practices #
Always Close Both Ends:
// Correct - close both ends
try (Pipe pipe = Pipe.open();
     Pipe.SinkChannel sink = pipe.sink();
     Pipe.SourceChannel source = pipe.source()) {
    // Use pipe
}
// Avoid - leaving channels open
Pipe pipe = Pipe.open();
pipe.sink().close(); // Forgot to close source
Handle Partial Reads/Writes:
// Always check return values
ByteBuffer buffer = ByteBuffer.allocate(1024);
int totalRead = 0;
while (totalRead < expectedBytes) {
    int read = source.read(buffer);
    if (read == -1) break; // EOF
    totalRead += read;
}
// For writes
while (buffer.hasRemaining()) {
    int written = sink.write(buffer);
    if (written == 0) {
        // Would block in non-blocking mode
        break;
    }
}
Use Appropriate Buffer Sizes:
// Match buffer size to typical message size
public static final int PIPE_BUFFER_SIZE = 8192; // 8KB
ByteBuffer buffer = ByteBuffer.allocate(PIPE_BUFFER_SIZE);
// For large data, use scatter/gather
ByteBuffer[] buffers = new ByteBuffer[] {
    ByteBuffer.allocate(128),  // Header
    ByteBuffer.allocate(1024), // Body
    ByteBuffer.allocate(64)    // Footer
};
Configure Non-blocking Mode Early:
// Configure before use if non-blocking needed
Pipe pipe = Pipe.open();
pipe.source().configureBlocking(false);
pipe.sink().configureBlocking(false);
// Register with selector
Selector selector = Selector.open();
pipe.source().register(selector, SelectionKey.OP_READ);
Clean Up with Virtual Threads:
// Virtual threads auto-configure pipes
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        try (Pipe pipe = Pipe.open()) {
            // Pipe automatically non-blocking for virtual threads
            usePipe(pipe);
        }
    });
}
Common Pitfalls #
Forgetting to Close Sink for EOF Detection:
// Wrong - consumer hangs waiting for EOF
Pipe pipe = Pipe.open();
pipe.sink().write(buffer);
// Forgot to close sink
// Consumer waits forever
while (true) {
    int read = source.read(buffer); // Blocks forever
}
// Correct - close sink when done
pipe.sink().write(buffer);
pipe.sink().close(); // Signals EOF to reader
Ignoring Return Values:
// Wrong - assumes all data written
ByteBuffer buffer = ByteBuffer.wrap(data);
sink.write(buffer); // May write partial data
// Correct - handle partial writes
while (buffer.hasRemaining()) {
    int written = sink.write(buffer);
    if (written == 0) {
        // Handle would-block
        break;
    }
}
Buffer Underflow with Scatter/Gather:
// Wrong - may not fill all buffers
ByteBuffer[] buffers = new ByteBuffer[3];
// ... initialize buffers
source.read(buffers); // May only fill first buffer
// Process assuming all filled
buffers[2].flip(); // BufferUnderflowException if not filled
// Correct - check each buffer
long totalRead = source.read(buffers);
for (ByteBuffer buf : buffers) {
    if (buf.position() > 0) {
        buf.flip();
        process(buf);
    }
}
Deadlock with Blocking Operations:
// Wrong - potential deadlock
Pipe pipe = Pipe.open();
// Thread 1: writes large amount, blocks when pipe full
sink.write(largeBuffer); // Blocks
// Thread 2: supposed to read but never scheduled
// Deadlock!
// Correct - use non-blocking or ensure consumer runs
sink.configureBlocking(false);
// Or use separate threads properly synchronized
Not Handling Interrupts:
// Wrong - ignores interrupt during blocking I/O
public void readFromPipe(Pipe.SourceChannel source) {
    try {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        source.read(buffer); // Blocks, may be interrupted
    } catch (IOException e) {
        // Lost interrupt status
    }
}
// Correct - preserve interrupt status
public void readFromPipe(Pipe.SourceChannel source) {
    try {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        source.read(buffer);
    } catch (IOException e) {
        Thread.currentThread().interrupt(); // Restore interrupt
        throw new RuntimeException(e);
    }
}
Internal Implementation Details #
Native Pipe Creation #
The IOUtil.makePipe(boolean blocking) native method creates an OS pipe:
// Native implementation creates pipe and sets blocking mode
/**
 * Native hook that creates the pipe.
 *
 * @param blocking requested blocking mode for the new pipe
 * @return both descriptors packed into one long; 0 is treated as failure by
 *         the wrapper below
 */
private static native long makePipe0(boolean blocking);

/**
 * Creates a pipe through {@code makePipe0} and converts a zero result into an
 * exception.
 *
 * @param blocking passed through unchanged to the native call
 * @return the packed descriptor pair reported by the native call
 * @throws IOException if the native call returned 0
 */
static long makePipe(boolean blocking) throws IOException {
    final long packedFds = makePipe0(blocking);
    if (packedFds != 0) {
        return packedFds;
    }
    throw new IOException("Pipe creation failed");
}
// The returned long packs two file descriptors:
// - High 32 bits: read end file descriptor
// - Low 32 bits: write end file descriptor
Platform-specific implementations:
- Unix/Linux: Uses the pipe() or pipe2() system call
- Windows: Uses the CreatePipe() API
- Blocking mode: Configured via fcntl() (Unix) or pipe creation flags (Windows)
Dual-Lock Synchronization Strategy #
Pipe channels use two locks for thread safety:
// Read operation in SourceChannelImpl
/**
 * Reads from the pipe into {@code dst} while holding the read lock, so at
 * most one thread reads at a time.
 *
 * @param dst destination buffer
 * @return whatever {@code implRead} reports for this read
 * @throws IOException if the channel is not open or the read fails
 */
public int read(ByteBuffer dst) throws IOException {
    readLock.lock();
    try {
        // Fail fast on a closed channel before sampling the blocking mode.
        ensureOpen();
        // ... perform read
        return implRead(dst, isBlocking());
    } finally {
        readLock.unlock();
    }
}
// State changes use separate lock
/**
 * Stores a new channel state; the assignment happens under {@code stateLock}.
 *
 * @param newState state value to record
 */
private void updateState(int newState) {
    synchronized (stateLock) {
        state = newState;
    }
}
Lock Hierarchy Rules:
- Never hold stateLock while performing blocking I/O
- readLock/writeLock are per-operation locks
- State changes are atomic and protected by stateLock
Virtual Thread Integration #
// Automatic non-blocking configuration for virtual threads
/**
 * Switches the descriptor to non-blocking mode the first time the channel is
 * used from a virtual thread, and remembers that it did so.
 *
 * @throws IOException if the channel is not open or the mode change fails
 */
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
    // Nothing to do once the switch has happened.
    if (forcedNonBlocking) {
        return;
    }
    // Platform threads keep whatever mode the channel already has.
    if (!Thread.currentThread().isVirtual()) {
        return;
    }
    synchronized (stateLock) {
        ensureOpen();
        IOUtil.configureBlocking(fd, false);
        forcedNonBlocking = true; // Never reset afterwards
    }
}
// Used in all blocking operations
// Abbreviated view of read(): only the locking and the virtual-thread hook are
// shown; the actual transfer and the return statement are elided.
public int read(ByteBuffer dst) throws IOException {
// Take the read lock before touching the channel.
readLock.lock();
try {
// Runs first so the descriptor is already non-blocking when a virtual thread performs the read.
configureSocketNonBlockingIfVirtualThread(); // Auto-configure
// ... rest of read logic
} finally {
// Released on every path, including when the elided logic throws.
readLock.unlock();
}
}
Selector Integration #
Pipe channels implement SelChImpl for selector support:
// Event translation for selector
/**
 * Translates native poll events into the key's ready-operation set.
 *
 * @param ops native poll events reported for the descriptor
 * @param initialOps ready set to start from
 * @param ski selection key whose ready set is updated
 * @return {@code true} if the update made at least one operation ready that
 *         was not ready before
 */
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
    int intOps = ski.nioInterestOps();
    // Snapshot the previous ready set BEFORE it is overwritten below; the
    // return value is defined relative to it.
    int oldOps = ski.nioReadyOps();
    int newOps = initialOps;

    if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
        // Error/hangup makes all interested ops ready
        newOps = intOps;
    }
    if (((ops & Net.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_READ) != 0)) {
        newOps |= SelectionKey.OP_READ;
    }

    ski.nioReadyOps(newOps);
    return (newOps & ~oldOps) != 0;
}
Performance Optimization Techniques #
1. Buffer Pooling for High Throughput #
// Reuse buffers to reduce allocation overhead
public class PipeBufferPool {
private final Deque<ByteBuffer> bufferPool;
private final int bufferSize;
public PipeBufferPool(int bufferSize, int poolSize) {
this.bufferSize = bufferSize;
this.bufferPool = new ArrayDeque<>(poolSize);
// Pre-allocate buffers
for (int i = 0; i < poolSize; i++) {
bufferPool.add(ByteBuffer.allocateDirect(bufferSize));
}
}
public ByteBuffer acquire() {
ByteBuffer buffer = bufferPool.poll();
if (buffer == null) {
buffer = ByteBuffer.allocateDirect(bufferSize);
}
buffer.clear();
return buffer;
}
public void release(ByteBuffer buffer) {
if (buffer.isDirect() && buffer.capacity() == bufferSize) {
bufferPool.offer(buffer);
}
}
}
2. Batch Processing #
// Process multiple messages in batch
/**
 * Drains a pipe source in batches: waits up to 100 ms for readiness, reads
 * everything currently available, and hands the combined chunks to
 * {@code processor}. Returns once the source reports end-of-stream.
 *
 * <p>The source is switched to non-blocking mode and left that way. The
 * selector used for waiting is closed before the method returns, on both the
 * normal and the exceptional path.
 *
 * @param source channel to read from
 * @param processor receives one combined buffer per non-empty batch
 * @throws IOException if configuring, selecting, or reading fails
 */
public static void batchProcess(Pipe.SourceChannel source,
        Consumer<ByteBuffer> processor) throws IOException {
    List<ByteBuffer> batch = new ArrayList<>();
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    source.configureBlocking(false);

    try (Selector selector = Selector.open()) {
        source.register(selector, SelectionKey.OP_READ);
        while (true) {
            // Wait for data
            selector.select(100); // 100ms timeout
            // The key itself is not inspected; just reset the selected set.
            selector.selectedKeys().clear();

            // Read all available data
            int read;
            do {
                buffer.clear();
                read = source.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    // Copy out, because 'buffer' is reused for the next read.
                    ByteBuffer copy = ByteBuffer.allocate(read);
                    copy.put(buffer);
                    copy.flip();
                    batch.add(copy);
                }
            } while (read > 0);

            // Process batch if we have data
            if (!batch.isEmpty()) {
                processor.accept(combineBuffers(batch));
                batch.clear();
            }

            // Check for EOF
            if (read == -1) {
                break;
            }
        }
    }
}
3. Zero-Copy Between Threads #
// Using direct buffers for zero-copy between threads
/**
 * Sends a small payload through a pipe using a single direct buffer for both
 * the write and the read.
 *
 * <p>{@code Pipe} is not {@code AutoCloseable}, so only the channels are
 * try-with-resources resources.
 *
 * <p>NOTE(review): despite the method name, the bytes still travel through
 * the pipe itself between the write and the read; the direct buffer only
 * keeps them out of the Java heap on the Java side - confirm the intended
 * claim before relying on this as a "zero-copy" technique.
 *
 * @throws IOException if any pipe operation fails
 */
public static void zeroCopyPipeTransfer() throws IOException {
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        // Allocate direct buffer (memory outside Java heap)
        ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);

        // Producer side: fill the buffer and write it out completely.
        directBuffer.put("Data".getBytes(StandardCharsets.UTF_8));
        directBuffer.flip();
        while (directBuffer.hasRemaining()) {
            sink.write(directBuffer);
        }

        // Consumer side: reuse the same buffer as the read destination.
        directBuffer.clear();
        source.read(directBuffer);
        directBuffer.flip();
    }
}
Related Classes #
- Pipe.SourceChannel: Readable end of the pipe
- Pipe.SinkChannel: Writable end of the pipe
- Selector: Multiplexes multiple pipe channels
- SelectionKey: Represents channel registration with selector
- ByteBuffer: Data container for pipe I/O
- ReadableByteChannel: Interface implemented by SourceChannel
- WritableByteChannel: Interface implemented by SinkChannel