class NIO::Selector
Selectors monitor IO objects for events of interest
def self.backends
Return supported backends as symbols
# Return supported backends as symbols.
#
# This implementation only ever reports the pure-Ruby backend.
#
# @return [Array<Symbol>] always `[:ruby]`
def self.backends
  %i[ruby]
end
def backend
* :io_uring - libev w/ Linux io_uring (experimental)
* :linuxaio - libev w/ Linux AIO io_submit (experimental)
* :port - libev w/ I/O completion ports
* :select - libev w/ SysV select
* :kqueue - libev w/ BSD kqueue
* :poll - libev w/ POSIX poll
* :epoll - libev w/ Linux epoll
* :java - Java NIO on JRuby
* :ruby - pure Ruby (i.e. IO.select)
Supported backends are:
Return a symbol representing the backend I/O multiplexing mechanism used.
# Return a symbol representing the backend I/O multiplexing mechanism used.
#
# @return [Symbol] always `:ruby` for this implementation
def backend
  :ruby
end
def close
# Close this selector.
#
# Closes both ends of the wakeup pipe (read end first, then write end) and
# marks the selector as closed. The whole operation runs under the lock.
#
# @return [true, nil] true when this call closed the selector, nil when it
#   was already closed
def close
  @lock.synchronize do
    return if @closed

    [@wakeup, @waker].each do |pipe_end|
      begin
        pipe_end.close
      rescue IOError
        # Best effort: an IOError while closing one end is deliberately ignored
        # so the other end still gets closed.
      end
    end

    @closed = true
  end
end
def closed?
# Is this selector closed?
#
# @return [Boolean] the closed flag: false after construction, true once
#   #close has completed
def closed?
  @closed
end
def deregister(io)
# Deregister the given IO object from the selector.
#
# The lookup key is derived exactly the way #register derives it: an
# OpenSSL::SSL::SSLSocket is looked up as-is, anything else is first passed
# through IO.try_convert. Unconditionally converting (as this method used to)
# turned an SSL socket into its underlying IO, which never matched the key it
# was registered under, so SSL sockets could not be deregistered.
#
# @param io [IO, #to_io] the object that was passed to #register
# @return [Object, nil] the monitor that was registered for +io+, or nil when
#   nothing was registered for it
def deregister(io)
  key = if defined?(::OpenSSL) && io.is_a?(::OpenSSL::SSL::SSLSocket)
          io
        else
          IO.try_convert(io)
        end

  @lock.synchronize do
    monitor = @selectables.delete(key)
    # NOTE(review): the `false` argument presumably tells the monitor not to
    # call back into #deregister -- confirm against Monitor#close.
    monitor.close(false) if monitor && !monitor.closed?
    monitor
  end
end
def empty?
# Is nothing currently registered with this selector?
#
# @return [Boolean] true when the table of registered IO objects is empty
def empty?
  @selectables.empty?
end
def initialize(backend = :ruby)
# Create a new pure-Ruby selector.
#
# @param backend [Symbol, nil] only :ruby (the default) or nil is accepted
# @raise [ArgumentError] when any other backend is requested
def initialize(backend = :ruby)
  supported = [:ruby, nil]
  unless supported.include?(backend)
    raise ArgumentError, "unsupported backend: #{backend}"
  end

  @lock = Mutex.new
  @selectables = {}

  # Other threads wake a selecting thread up by writing to this pipe:
  # @wakeup is the read end, @waker the write end.
  @wakeup, @waker = IO.pipe

  @closed = false
end
def register(io, interest)
* :w - is the IO writeable?
* :r - is the IO readable?
of events. Valid event types for interest are:
Register interest in an IO object with the selector for the given types
# Register interest in an IO object with the selector for the given types
# of events. Valid event types for interest are:
# * :r - is the IO readable?
# * :w - is the IO writeable?
#
# @param io [IO, #to_io] object to monitor; converted with IO.try_convert
#   unless it is an OpenSSL::SSL::SSLSocket, which is used as-is
# @param interest [Symbol] the events to watch for
# @return [Monitor] the newly created monitor
# @raise [IOError] if the selector has been closed
# @raise [ArgumentError] if the IO object is already registered
def register(io, interest)
  ssl_socket = defined?(::OpenSSL) && io.is_a?(::OpenSSL::SSL::SSLSocket)
  io = IO.try_convert(io) unless ssl_socket

  @lock.synchronize do
    raise IOError, "selector is closed" if closed?

    existing = @selectables[io]
    if existing
      raise ArgumentError, "already registered as #{existing.interests.inspect}"
    end

    # The table is keyed by whatever IO object the monitor reports.
    Monitor.new(io, interest, self).tap do |monitor|
      @selectables[monitor.io] = monitor
    end
  end
