Selector
The Selector class provides the ability to multiplex I/O operations across multiple SelectableChannel objects, enabling a single thread to manage multiple network connections efficiently.
Source Code #
Core Implementation #
public abstract class Selector implements Closeable {
protected Selector() { }
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public abstract boolean isOpen();
public abstract SelectorProvider provider();
public abstract Set<SelectionKey> keys();
public abstract Set<SelectionKey> selectedKeys();
public abstract int selectNow() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int select() throws IOException;
public int select(Consumer<SelectionKey> action, long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return doSelect(Objects.requireNonNull(action), timeout);
}
public int select(Consumer<SelectionKey> action) throws IOException {
return select(action, 0);
}
public int selectNow(Consumer<SelectionKey> action) throws IOException {
return doSelect(Objects.requireNonNull(action), -1);
}
private int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
Set<SelectionKey> selectedKeys = selectedKeys();
synchronized (selectedKeys) {
selectedKeys.clear();
int numKeySelected;
if (timeout < 0) {
numKeySelected = selectNow();
} else {
numKeySelected = select(timeout);
}
// copy selected-key set as action may remove keys
Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
assert keysToConsume.size() == numKeySelected;
selectedKeys.clear();
// invoke action for each selected key
keysToConsume.forEach(k -> {
action.accept(k);
if (!isOpen())
throw new ClosedSelectorException();
});
return numKeySelected;
}
}
}
public abstract Selector wakeup();
public abstract void close() throws IOException;
}
Implementation Details #
SelectorImpl Base Class #
The platform-specific selector implementations extend the abstract SelectorImpl class which provides common functionality:
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys registered with this Selector
private final Set<SelectionKey> keys;
private final Set<SelectionKey> selectedKeys;
private final Set<SelectionKey> publicKeys;
private final Set<SelectionKey> publicSelectedKeys;
// pending cancelled keys for deregistration
private final Deque<SelectionKeyImpl> cancelledKeys = new ArrayDeque<>();
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
protected abstract int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException;
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
@Override
public final int select() throws IOException {
return lockAndDoSelect(null, -1);
}
@Override
public final int selectNow() throws IOException {
return lockAndDoSelect(null, 0);
}
protected abstract void implClose() throws IOException;
protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
public void cancel(SelectionKeyImpl ski) {
synchronized (cancelledKeys) {
cancelledKeys.addLast(ski);
}
}
protected final void processDeregisterQueue() throws IOException {
synchronized (cancelledKeys) {
SelectionKeyImpl ski;
while ((ski = cancelledKeys.pollFirst()) != null) {
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
deregister(ski);
}
}
}
protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
protected abstract void setEventOps(SelectionKeyImpl ski);
}
Key Sets Management #
A selector maintains three sets of selection keys:
Key Set: Contains all keys representing current channel registrations.
- Returned by
keys()method - Immutable view (
Collections.unmodifiableSet) - Thread-safe for concurrent access
- Returned by
Selected-Key Set: Contains keys whose channels were detected ready during selection.
- Returned by
selectedKeys()method - Supports removal but not direct addition
- Not thread-safe - requires external synchronization
- Returned by
Cancelled-Key Set: Internal set of keys cancelled but not yet deregistered.
- Not directly accessible
- Processed during selection operations
// Key set initialization in SelectorImpl
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
Selection Operations #
Selection operations query the underlying OS for channel readiness:
selectNow(): Non-blocking, returns immediatelyselect(long timeout): Blocks up to specified millisecondsselect(): Blocks indefinitely until at least one channel readyselect(Consumer): Action-based selection (Java 11+)
Selection steps:
- Process cancelled keys (deregister channels)
- Query OS for ready channels
- Update selected-key set or invoke action
- Process any new cancelled keys added during step 2
// Selection logic in SelectorImpl.doSelect
processUpdateQueue();
processDeregisterQueue();
int numEntries = poll(...); // Platform-specific
processDeregisterQueue();
return processEvents(numEntries, action);
Concurrency Model #
- Selector & Key Set: Thread-safe for multiple concurrent threads
- Selected-Key Set: Not thread-safe; requires external synchronization
- Selection Operations: Synchronize on selector, then selected-key set
- Interest Set Changes: Effective only for next selection operation
- Key Cancellation/Channel Close: Safe at any time
Selection operations use double-checked locking:
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
Platform-Specific Implementations #
Selector implementations vary by operating system:
| Platform | Implementation | Underlying API | Max Channels | Performance |
|---|---|---|---|---|
| Linux | EPollSelectorImpl | epoll | File descriptor limit | Excellent |
| Unix (BSD, macOS) | PollSelectorImpl | poll | File descriptor limit | Good |
| Windows | WEPollSelectorImpl | WEPoll | Default 1024 | Good |
Each implementation overrides three key methods:
doSelect(): Performs the actual I/O multiplexingimplClose(): Cleans up native resourcesimplDereg(): Removes channel from native structuressetEventOps(): Updates interest operations for a key
Performance Characteristics #
Scalability #
Selector performance depends on the underlying OS multiplexing mechanism:
- Linux epoll: O(1) for adding/removing/modifying descriptors, O(n) for ready events
- Unix poll: O(n) for all operations, limited by file descriptor count
- Windows WEPoll: Similar to epoll but with some limitations
Typical performance characteristics:
| Operation | Linux epoll | Unix poll | Windows WEPoll |
|---|---|---|---|
| Registration | O(1) | O(1) | O(1) |
| Modification | O(1) | O(1) | O(1) |
| Deregistration | O(1) | O(1) | O(1) |
| Selection (10k channels) | <1ms | 10-50ms | 2-5ms |
| Selection (100k channels) | 1-2ms | 100-500ms | 20-50ms |
| Max supported channels | ~1M | ~10k | 1024 default |
Memory Usage #
- Linux epoll: ~1KB per 1000 channels + poll array memory
- Unix poll: Poll array grows with registered channels
- Windows WEPoll: Fixed-size internal structures
// Memory allocation in EPollSelectorImpl
private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
private final long pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
Virtual Thread Integration #
Java 21+ integrates virtual threads with selectors:
- Virtual threads automatically force sockets to non-blocking mode
doSelect()handles virtual threads differently with parking- No thread-per-connection overhead
// Virtual thread handling in EPollSelectorImpl
if (Thread.currentThread().isVirtual()) {
numEntries = (timedPoll)
? timedPoll(TimeUnit.MILLISECONDS.toNanos(to))
: untimedPoll(blocking);
}
Common Usage Patterns #
Basic Selector Loop #
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
acceptConnection(key, selector);
} else if (key.isConnectable()) {
finishConnection(key);
} else if (key.isReadable()) {
readData(key);
} else if (key.isWritable()) {
writeData(key);
}
keyIterator.remove();
}
}
Action-Based Selection (Java 11+) #
selector.select(key -> {
if (key.isAcceptable()) {
acceptConnection(key, selector);
} else if (key.isReadable()) {
readData(key);
}
});
Multi-Selector Patterns #
// Using multiple selectors for different channel types
Selector networkSelector = Selector.open();
Selector fileSelector = Selector.open();
// Thread per selector
new Thread(() -> handleSelector(networkSelector)).start();
new Thread(() -> handleSelector(fileSelector)).start();
Graceful Shutdown #
volatile boolean running = true;
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running = false;
selector.wakeup(); // Wake up blocking select
}));
while (running) {
selector.select();
// Process keys...
}
Best Practices #
- Reuse Selectors: Create selectors once and reuse them for the application lifetime
- Limit Registered Channels: Keep registered channels under 10k for poll-based systems
- Use epoll on Linux: Default on modern Linux, provides best scalability
- Clear Selected Keys: Always remove processed keys from selected-key set
- Handle Wakeups: Design for graceful shutdown using
wakeup() - Check Key Validity: Always check
key.isValid()before operations - Avoid Blocking in Actions: Never block in selection key actions
- Use Timeouts: Use
select(long timeout)for responsiveness - Monitor FD Limits: Ensure system file descriptor limits accommodate channel count
- Close Properly: Always close selectors to release native resources
Common Pitfalls #
- Forgotten keyIterator.remove(): Causes infinite loops
- Blocking in Selection Key Processing: Starves other channels
- Ignoring Wakeup Signals: Prevents graceful shutdown
- Assuming Thread Safety: Selected-key set not thread-safe
- Not Checking Key Validity: Operating on cancelled keys
- Interest Ops Accumulation: Forgetting to clear interest ops before setting new ones
- Selector Leak: Not closing selectors, causing native resource leaks
- FD Exhaustion: Registering too many channels hitting OS limits
- Virtual Thread Deadlock: Blocking virtual threads in selector actions
- Concurrent Modification: Modifying selected-key set while iterating without iterator
Internal Implementation Details #
EPollSelectorImpl (Linux) #
The Linux implementation uses the epoll system call for efficient I/O multiplexing:
class EPollSelectorImpl extends SelectorImpl {
private final int epfd; // epoll file descriptor
private final long pollArrayAddress; // native poll array
private final EventFD eventfd; // for interrupt signaling
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
this.epfd = EPoll.create();
this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
this.eventfd = new EventFD();
EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
}
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
processUpdateQueue();
processDeregisterQueue();
int numEntries;
if (Thread.currentThread().isVirtual()) {
// Virtual thread handling with parking
numEntries = (timeout > 0) ? timedPoll(timeout) : untimedPoll(timeout == 0);
} else {
begin(timeout != 0);
try {
do {
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, (int)timeout);
} while (numEntries == IOStatus.INTERRUPTED);
} finally {
end(timeout != 0);
}
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
private void processUpdateQueue() {
synchronized (updateLock) {
SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
if (ski.isValid()) {
int fd = ski.getFDVal();
int newEvents = ski.translateInterestOps();
int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) {
if (newEvents == 0) {
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
} else if (registeredEvents == 0) {
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
} else {
EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
}
ski.registeredEvents(newEvents);
}
}
}
}
}
}
Key optimizations:
- EventFD for wakeup: More efficient than pipe-based wakeup
- Virtual thread parking: Avoids native thread blocking
- Batch event processing: Processes up to 1024 events per epoll_wait call
- FD to Key map: O(1) lookup for ready file descriptors
PollSelectorImpl (Unix) #
The Unix implementation uses the poll system call:
class PollSelectorImpl extends SelectorImpl {
private AllocatedNativeObject pollArray; // Native pollfd array
private final int fd0, fd1; // Pipe for wakeup
private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
PollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
this.pollArray = new AllocatedNativeObject(pollArrayCapacity * SIZE_POLLFD, false);
long fds = IOUtil.makePipe(false);
this.fd0 = (int) (fds >>> 32);
this.fd1 = (int) fds;
setFirst(fd0, Net.POLLIN);
}
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
processUpdateQueue();
processDeregisterQueue();
int numPolled;
try {
begin(timeout != 0);
do {
numPolled = poll(pollArray.address(), pollArraySize, (int)timeout);
} while (numPolled == IOStatus.INTERRUPTED);
} finally {
end(timeout != 0);
}
processDeregisterQueue();
return processEvents(action);
}
private void setFirst(int fd, int events) {
int offset = 0;
putDescriptor(offset, fd);
putEvents(offset, events);
putRevent(offset, 0);
}
}
Characteristics:
- Portable: Works across all Unix-like systems
- Linear scan: O(n) performance for each poll call
- Dynamic array: Grows as channels are registered
- Pipe-based wakeup: Uses Unix pipes for interrupt signaling
WEPollSelectorImpl (Windows) #
The Windows implementation uses the WEPoll library:
class WEPollSelectorImpl extends SelectorImpl {
private final long handle; // WEPoll handle
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
WEPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
this.handle = create();
// Windows-specific initialization
}
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
processUpdateQueue();
processDeregisterQueue();
int numEntries;
try {
begin(timeout != 0);
numEntries = poll(handle, pollArrayAddress, MAX_EVENTS, (int)timeout);
} finally {
end(timeout != 0);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
}
Windows-specific considerations:
- Max 1024 channels: Default limit (configurable via registry)
- Socket-only: Only supports socket channels
- No file descriptor limit: Uses Windows HANDLE objects
- Completion port alternative: For higher scalability, consider I/O completion ports
Performance Optimization Techniques #
Selector Tuning #
// Increase maximum selector channels on Windows
System.setProperty("sun.nio.ch.maxUpdateArraySize", "65536");
// Use larger poll array for epoll
System.setProperty("sun.nio.ch.epoll.maxEvents", "4096");
// Disable unnecessary wakeup polling
System.setProperty("jdk.nio.enableFastSelectorLoop", "true");
Channel Management #
// Batch channel registration
List<SocketChannel> channels = createChannels();
Selector selector = Selector.open();
for (SocketChannel ch : channels) {
ch.configureBlocking(false);
ch.register(selector, SelectionKey.OP_READ);
}
// Use interest ops aggregation
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Instead of separate calls:
// key.interestOps(SelectionKey.OP_READ);
// key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
Zero-Copy Selection #
// Direct buffer usage with selectors
ByteBuffer buffer = ByteBuffer.allocateDirect(65536);
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
channel.read(buffer); // Zero-copy to native memory
Selector Pooling #
// Pool selectors for different connection types
public class SelectorPool {
private final BlockingQueue<Selector> pool = new ArrayBlockingQueue<>(10);
public SelectorPool() throws IOException {
for (int i = 0; i < 10; i++) {
pool.add(Selector.open());
}
}
public Selector acquire() throws InterruptedException {
return pool.take();
}
public void release(Selector selector) {
selector.wakeup(); // Ensure it's not blocked
pool.offer(selector);
}
}
Related Classes #
SelectionKey #
- Represents a channel’s registration with a selector
- Contains interest set, ready set, and attachment
- SelectionKey Details
SelectableChannel #
- Channel that can be multiplexed by a selector
- Base class for socket, pipe, and datagram channels
- SelectableChannel Details
SelectorProvider #
- Service provider for selector implementations
- Allows custom selector implementations
- SelectorProvider Details
SocketChannel #
- TCP socket channel for stream-oriented connections
- Most commonly used with selectors
- SocketChannel Details
ServerSocketChannel #
- TCP server socket for accepting connections
- Used with
SelectionKey.OP_ACCEPT - ServerSocketChannel Details
DatagramChannel #
- UDP datagram channel for packet-oriented communication
- Used with
SelectionKey.OP_READandSelectionKey.OP_WRITE - DatagramChannel Details
Pipe #
- Intra-JVM communication channels
Pipe.SourceChannelandPipe.SinkChannelare selectable- Pipe Details