Skip to main content
  1. Java NIO (New I/O)/

Pipe

17 mins

The Pipe class provides a unidirectional inter-thread communication channel with two ends: a readable SourceChannel and a writable SinkChannel. On Unix-like systems it is backed by a native OS pipe (the Windows JDK emulates the pipe with a connected socket pair), and it supports both blocking and non-blocking I/O with selector integration.

Source Code #

View Source on GitHub

Core Implementation #

Pipe is an abstract class that encapsulates two nested channel classes: SourceChannel (read end) and SinkChannel (write end). The concrete implementation is PipeImpl in the sun.nio.ch package.

/**
 * A pair of channels forming a unidirectional pipe: a writable
 * {@link SinkChannel} and a readable {@link SourceChannel}.
 * Instances are obtained from {@link #open()}; the two ends are reached
 * through {@link #source()} and {@link #sink()}.
 */
public abstract class Pipe {
    
    /**
     * Readable end of the pipe. Supports plain and scattering reads and can be
     * registered with a selector.
     */
    public abstract static class SourceChannel
        extends AbstractSelectableChannel
        implements ReadableByteChannel, ScatteringByteChannel {
        
        /** @param provider the selector provider handed to the superclass */
        protected SourceChannel(SelectorProvider provider) {
            super(provider);
        }
        
        /**
         * Only read readiness can be selected on this end.
         * Declared {@code final}, so implementations cannot redefine it.
         *
         * @return {@link SelectionKey#OP_READ}
         */
        public final int validOps() {
            return SelectionKey.OP_READ;
        }
    }
    
    /**
     * Writable end of the pipe. Supports plain and gathering writes and can be
     * registered with a selector.
     */
    public abstract static class SinkChannel
        extends AbstractSelectableChannel
        implements WritableByteChannel, GatheringByteChannel {
        
        /** @param provider the selector provider handed to the superclass */
        protected SinkChannel(SelectorProvider provider) {
            super(provider);
        }
        
        /**
         * Only write readiness can be selected on this end.
         * Declared {@code final}, so implementations cannot redefine it.
         *
         * @return {@link SelectionKey#OP_WRITE}
         */
        public final int validOps() {
            return SelectionKey.OP_WRITE;
        }
    }
    
    /**
     * Factory method: delegates to the system-wide default selector provider.
     *
     * @return a new pipe
     * @throws IOException if the provider fails to open the pipe
     */
    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }
    
    /** @return the read end of this pipe */
    public abstract SourceChannel source();

    /** @return the write end of this pipe */
    public abstract SinkChannel sink();
}

PipeImpl Internal Structure #

The concrete implementation PipeImpl creates the underlying OS pipe and manages its two channels:

/**
 * Concrete pipe: creates the OS-level pipe once in the constructor and wraps
 * each end in a channel implementation that lives as long as this object.
 */
class PipeImpl extends Pipe {
    private final SourceChannelImpl source;
    private final SinkChannelImpl sink;
    
    /**
     * @param sp provider passed through to both channel implementations
     * @throws IOException if the native pipe cannot be created
     */
    PipeImpl(SelectorProvider sp) throws IOException {
        // Create native pipe (returns read and write FDs packed in long).
        // NOTE(review): the argument is presumably the "blocking" flag of
        // IOUtil.makePipe(boolean blocking) — confirm against IOUtil.
        long pipeFds = IOUtil.makePipe(true);
        // Unsigned shift so the sign bit of the long cannot leak into the read FD
        int readFd = (int) (pipeFds >>> 32);  // High 32 bits: read end
        int writeFd = (int) pipeFds;          // Low 32 bits: write end
        
        // Wrap the raw integer descriptors in FileDescriptor objects
        FileDescriptor sourcefd = new FileDescriptor();
        IOUtil.setfdVal(sourcefd, readFd);
        
        FileDescriptor sinkfd = new FileDescriptor();
        IOUtil.setfdVal(sinkfd, writeFd);
        
        // Create channel implementations
        source = new SourceChannelImpl(sp, sourcefd);
        sink = new SinkChannelImpl(sp, sinkfd);
    }
    
    /** @return the read end created in the constructor (same instance every call) */
    public SourceChannel source() { return source; }

    /** @return the write end created in the constructor (same instance every call) */
    public SinkChannel sink() { return sink; }
}

Channel Implementations #

SourceChannelImpl (Read End) #

/**
 * Field-level sketch of the read end's implementation (constructor and I/O
 * methods omitted).
 */
class SourceChannelImpl extends Pipe.SourceChannel implements SelChImpl {
    private final FileDescriptor fd;      // Read end file descriptor
    private final int fdVal;              // Integer FD for native calls
    
    // State management
    private static final int ST_INUSE = 0;    // Channel open and in use
    private static final int ST_CLOSING = 1;  // Channel closing in progress
    private static final int ST_CLOSED = 2;   // Channel closed
    private int state;                        // Current state
    
    // Locking strategy
    private final ReentrantLock readLock = new ReentrantLock(); // For read operations
    private final Object stateLock = new Object();              // For state changes
    
    // Thread tracking
    private long thread;                    // Native thread ID for blocking I/O
    private volatile boolean forcedNonBlocking; // Virtual thread integration
    
    // Valid operations (only OP_READ): validOps() is inherited from
    // Pipe.SourceChannel, where it is declared final and returns
    // SelectionKey.OP_READ. It must not be redeclared here; overriding a
    // final method is a compile-time error.
}

SinkChannelImpl (Write End) #

/**
 * Field-level sketch of the write end's implementation (constructor and I/O
 * methods omitted).
 */
class SinkChannelImpl extends Pipe.SinkChannel implements SelChImpl {
    private final FileDescriptor fd;      // Write end file descriptor
    private final int fdVal;              // Integer FD for native calls
    
    // Same state management as SourceChannelImpl
    private static final int ST_INUSE = 0;
    private static final int ST_CLOSING = 1;
    private static final int ST_CLOSED = 2;
    private int state;
    
    // Locking strategy
    private final ReentrantLock writeLock = new ReentrantLock(); // For write operations
    private final Object stateLock = new Object();               // For state changes
    
    // Thread tracking
    private long thread;
    private volatile boolean forcedNonBlocking;
    
    // Valid operations (only OP_WRITE): validOps() is inherited from
    // Pipe.SinkChannel, where it is declared final and returns
    // SelectionKey.OP_WRITE. It must not be redeclared here; overriding a
    // final method is a compile-time error.
}

Basic Pipe Usage #

Creating and Using Pipes #

// Basic pipe creation and usage
/**
 * Minimal producer/consumer demo: one thread writes a message into the sink,
 * another reads it from the source and prints it.
 */
public class BasicPipeExample {
    /**
     * Runs the demo and waits for both threads to finish.
     *
     * @param args unused
     * @throws IOException if the pipe cannot be opened
     */
    public static void main(String[] args) throws IOException {
        // Create a pipe
        Pipe pipe = Pipe.open();
        Pipe.SinkChannel sink = pipe.sink();   // Write end
        Pipe.SourceChannel source = pipe.source(); // Read end
        
        // Write data to sink (producer thread)
        Thread producer = new Thread(() -> {
            // try-with-resources closes the sink even when write() fails, so
            // the consumer always sees end-of-stream instead of blocking forever
            try (Pipe.SinkChannel out = sink) {
                String message = "Hello from producer!";
                ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
                System.out.println("Producer sent: " + message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        // Read data from source (consumer thread)
        Thread consumer = new Thread(() -> {
            try (Pipe.SourceChannel in = source) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // A single read may return only part of the message: keep
                // reading until end-of-stream (-1) or the buffer is full
                while (buffer.hasRemaining() && in.read(buffer) != -1) {
                    // read() already advanced the buffer position
                }
                buffer.flip();
                if (buffer.hasRemaining()) {
                    String message = StandardCharsets.UTF_8.decode(buffer).toString();
                    System.out.println("Consumer received: " + message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        producer.start();
        consumer.start();
        
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever called main()
            Thread.currentThread().interrupt();
        }
    }
}

// Using try-with-resources for automatic closing
/**
 * Writes a short message into a pipe and reads it back on the same thread,
 * closing both channels automatically.
 *
 * @throws IOException if opening, writing, reading or closing fails
 */
public static void pipeWithResources() throws IOException {
    // Pipe itself is not AutoCloseable, so it cannot be listed as a resource;
    // only its two channels are closeable.
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        
        // Write data (explicit charset: the platform default may differ)
        ByteBuffer writeBuffer = ByteBuffer.wrap("Test data".getBytes(StandardCharsets.UTF_8));
        while (writeBuffer.hasRemaining()) {
            sink.write(writeBuffer);
        }
        
        // Read data. The message is already buffered in the pipe, so this
        // same-thread read does not block.
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        source.read(readBuffer);
        readBuffer.flip();
        
        System.out.println("Received: " + 
            StandardCharsets.UTF_8.decode(readBuffer).toString());
    }
}

Pipe for Inter-Thread Communication #

// Producer-Consumer pattern using Pipe
/**
 * Producer-consumer demo over a single pipe: the producer writes ten text
 * messages, the consumer prints whatever bytes arrive until end-of-stream.
 */
public class ProducerConsumerPipe {
    private final Pipe pipe;
    private final Pipe.SinkChannel sink;
    private final Pipe.SourceChannel source;
    
    /** @throws IOException if the pipe cannot be opened */
    public ProducerConsumerPipe() throws IOException {
        this.pipe = Pipe.open();
        this.sink = pipe.sink();
        this.source = pipe.source();
    }
    
    /**
     * Starts one producer and one consumer thread and blocks until both have
     * finished. Each thread closes its own end of the pipe when it exits.
     */
    public void startProducerConsumer() {
        // Producer thread
        Thread producer = new Thread(() -> {
            // The sink is closed on EVERY exit path: without that, a failing
            // producer would leave the consumer blocked in read() forever.
            try (Pipe.SinkChannel out = sink) {
                for (int i = 0; i < 10; i++) {
                    String message = "Message " + i;
                    ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                    
                    // Write to pipe
                    while (buffer.hasRemaining()) {
                        out.write(buffer);
                    }
                    
                    System.out.println("Produced: " + message);
                    Thread.sleep(100); // Simulate work
                }
                // Leaving the try block closes the sink, signalling end of data
            } catch (InterruptedException e) {
                // Stop producing but keep the interrupt visible
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        // Consumer thread
        Thread consumer = new Thread(() -> {
            try (Pipe.SourceChannel in = source) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (true) {
                    buffer.clear();
                    int bytesRead = in.read(buffer);
                    
                    if (bytesRead == -1) {
                        // Pipe closed (end of data)
                        break;
                    }
                    
                    if (bytesRead > 0) {
                        buffer.flip();
                        // A pipe is a byte stream: one read may contain
                        // several messages or only part of one
                        String message = StandardCharsets.UTF_8.decode(buffer).toString();
                        System.out.println("Consumed: " + message);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        producer.start();
        consumer.start();
        
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Closes both ends of the pipe; closing an already closed channel has no
     * effect.
     *
     * @throws IOException if closing either channel fails
     */
    public void close() throws IOException {
        try {
            sink.close();
        } finally {
            // Still close the source when closing the sink failed
            source.close();
        }
    }
}

Blocking vs Non-blocking Modes #

Blocking Mode (Default) #

// Blocking pipe operations
/**
 * Demonstrates the default blocking mode: a writer thread and a reader thread
 * exchange one small message. The threads are started but not joined.
 *
 * @throws IOException if the pipe cannot be opened
 */
public static void blockingPipeExample() throws IOException {
    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sink = pipe.sink();
    Pipe.SourceChannel source = pipe.source();
    
    // Both channels start in blocking mode
    assert sink.isBlocking();
    assert source.isBlocking();
    
    // Write operation blocks until data can be written
    Thread writer = new Thread(() -> {
        // Each thread owns and closes its end so no descriptor is leaked
        try (Pipe.SinkChannel out = sink) {
            ByteBuffer buffer = ByteBuffer.wrap("Data".getBytes(StandardCharsets.UTF_8));
            out.write(buffer); // Blocks if pipe buffer full
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
    
    // Read operation blocks until data available
    Thread reader = new Thread(() -> {
        try (Pipe.SourceChannel in = source) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            in.read(buffer); // Blocks until data arrives (or the sink is closed)
            buffer.flip();
            System.out.println("Read: " + StandardCharsets.UTF_8.decode(buffer));
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
    
    writer.start();
    reader.start();
}

Non-blocking Mode with Selector #

// Non-blocking pipe with selector
/**
 * Non-blocking pipe driven by a selector. A writer thread attempts the write
 * and then hands the buffer to the selector thread, which finishes any
 * remaining bytes, closes the sink, drains the source and stops at
 * end-of-stream.
 *
 * @throws IOException if the pipe or selector cannot be set up
 */
public static void nonBlockingPipeExample() throws IOException {
    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sink = pipe.sink();
    Pipe.SourceChannel source = pipe.source();
    
    // Configure non-blocking mode (required before register())
    sink.configureBlocking(false);
    source.configureBlocking(false);
    
    // Create selector for multiplexing
    Selector selector = Selector.open();
    
    // Register source for read events
    SelectionKey sourceKey = source.register(selector, SelectionKey.OP_READ);
    
    // Register the sink with NO interest yet: a mostly-empty pipe is always
    // writable, so a standing OP_WRITE interest would make select() spin.
    SelectionKey sinkKey = sink.register(selector, 0);
    
    // Write data (non-blocking)
    Thread writer = new Thread(() -> {
        try {
            String data = "Hello, Pipe!";
            ByteBuffer buffer = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
            
            // Try to write immediately; may write nothing or only part
            sink.write(buffer);
            
            // Hand the buffer (possibly already empty) to the selector thread,
            // which writes the rest and closes the sink. All closes of
            // registered channels happen on the selector thread, so the
            // resulting key cancellation is processed by its next select().
            sinkKey.attach(buffer);
            sinkKey.interestOps(SelectionKey.OP_WRITE);
            selector.wakeup(); // interest changes do not affect a select() in progress
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
    
    // Selector loop: runs until the source has seen end-of-stream
    Thread selectorThread = new Thread(() -> {
        try {
            while (source.isOpen()) {
                selector.select();
                
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                for (SelectionKey key : selectedKeys) {
                    if (key.isValid() && key.isReadable()) {
                        Pipe.SourceChannel channel = (Pipe.SourceChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = channel.read(buffer);
                        if (bytesRead == -1) {
                            // Sink closed and pipe drained: closing also cancels the key
                            channel.close();
                        } else if (bytesRead > 0) {
                            buffer.flip();
                            processData(buffer);
                        }
                    }
                    
                    if (key.isValid() && key.isWritable()) {
                        Pipe.SinkChannel channel = (Pipe.SinkChannel) key.channel();
                        // Write pending data
                        ByteBuffer pending = (ByteBuffer) key.attachment();
                        channel.write(pending);
                        if (!pending.hasRemaining()) {
                            // Everything sent: closing signals EOF to the reader
                            channel.close();
                        }
                    }
                }
                selectedKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
    
    writer.start();
    selectorThread.start();
}

Virtual Thread Integration #

// Pipes automatically adapt for virtual threads
/**
 * Runs a producer and a consumer on virtual threads over one pipe. Closing
 * the executor waits for both tasks before the channels are closed.
 *
 * @throws IOException if the pipe cannot be opened or closed
 */
public static void pipeWithVirtualThreads() throws IOException {
    // Pipe is not AutoCloseable; only the channels (and the executor) are
    // resources. Resources close in reverse order: executor first (waits for
    // the tasks), then source, then sink.
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source();
         var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        
        // Producer virtual thread
        executor.submit(() -> {
            // A Runnable cannot throw checked exceptions, so handle IOException here
            try {
                String data = "Data from virtual thread";
                ByteBuffer buffer = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
                
                // write() yields virtual thread if would block
                while (buffer.hasRemaining()) {
                    sink.write(buffer);
                }
                System.out.println("Virtual thread produced data");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        // Consumer virtual thread
        executor.submit(() -> {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                // read() yields virtual thread if no data
                source.read(buffer);
                buffer.flip();
                
                System.out.println("Virtual thread consumed: " +
                    StandardCharsets.UTF_8.decode(buffer));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

// Checking virtual thread configuration
/**
 * Prints the source channel's blocking mode, then inspects the internal
 * "forcedNonBlocking" flag from a virtual thread.
 *
 * @throws IOException if the pipe cannot be opened or closed
 */
public static void checkVirtualThreadConfiguration() throws IOException {
    Pipe pipe = Pipe.open();
    // Close both ends when done so the demo does not leak descriptors
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        
        // Initially blocking
        System.out.println("Initial blocking mode: " + source.isBlocking());
        
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                try {
                    // Explicit switch of the channel's visible blocking mode.
                    // NOTE(review): per the implementation sketch in this
                    // document, the forced flag is set by I/O performed on a
                    // virtual thread, not by this call, so the value printed
                    // below is expected to be false here. Confirm on your JDK.
                    source.configureBlocking(false);
                    System.out.println("In virtual thread, forced non-blocking: " + 
                        isForcedNonBlocking(source));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        } // executor.close() waits for the task before the channels are closed
    }
}

// Helper method to check forced non-blocking state (reflection)
/**
 * Reads the private {@code forcedNonBlocking} field of the channel's runtime
 * class via reflection. For demonstration only.
 *
 * @param channel the source channel to inspect
 * @return the field's value, or {@code false} when the field is missing,
 *         inaccessible, or any other exception occurs
 */
private static boolean isForcedNonBlocking(Pipe.SourceChannel channel) {
    boolean forced;
    try {
        Class<?> implClass = channel.getClass();
        Field flag = implClass.getDeclaredField("forcedNonBlocking");
        flag.setAccessible(true);
        forced = flag.getBoolean(channel);
    } catch (Exception e) {
        // Any reflective failure is reported as "not forced"
        forced = false;
    }
    return forced;
}

Scatter/Gather Operations #

// Scatter read from pipe source
/**
 * Writes "HeaderBodyFooter" into a pipe and scatter-reads it back into three
 * buffers sized 6, 4 and 6 bytes.
 *
 * @throws IOException if any pipe operation fails
 */
public static void scatterReadExample() throws IOException {
    // Pipe is not AutoCloseable; only its channels can be try-resources
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        
        // Write data to pipe
        ByteBuffer data = ByteBuffer.wrap("HeaderBodyFooter".getBytes(StandardCharsets.UTF_8));
        while (data.hasRemaining()) {
            sink.write(data);
        }
        sink.close(); // Close sink to signal end of data
        
        // Scatter read into multiple buffers
        ByteBuffer header = ByteBuffer.allocate(6);  // "Header"
        ByteBuffer body = ByteBuffer.allocate(4);    // "Body"
        ByteBuffer footer = ByteBuffer.allocate(6);  // "Footer"
        
        ByteBuffer[] buffers = {header, body, footer};
        // One scattering read may fill only some of the buffers: repeat until
        // EOF (-1) or until every buffer is full (read returns 0)
        long totalRead = 0;
        long n;
        while ((n = source.read(buffers)) > 0) {
            totalRead += n;
        }
        
        System.out.println("Total bytes read: " + totalRead);
        
        // Process each buffer
        header.flip();
        body.flip();
        footer.flip();
        
        System.out.println("Header: " + StandardCharsets.UTF_8.decode(header));
        System.out.println("Body: " + StandardCharsets.UTF_8.decode(body));
        System.out.println("Footer: " + StandardCharsets.UTF_8.decode(footer));
    }
}

// Gather write to pipe sink
/**
 * Gather-writes three buffers ("HEADER", "DATA", "END") into a pipe and reads
 * the combined bytes back.
 *
 * @throws IOException if any pipe operation fails
 */
public static void gatherWriteExample() throws IOException {
    // Pipe is not AutoCloseable; only its channels can be try-resources
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        
        // Prepare multiple buffers
        ByteBuffer header = ByteBuffer.wrap("HEADER".getBytes(StandardCharsets.UTF_8));
        ByteBuffer payload = ByteBuffer.wrap("DATA".getBytes(StandardCharsets.UTF_8));
        ByteBuffer trailer = ByteBuffer.wrap("END".getBytes(StandardCharsets.UTF_8));
        
        ByteBuffer[] buffers = {header, payload, trailer};
        
        // Gather write (writes from all buffers). Buffers are drained in
        // order, so the last one being empty means everything was written.
        long totalWritten = 0;
        while (trailer.hasRemaining()) {
            totalWritten += sink.write(buffers);
        }
        System.out.println("Total bytes written: " + totalWritten);
        
        sink.close(); // Close sink
        
        // Read back to verify
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        source.read(readBuffer);
        readBuffer.flip();
        
        System.out.println("Combined data: " + 
            StandardCharsets.UTF_8.decode(readBuffer));
    }
}

Performance Characteristics #

| Operation | Blocking Mode | Non-blocking Mode | Virtual Threads |
| --- | --- | --- | --- |
| read() | Blocks until data | Returns 0 if no data | Yields if no data |
| write() | Blocks until space | Returns 0 if no space | Yields if no space |
| close() | May block for pending I/O | Non-blocking | Non-blocking |
| Selector registration | N/A | O(1) | O(1) |
| Native pipe creation | O(1) | O(1) | O(1) |

Key Performance Insights:

  • OS pipe buffers: Typically 64KB on Linux, can affect throughput
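  • Not zero-copy: every transfer copies bytes into the kernel pipe buffer and back out, even between threads of the same process; using direct ByteBuffers avoids one additional copy on the JVM side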
  • Selector overhead: Minimal for pipe channels
  • Virtual threads: Add minimal overhead with proper non-blocking configuration

Common Usage Patterns #

1. Thread Communication with Backpressure #

// Pipe with bounded capacity (simulated)
/**
 * Pipe whose number of in-flight (written but not yet read) bytes is capped
 * by a semaphore with {@code capacity} permits, one permit per byte.
 */
public class BoundedPipe {
    private final Pipe pipe;
    private final Semaphore availableCapacity;
    private final int capacity;
    
    /**
     * @param capacity maximum number of unread bytes allowed in the pipe
     * @throws IllegalArgumentException if {@code capacity} is not positive
     * @throws IOException if the pipe cannot be opened
     */
    public BoundedPipe(int capacity) throws IOException {
        if (capacity <= 0) {
            // With no permits a non-empty write could never proceed
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.pipe = Pipe.open();
        this.capacity = capacity;
        this.availableCapacity = new Semaphore(capacity);
    }
    
    /**
     * Writes all remaining bytes of {@code data}, blocking while the pipe is
     * at capacity. Payloads larger than the capacity are written in
     * capacity-sized chunks, so they need a concurrent reader to complete
     * instead of deadlocking on a permit request that can never be met.
     *
     * @param data buffer to drain; its limit is restored before returning
     * @throws IOException if the underlying write fails
     * @throws InterruptedException if interrupted while waiting for capacity
     */
    public void write(ByteBuffer data) throws IOException, InterruptedException {
        Pipe.SinkChannel sink = pipe.sink();
        final int originalLimit = data.limit();
        
        while (data.hasRemaining()) {
            // Never request more permits than the semaphore can ever hold
            int chunk = Math.min(data.remaining(), capacity);
            availableCapacity.acquire(chunk);
            
            int chunkEnd = data.position() + chunk;
            data.limit(chunkEnd); // expose only the bytes we hold permits for
            try {
                while (data.hasRemaining()) {
                    sink.write(data);
                }
            } catch (IOException e) {
                // Give back permits only for bytes that never reached the
                // pipe; bytes already written are released by the reader
                availableCapacity.release(chunkEnd - data.position());
                throw e;
            } finally {
                data.limit(originalLimit);
            }
        }
    }
    
    /**
     * Reads up to {@code maxBytes} bytes and frees the matching capacity.
     *
     * @param maxBytes size of the buffer to read into
     * @return a flipped buffer holding the bytes read, or {@code null} when
     *         nothing was read (end-of-stream or a zero-length read)
     * @throws IOException if the underlying read fails
     */
    public ByteBuffer read(int maxBytes) throws IOException {
        Pipe.SourceChannel source = pipe.source();
        ByteBuffer buffer = ByteBuffer.allocate(maxBytes);
        
        int bytesRead = source.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            
            // Release capacity
            availableCapacity.release(bytesRead);
            return buffer;
        }
        
        return null;
    }
    
    /** @return the number of bytes that can currently be written without blocking */
    public int availableCapacity() {
        return availableCapacity.availablePermits();
    }
}

2. Pipeline Processing #

// Multi-stage pipeline using pipes
/**
 * Multi-stage pipeline: stage 0 generates data into pipe 0; every later stage
 * i reads from pipe i-1, processes the bytes and writes them to pipe i.
 * Relies on {@code processData}, {@code generateData} and {@code shouldStop},
 * which are defined outside this snippet.
 */
public class ProcessingPipeline {
    private final List<Pipe> pipes;
    private final List<Thread> processors;
    
    /**
     * @param stages number of stages (one pipe is opened per stage)
     * @throws IOException if a pipe cannot be opened
     */
    public ProcessingPipeline(int stages) throws IOException {
        this.pipes = new ArrayList<>(stages);
        this.processors = new ArrayList<>(stages);
        
        // Create pipes for each stage
        for (int i = 0; i < stages; i++) {
            pipes.add(Pipe.open());
        }
    }
    
    /** Starts one processor thread per stage. */
    public void start() {
        for (int i = 0; i < pipes.size(); i++) {
            final int stage = i;
            final Pipe inputPipe = (stage == 0) ? null : pipes.get(stage - 1);
            final Pipe outputPipe = pipes.get(stage);
            
            Thread processor = new Thread(() -> {
                try {
                    processStage(inputPipe, outputPipe, stage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            
            processors.add(processor);
            processor.start();
        }
    }
    
    /**
     * Runs one stage until its input reaches end-of-stream (or, for the first
     * stage, until {@code shouldStop()} is true).
     *
     * The stage's channels are closed on every exit path, including failures:
     * closing the sink lets the next stage see EOF instead of blocking
     * forever, and closing the source makes upstream writes fail rather than
     * stall once this stage is gone.
     */
    private void processStage(Pipe inputPipe, Pipe outputPipe, int stage) 
            throws IOException {
        // A null resource is permitted in try-with-resources and is skipped on close
        try (Pipe.SinkChannel sink = outputPipe.sink();
             Pipe.SourceChannel source = (inputPipe != null) ? inputPipe.source() : null) {
            
            if (source == null) {
                // First stage - generate data until told to stop
                do {
                    writeFully(sink, generateData());
                } while (!shouldStop());
                return;
            }
            
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                // Read from previous stage
                buffer.clear();
                int bytesRead = source.read(buffer);
                
                if (bytesRead == -1) {
                    // End of data
                    break;
                }
                
                if (bytesRead > 0) {
                    buffer.flip();
                    // Process data
                    processData(buffer, stage);
                    
                    // Write to next stage from the start of the buffer again
                    buffer.rewind();
                    writeFully(sink, buffer);
                }
            }
        }
    }
    
    /** Writes every remaining byte of {@code data}, retrying after short writes. */
    private static void writeFully(Pipe.SinkChannel sink, ByteBuffer data) throws IOException {
        while (data.hasRemaining()) {
            sink.write(data);
        }
    }
    
    /**
     * Writes {@code data} into the first pipe.
     *
     * @throws IOException if the write fails (for example after stage 0 has
     *         closed that sink)
     */
    public void feedData(ByteBuffer data) throws IOException {
        Pipe.SinkChannel firstSink = pipes.get(0).sink();
        writeFully(firstSink, data);
    }
    
    /**
     * Performs one read of up to 1024 bytes from the last pipe.
     *
     * @return a flipped buffer; empty when the read hit end-of-stream
     * @throws IOException if the read fails
     */
    public ByteBuffer collectResult() throws IOException {
        Pipe.SourceChannel lastSource = pipes.get(pipes.size() - 1).source();
        ByteBuffer result = ByteBuffer.allocate(1024);
        lastSource.read(result);
        result.flip();
        return result;
    }
}

3. Selector-based Multiplexing #

// Multiple pipes multiplexed with single selector
/**
 * Reads from many pipes on one thread by registering every source channel
 * with a single selector. The handler map is a plain HashMap, so call
 * {@link #addPipe} before {@link #run} or from the thread that runs it.
 */
public class PipeMultiplexer {
    private final Selector selector;
    private final Map<SelectionKey, PipeHandler> handlers;
    
    /** @throws IOException if the selector cannot be opened */
    public PipeMultiplexer() throws IOException {
        this.selector = Selector.open();
        this.handlers = new HashMap<>();
    }
    
    /**
     * Switches the pipe's source to non-blocking mode and registers it for
     * read events.
     *
     * @param pipe pipe whose source should be watched
     * @param name label used in log output
     * @throws IOException if configuring or registering the channel fails
     */
    public void addPipe(Pipe pipe, String name) throws IOException {
        Pipe.SourceChannel source = pipe.source();
        source.configureBlocking(false);
        
        SelectionKey key = source.register(selector, SelectionKey.OP_READ);
        handlers.put(key, new PipeHandler(pipe, name));
        
        System.out.println("Added pipe: " + name);
    }
    
    /**
     * Dispatches read events until the calling thread is interrupted.
     *
     * An interrupt makes select() return immediately on every call, so without
     * the interrupt check this loop would spin at full CPU forever. The
     * interrupt status is left set for the caller.
     *
     * @throws IOException if selecting or reading fails
     */
    public void run() throws IOException {
        System.out.println("Pipe multiplexer started");
        
        while (!Thread.currentThread().isInterrupted()) {
            selector.select();
            
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            for (SelectionKey key : selectedKeys) {
                // isReadable() throws on a cancelled key, so check validity first
                if (key.isValid() && key.isReadable()) {
                    handleReadable(key);
                }
            }
            selectedKeys.clear();
        }
    }
    
    /** Forwards the event to the key's handler and forgets pipes that reached EOF. */
    private void handleReadable(SelectionKey key) throws IOException {
        PipeHandler handler = handlers.get(key);
        if (handler != null && !handler.handleRead()) {
            // Source was closed (which cancelled the key): drop the handler
            // so closed pipes do not accumulate in the map
            handlers.remove(key);
        }
    }
    
    /** Per-pipe state: the pipe itself and the label used when printing. */
    private static class PipeHandler {
        private final Pipe pipe;
        private final String name;
        
        PipeHandler(Pipe pipe, String name) {
            this.pipe = pipe;
            this.name = name;
        }
        
        /**
         * Reads once from the source and prints what arrived.
         *
         * @return {@code false} when end-of-stream was reached and the source
         *         was closed, {@code true} otherwise
         */
        boolean handleRead() throws IOException {
            Pipe.SourceChannel source = pipe.source();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            int bytesRead = source.read(buffer);
            if (bytesRead > 0) {
                buffer.flip();
                String data = StandardCharsets.UTF_8.decode(buffer).toString();
                System.out.println("[" + name + "] Received: " + data);
            } else if (bytesRead == -1) {
                System.out.println("[" + name + "] Pipe closed");
                source.close();
                return false;
            }
            return true;
        }
    }
}

Best Practices #

  1. Always Close Both Ends:

    // Correct - close both ends.
    // Pipe itself is not AutoCloseable, so it is opened outside the
    // try-with-resources header; only the two channels are resources.
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {
        // Use pipe
    }
    
    // Avoid - leaving channels open
    Pipe leaky = Pipe.open();
    leaky.sink().close();
    // Forgot to close source: its file descriptor stays open
    
  2. Handle Partial Reads/Writes:

    // Always check return values
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int totalRead = 0;
    
    // Also stop once the buffer is full: read() returns 0 for a buffer with
    // no space left, so waiting for more than its capacity would loop forever
    while (totalRead < expectedBytes && buffer.hasRemaining()) {
        int read = source.read(buffer);
        if (read == -1) break; // EOF
        totalRead += read;
    }
    
    // For writes
    while (buffer.hasRemaining()) {
        int written = sink.write(buffer);
        if (written == 0) {
            // Would block in non-blocking mode: keep the buffer and retry
            // once the sink is writable again; the remaining bytes are unsent
            break;
        }
    }
    
  3. Use Appropriate Buffer Sizes:

    // Match buffer size to typical message size.
    // (Fragment: the constant is a class-level declaration, the statements
    // below belong inside a method.)
    public static final int PIPE_BUFFER_SIZE = 8192; // 8KB
    
    ByteBuffer buffer = ByteBuffer.allocate(PIPE_BUFFER_SIZE);
    
    // For large data, use scatter/gather: one buffer per message section,
    // filled or drained in array order
    ByteBuffer[] buffers = new ByteBuffer[] {
        ByteBuffer.allocate(128),   // Header
        ByteBuffer.allocate(1024),  // Body
        ByteBuffer.allocate(64)     // Footer
    };
    
  4. Configure Non-blocking Mode Early:

    // Configure before use if non-blocking needed.
    // source() and sink() are called repeatedly here; this relies on each
    // returning the same channel every time, as the PipeImpl shown earlier does.
    Pipe pipe = Pipe.open();
    pipe.source().configureBlocking(false);
    pipe.sink().configureBlocking(false);
    
    // Register with selector (the channel must already be non-blocking)
    Selector selector = Selector.open();
    pipe.source().register(selector, SelectionKey.OP_READ);
    
  5. Clean Up with Virtual Threads:

    // Virtual threads auto-configure pipes
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        executor.submit(() -> {
            try (Pipe pipe = Pipe.open()) {
                // Pipe automatically non-blocking for virtual threads
                usePipe(pipe);
            }
        });
    }
    

Common Pitfalls #

  1. Forgetting to Close Sink for EOF Detection:

    // Wrong - consumer hangs waiting for EOF
    Pipe pipe = Pipe.open();
    pipe.sink().write(buffer);
    // Forgot to close sink
    
    // Consumer waits forever: once the data is drained, read() blocks because
    // the sink is still open; the loop also never checks for -1
    while (true) {
        int read = source.read(buffer); // Blocks forever
    }
    
    // Correct - close sink when done
    pipe.sink().write(buffer);
    pipe.sink().close(); // Signals EOF to reader
    
  2. Ignoring Return Values:

    // Wrong - assumes all data written
    ByteBuffer buffer = ByteBuffer.wrap(data);
    sink.write(buffer); // May write partial data
    
    // Correct - handle partial writes
    while (buffer.hasRemaining()) {
        int written = sink.write(buffer);
        if (written == 0) {
            // Handle would-block: the unwritten bytes are still in the buffer
            // and must be retried later, not dropped
            break;
        }
    }
    
  3. Buffer Underflow with Scatter/Gather:

    // Wrong - may not fill all buffers
    ByteBuffer[] buffers = new ByteBuffer[3];
    // ... initialize buffers
    source.read(buffers); // May only fill first buffer
    
    // Process assuming all filled
    buffers[2].flip(); // flip() itself never throws, but the buffer may be
                       // empty: a later get() then throws BufferUnderflowException
    
    // Correct - check each buffer
    long totalRead = source.read(buffers);
    for (ByteBuffer buf : buffers) {
        if (buf.position() > 0) { // position > 0 means this buffer received bytes
            buf.flip();
            process(buf);
        }
    }
    
  4. Deadlock with Blocking Operations:

    // Wrong - potential deadlock
    Pipe pipe = Pipe.open();
    
    // Thread 1: writes large amount, blocks when pipe full
    sink.write(largeBuffer); // Blocks until a reader drains the pipe
    
    // Thread 2: supposed to read but never scheduled
    // Deadlock! (the same happens when the writing thread is also the only reader)
    
    // Correct - use non-blocking or ensure consumer runs
    sink.configureBlocking(false);
    // Or use separate threads properly synchronized
    
  5. Not Handling Interrupts:

    // Wrong - silently swallows every failure, including the
    // ClosedByInterruptException raised when the thread is interrupted
    // while blocked in read()
    public void readFromPipe(Pipe.SourceChannel source) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            source.read(buffer); // Blocks, may be interrupted
        } catch (IOException e) {
            // Exception dropped: the caller never learns that the read failed
            // or that the channel was closed by an interrupt
        }
    }
    
    // Correct - preserve interrupt status
    /**
     * Performs one blocking read and reports failures to the caller.
     *
     * @param source channel to read from
     * @throws RuntimeException wrapping the IOException if the read fails
     */
    public void readFromPipe(Pipe.SourceChannel source) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            source.read(buffer);
        } catch (java.nio.channels.ClosedByInterruptException e) {
            // Only this exception means "the thread was interrupted". The
            // channel has already set the interrupt status; re-asserting it
            // is harmless and makes the intent explicit.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IOException e) {
            // An ordinary I/O failure is not an interrupt: do NOT set the
            // interrupt flag here, or callers would see a spurious interrupt
            throw new RuntimeException(e);
        }
    }
    

Internal Implementation Details #

Native Pipe Creation #

The IOUtil.makePipe(boolean blocking) native method creates an OS pipe:

// Native implementation creates pipe and sets blocking mode
private static native long makePipe0(boolean blocking);

/**
 * Java wrapper around the native call.
 *
 * @param blocking passed unchanged to the native method
 * @return both file descriptors packed into one long (layout below)
 * @throws IOException if the native call returns 0, which this wrapper treats
 *         as failure
 */
static long makePipe(boolean blocking) throws IOException {
    long fds = makePipe0(blocking);
    if (fds == 0) {
        throw new IOException("Pipe creation failed");
    }
    return fds;
}

// The returned long packs two file descriptors:
// - High 32 bits: read end file descriptor
// - Low 32 bits: write end file descriptor

Platform-specific implementations:

  • Unix/Linux: Uses pipe() or pipe2() system call
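  • Windows: Emulates the pipe with a pair of connected sockets rather than CreatePipe(), so that both ends can be registered with a Selector
  • Blocking mode: Configured via fcntl() (Unix) or the blocking mode of the underlying sockets (Windows)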

Dual-Lock Synchronization Strategy #

Pipe channels use two locks for thread safety:

// Read operation in SourceChannelImpl (simplified)
/**
 * Reads into {@code dst} while holding {@code readLock}.
 *
 * @param dst destination buffer
 * @return the value produced by {@code implRead}
 * @throws IOException if the channel is not open or the read fails
 */
public int read(ByteBuffer dst) throws IOException {
    readLock.lock();  // Only one thread can read at a time
    try {
        ensureOpen();
        // Blocking mode is sampled once, after the lock is taken
        boolean blocking = isBlocking();
        // ... perform read
        return implRead(dst, blocking);
    } finally {
        // Always release, including when ensureOpen() or the read throws
        readLock.unlock();
    }
}

// State changes use separate lock
/** Stores {@code newState} in the state field while holding {@code stateLock}. */
private void updateState(int newState) {
    synchronized (stateLock) {  // Protects state field
        this.state = newState;
    }
}

Lock Hierarchy Rules:

  1. Never hold stateLock while performing blocking I/O
  2. readLock/writeLock are per-operation locks
  3. State changes are atomic and protected by stateLock

Virtual Thread Integration #

// Automatic non-blocking configuration for virtual threads
/**
 * Switches the underlying descriptor to non-blocking mode the first time this
 * is called from a virtual thread; does nothing on platform threads or once
 * the flag is set.
 *
 * @throws IOException if the channel is not open or reconfiguration fails
 */
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
    // Cheap check of the volatile flag first, so the common case takes no lock
    if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
        synchronized (stateLock) {
            ensureOpen();
            IOUtil.configureBlocking(fd, false);
            // Nothing in this snippet ever resets the flag
            forcedNonBlocking = true; // Permanent change
        }
    }
}

// Used in all blocking operations
// Excerpt of read(): the actual transfer and the return statement are elided below.
public int read(ByteBuffer dst) throws IOException {
    readLock.lock();
    try {
        // Runs with readLock held, before any of the elided read logic
        configureSocketNonBlockingIfVirtualThread(); // Auto-configure
        // ... rest of read logic
    } finally {
        // Released on every exit path, including when configuration throws
        readLock.unlock();
    }
}

Selector Integration #

Pipe channels implement SelChImpl for selector support:

// Event translation for selector.
// ops:        native poll events reported for this channel
// initialOps: ready ops to start from
// ski:        key whose interest set is consulted and whose ready set is updated
// Returns true when the update added at least one op that was not already
// in the key's ready set.
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
    int intOps = ski.nioInterestOps();
    int oldOps = ski.nioReadyOps(); // ready set before this update, needed for the return value
    int newOps = initialOps;

    if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
        // Error/hangup makes all interested ops ready
        newOps = intOps;
    }

    // Readable data only counts when the key is actually interested in OP_READ
    if (((ops & Net.POLLIN) != 0) &&
        ((intOps & SelectionKey.OP_READ) != 0)) {
        newOps |= SelectionKey.OP_READ;
    }

    ski.nioReadyOps(newOps);
    return (newOps & ~oldOps) != 0;
}

Performance Optimization Techniques #

1. Buffer Pooling for High Throughput #

// Reuse buffers to reduce allocation overhead
public class PipeBufferPool {
    private final Deque<ByteBuffer> bufferPool;
    private final int bufferSize;
    
    public PipeBufferPool(int bufferSize, int poolSize) {
        this.bufferSize = bufferSize;
        this.bufferPool = new ArrayDeque<>(poolSize);
        
        // Pre-allocate buffers
        for (int i = 0; i < poolSize; i++) {
            bufferPool.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }
    
    public ByteBuffer acquire() {
        ByteBuffer buffer = bufferPool.poll();
        if (buffer == null) {
            buffer = ByteBuffer.allocateDirect(bufferSize);
        }
        buffer.clear();
        return buffer;
    }
    
    public void release(ByteBuffer buffer) {
        if (buffer.isDirect() && buffer.capacity() == bufferSize) {
            bufferPool.offer(buffer);
        }
    }
}

2. Batch Processing #

// Process multiple messages in batch.
//
// Switches source to non-blocking mode (it is left that way), then repeatedly
// waits up to 100 ms for readability, drains everything currently available,
// and passes the drained data to processor as one combined buffer.
// Returns once a read reports end-of-stream (-1).
//
// source:    readable end of the pipe
// processor: receives the combined data of each non-empty batch
// Throws IOException if selector setup, selection, or a read fails.
public static void batchProcess(Pipe.SourceChannel source,
                                Consumer<ByteBuffer> processor) throws IOException {
    List<ByteBuffer> batch = new ArrayList<>();
    ByteBuffer buffer = ByteBuffer.allocate(8192);

    source.configureBlocking(false);
    // try-with-resources: the selector (and with it the key registered below) is
    // released on EOF and on any exception instead of being leaked.
    try (Selector selector = Selector.open()) {
        source.register(selector, SelectionKey.OP_READ);

        while (true) {
            // Wait for data
            selector.select(100); // 100ms timeout

            // Read all available data
            int read;
            do {
                buffer.clear();
                read = source.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    // Copy out, because the scratch buffer is reused by the next read
                    ByteBuffer copy = ByteBuffer.allocate(read);
                    copy.put(buffer);
                    copy.flip();
                    batch.add(copy);
                }
            } while (read > 0);

            // Process batch if we have data
            if (!batch.isEmpty()) {
                processor.accept(combineBuffers(batch));
                batch.clear();
            }

            // Check for EOF
            if (read == -1) {
                break;
            }
        }
    }
}

3. Zero-Copy Between Threads #

// Using direct buffers when moving data through a pipe.
//
// Writes the bytes of "Data" into the sink from a direct buffer, then reads
// from the source back into the same buffer. The bytes still travel through
// the pipe; what a direct buffer can avoid is the extra staging copy between
// the Java heap and native memory around each channel operation.
//
// Throws IOException if the pipe cannot be opened or a transfer fails.
public static void zeroCopyPipeTransfer() throws IOException {
    // Pipe itself is not AutoCloseable; only its two channels are, so only
    // they can be managed by try-with-resources.
    Pipe pipe = Pipe.open();
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source()) {

        // Allocate direct buffer (memory outside Java heap)
        ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);

        // Producer side: explicit charset so the bytes do not depend on the platform default
        directBuffer.put("Data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        directBuffer.flip();
        // A single write() may accept only part of the buffer
        while (directBuffer.hasRemaining()) {
            sink.write(directBuffer);
        }

        // Consumer side: reuse the same buffer object for the incoming bytes
        directBuffer.clear();
        source.read(directBuffer);
        directBuffer.flip();

        // directBuffer now holds whatever the read returned, ready for consumption
    }
}