end
def registered?(io)
# Is the given IO object registered with the selector?
#
# The lookup key is derived the same way #register derives it: an
# OpenSSL::SSL::SSLSocket is looked up as-is, anything else is first passed
# through IO.try_convert. Without that step an object that was registered
# through a #to_io wrapper was reported as unregistered, because the table is
# keyed by the converted IO. Plain IO objects convert to themselves, so their
# results are unchanged.
#
# @param io [IO, #to_io] the object to look up
# @return [Boolean] true when a monitor is stored for +io+
def registered?(io)
  key = if defined?(::OpenSSL) && io.is_a?(::OpenSSL::SSL::SSLSocket)
          io
        else
          IO.try_convert(io)
        end

  @lock.synchronize { @selectables.key?(key) }
end
def select(timeout = nil)
# Select which monitors are ready.
#
# Every registered monitor has its readiness reset to nil, then Kernel.select
# is run over the IO objects whose monitors are interested in :r, :w or :rw.
# Ready monitors get their readiness set to :r, :w or :rw.
#
# @param timeout [Numeric, nil] seconds to wait, passed to Kernel.select;
#   nil waits indefinitely
# @yield [monitor] each ready monitor, when a block is given
# @return [Array, Integer, nil] nil when Kernel.select timed out; otherwise
#   the number of ready monitors when a block is given, or an Array of them
def select(timeout = nil)
  selected_monitors = Set.new

  @lock.synchronize do
    readers = [@wakeup]
    writers = []

    @selectables.each do |io, monitor|
      readers << io if monitor.interests == :r || monitor.interests == :rw
      writers << io if monitor.interests == :w || monitor.interests == :rw
      monitor.readiness = nil
    end

    ready_readers, ready_writers = Kernel.select(readers, writers, [], timeout)
    return unless ready_readers # timeout

    ready_readers.each do |io|
      if io == @wakeup
        # Clear all wakeup signals we've received by reading them;
        # wakeups should have level triggered behavior.
        #
        # File::Stat#size is not a portable byte count for a pipe (POSIX
        # leaves st_size unspecified there and Linux reports 0), so reading
        # `stat.size` bytes can read nothing and leave the pipe readable
        # forever. Drain until the pipe would block instead.
        begin
          loop { @wakeup.read_nonblock(1024) }
        rescue IO::WaitReadable, EOFError
          # Pipe is empty (or its write end is closed): nothing left to drain.
        end
      else
        monitor = @selectables[io]
        monitor.readiness = :r
        selected_monitors << monitor
      end
    end

    ready_writers.each do |io|
      monitor = @selectables[io]
      # A monitor already marked readable above is upgraded to :rw.
      monitor.readiness = monitor.readiness == :r ? :rw : :w
      selected_monitors << monitor
    end
  end

  if block_given?
    selected_monitors.each { |m| yield m }
    selected_monitors.size
  else
    selected_monitors.to_a
  end
end
def wakeup
has the same effect as invoking it just once. In other words, it provides level-triggered behavior.
Invoking this method more than once between two successive select calls
any such thread exists.
Wake up a thread that's in the middle of selecting on this selector, if
# Wake up a thread that's in the middle of selecting on this selector, if
# any such thread exists.
#
# The signal is a single byte written, without blocking, to the write end of
# the wakeup pipe.
#
# @return [nil]
def wakeup
  @waker.write_nonblock "\0"
  nil
rescue IO::WaitWritable
  # The wakeup pipe is full, which means the selecting thread has already
  # been sent many wakeups it has not processed yet. It empties the pipe when
  # it wakes up, so dropping this extra signal is harmless.
  nil
end