class IO::Event::Selector::Select
A pure-Ruby implementation of the event selector.
def again?(errno)
# Whether the given result code means "the operation would block, try again later".
#
# @parameter errno [Integer] The result code to test. NOTE(review): callers in this class pass negative results, so `EAGAIN`/`EWOULDBLOCK` are presumably negated errno values - confirm against the constant definitions.
# @returns [Boolean] True if the code equals `EAGAIN` or `EWOULDBLOCK`.
def again?(errno)
  # Both constants must be compared against the argument; a bare `EAGAIN` would always be truthy.
  errno == EAGAIN || errno == EWOULDBLOCK
end
def blocking(&block)
# Run the given block inside a newly created blocking fiber.
#
# The fiber is created with `blocking: true` and is handed to the block as its argument.
#
# @returns [Object] Whatever resuming the fiber produces (the block's result if it runs to completion).
def blocking(&block)
  blocking_fiber = Fiber.new(blocking: true, &block)

  blocking_fiber.resume(blocking_fiber)
end
def close
# Shut the selector down: close the interrupt, then drop the references to the loop and the waiting table.
def close
  @interrupt.close

  # Cleared only after the interrupt has been closed:
  @loop = @waiting = nil
end
def initialize(loop)
# Create a selector bound to the given event loop.
#
# @parameter loop [Object] The event loop; the selector calls `transfer` on it to hand control back.
def initialize(loop)
  @loop = loop

  # Waiters are looked up by the identity of the IO object rather than by `hash`/`eql?`:
  @waiting = {}.compare_by_identity

  # True only while `select` is inside the blocking `IO.select` call:
  @blocked = false

  # Entries waiting to be transferred to on the next pass of `select`:
  @ready = Queue.new

  @interrupt = Interrupt.attach(self)

  # Time spent in the most recent blocking `IO.select`, in seconds:
  @idle_duration = 0.0
end
def io_read(fiber, io, buffer, length, offset = 0)
@parameter length [Integer] The minimum number of bytes to read.
Read from the given IO to the buffer.
# Read from the given IO into the buffer, waiting for readability whenever the read would block.
#
# @parameter fiber [Fiber] The fiber performing the read; it is suspended while waiting.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The position in the buffer at which to start storing data.
# @returns [Integer] The total number of bytes read, or the negative result code if the read failed.
def io_read(fiber, io, buffer, length, offset = 0)
  received = 0

  Selector.nonblock(io) do
    while true
      count = Fiber.blocking{buffer.read(io, 0, offset)}

      if count > 0
        received += count
        break if received >= length
        offset += count
      elsif count == 0
        # A zero-length read (presumably end of stream): stop with what we have.
        break
      elsif again?(count)
        # The read would block, so suspend until the IO is readable and retry:
        self.io_wait(fiber, io, IO::READABLE)
      else
        # Any other negative result is handed back to the caller unchanged:
        return count
      end
    end
  end

  received
end
def io_read(fiber, io, buffer, length, offset = 0)
# Read from the given IO into the buffer, filling at most the space available after `offset`.
#
# @parameter fiber [Fiber] The fiber performing the read; it is suspended while waiting.
# @parameter io [IO] The IO object to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum number of bytes to read. When zero, a read that would block returns immediately.
# @parameter offset [Integer] The position in the buffer at which to start storing data.
# @returns [Integer] The total number of bytes read, or the negative result code if the read failed or would block with `length` of zero.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    maximum_size = buffer.size - offset

    while maximum_size > 0
      result = Fiber.blocking{buffer.read(io, maximum_size, offset)}

      if again?(result)
        # Only wait when the caller actually requires data; otherwise report the would-block code:
        return result unless length > 0

        self.io_wait(fiber, io, IO::READABLE)
      elsif result < 0
        return result
      elsif result == 0
        # Nothing was read (end of stream). Without this the loop would retry forever whenever fewer than `length` bytes are available.
        break
      else
        total += result
        offset += result
        break if total >= length
      end

      maximum_size = buffer.size - offset
    end
  end

  total
end
def io_read(fiber, _io, buffer, length, offset = 0)
# Read from the given IO into the buffer using `read_nonblock`, for environments without buffer-level IO.
#
# @parameter fiber [Fiber] The fiber performing the read; it is suspended while waiting.
# @parameter _io [IO] The IO object to read from; only its file descriptor is used.
# @parameter buffer [IO::Buffer] The buffer to copy the data into.
# @parameter length [Integer] The minimum number of bytes to read. When not positive, a read that would block returns `EWOULDBLOCK`.
# @parameter offset [Integer] The position in the buffer at which to start storing data.
# @returns [Integer] The total number of bytes read, `EWOULDBLOCK`, or a negated errno on failure.
def io_read(fiber, _io, buffer, length, offset = 0)
  # Wrap the same descriptor in a fresh IO object so that no internal buffering gets in the way:
  io = IO.for_fd(_io.fileno, autoclose: false)

  total = 0
  remaining = buffer.size - offset

  while remaining > 0
    chunk = blocking{io.read_nonblock(remaining, exception: false)}

    case chunk
    when :wait_readable, :wait_writable
      return EWOULDBLOCK unless length > 0

      events = (chunk == :wait_readable) ? IO::READABLE : IO::WRITABLE
      self.io_wait(fiber, io, events)
    when nil
      # `read_nonblock` signals end of file with nil:
      break
    else
      buffer.set_string(chunk, offset)

      received = chunk.bytesize
      total += received
      offset += received

      break if received >= length
      length -= received
    end

    remaining = buffer.size - offset
  end

  total
rescue IOError
  # Reported to the caller as a bad file descriptor:
  -Errno::EBADF::Errno
rescue SystemCallError => error
  -error.errno
end
def io_select(readable, writable, priority, timeout)
@parameter writable [Array(IO)] The list of IO objects to wait for writability.
@parameter readable [Array(IO)] The list of IO objects to wait for readability.
Wait for multiple IO objects to become readable or writable.
# Wait for multiple IO objects to become ready by running `IO.select` in a separate thread and returning its value.
#
# @parameter readable [Array(IO)] The list of IO objects to wait for readability.
# @parameter writable [Array(IO)] The list of IO objects to wait for writability.
# @parameter priority [Array(IO)] The list of IO objects passed as the third (exceptional condition) set.
# @parameter timeout [Numeric | Nil] The maximum time to wait in seconds, or nil for no limit.
# @returns [Array | Nil] The result of `IO.select`: three arrays of ready IO objects, or nil on timeout.
def io_select(readable, writable, priority, timeout)
  selecting = Thread.new {IO.select(readable, writable, priority, timeout)}

  selecting.value
end
def io_wait(fiber, io, events)
@parameter io [IO] The IO object to wait on.
@parameter fiber [Fiber] The fiber that is waiting.
Wait for the given IO to become readable or writable.
# Wait for the given IO to become readable or writable.
#
# A new waiter is placed at the head of the chain registered for this IO, then control is handed to the event loop.
#
# @parameter fiber [Fiber] The fiber that is waiting.
# @parameter io [IO] The IO object to wait on.
# @parameter events [Integer] The events to wait for (a mask of `IO::READABLE`, `IO::WRITABLE`, `IO::PRIORITY`).
# @returns [Object] Whatever value is passed back when control is transferred to this fiber again.
def io_wait(fiber, io, events)
  previous = @waiting[io]
  waiter = Waiter.new(fiber, events, previous)
  @waiting[io] = waiter

  @loop.transfer
ensure
  # Runs however the wait ends; `waiter` is nil only if construction itself failed.
  waiter&.invalidate
end
def io_write(fiber, io, buffer, length, offset = 0)
@parameter length [Integer] The minimum number of bytes to write.
Write to the given IO from the buffer.
# Write from the buffer to the given IO, waiting for writability whenever the write would block.
#
# @parameter fiber [Fiber] The fiber performing the write; it is suspended while waiting.
# @parameter io [IO] The IO object to write to.
# @parameter buffer [IO::Buffer] The buffer to write from.
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The position in the buffer at which to start taking data.
# @returns [Integer] The total number of bytes written, or the negative result code if the write failed.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking{buffer.write(io, 0, offset)}

      if result < 0
        if again?(result)
          # The write would block, so wait until the IO can accept more data. Waiting for readability here could stall forever on a peer that never sends anything.
          self.io_wait(fiber, io, IO::WRITABLE)
        else
          return result
        end
      elsif result == 0
        # No progress was made: stop with what has been written so far.
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  return total
end
def io_write(fiber, io, buffer, length, offset = 0)
# Write from the buffer to the given IO, sending at most the data available after `offset`.
#
# @parameter fiber [Fiber] The fiber performing the write; it is suspended while waiting.
# @parameter io [IO] The IO object to write to.
# @parameter buffer [IO::Buffer] The buffer to write from.
# @parameter length [Integer] The minimum number of bytes to write. When zero, a write that would block returns immediately.
# @parameter offset [Integer] The position in the buffer at which to start taking data.
# @returns [Integer] The total number of bytes written, or the negative result code if the write failed or would block with `length` of zero.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    maximum_size = buffer.size - offset

    while maximum_size > 0
      result = Fiber.blocking{buffer.write(io, maximum_size, offset)}

      if again?(result)
        # Only wait when the caller actually requires progress; otherwise report the would-block code:
        return result unless length > 0

        # A blocked write becomes possible again when the IO is writable, not readable:
        self.io_wait(fiber, io, IO::WRITABLE)
      elsif result < 0
        return result
      elsif result == 0
        # No progress was made: stop rather than retrying the same write forever.
        break
      else
        total += result
        offset += result
        break if total >= length
      end

      maximum_size = buffer.size - offset
    end
  end

  total
end
def io_write(fiber, _io, buffer, length, offset = 0)
# Write from the buffer to the given IO using `write_nonblock`, for environments without buffer-level IO.
#
# @parameter fiber [Fiber] The fiber performing the write; it is suspended while waiting.
# @parameter _io [IO] The IO object to write to; only its file descriptor is used.
# @parameter buffer [IO::Buffer] The buffer to take the data from.
# @parameter length [Integer] The minimum number of bytes to write. When not positive, a write that would block returns `EWOULDBLOCK`.
# @parameter offset [Integer] The position in the buffer at which to start taking data.
# @returns [Integer] The total number of bytes written, `EWOULDBLOCK`, or a negated errno on failure.
def io_write(fiber, _io, buffer, length, offset = 0)
  # Wrap the same descriptor in a fresh IO object so that no internal buffering gets in the way:
  io = IO.for_fd(_io.fileno, autoclose: false)

  total = 0
  remaining = buffer.size - offset

  while remaining > 0
    chunk = buffer.get_string(offset, remaining)
    written = blocking{io.write_nonblock(chunk, exception: false)}

    case written
    when :wait_readable, :wait_writable
      return EWOULDBLOCK unless length > 0

      events = (written == :wait_readable) ? IO::READABLE : IO::WRITABLE
      self.io_wait(fiber, io, events)
    else
      total += written
      offset += written

      break if written >= length
      length -= written
    end

    remaining = buffer.size - offset
  end

  total
rescue IOError
  # Reported to the caller as a bad file descriptor:
  -Errno::EBADF::Errno
rescue SystemCallError => error
  -error.errno
end
def pop_ready
# Transfer control to every entry that is currently on the ready list.
#
# @returns [Boolean] True if the ready list held at least one entry, false if it was empty.
def pop_ready
  return false if @ready.empty?

  # Take the size up front: entries pushed while the transfers below run are left for the next pass.
  count = @ready.size

  count.times do
    fiber = @ready.pop

    # Entries that are no longer alive are simply dropped:
    fiber.transfer if fiber.alive?
  end

  true
end
def process_wait(fiber, pid, flags)
# Wait for a child process by running `Process::Status.wait` in a separate thread and returning its value.
#
# @parameter fiber [Fiber] The fiber that is waiting (not used by this implementation).
# @parameter pid [Integer] The process ID to wait for.
# @parameter flags [Integer] The flags passed through to `Process::Status.wait`.
# @returns [Process::Status] The value produced by `Process::Status.wait`.
def process_wait(fiber, pid, flags)
  waiting = Thread.new {Process::Status.wait(pid, flags)}

  waiting.value
end
def push(fiber)
# Append the given fiber to the ready list, to be transferred to on a later pass of `select`.
#
# @parameter fiber [Fiber] The fiber to schedule.
def push(fiber)
  @ready << fiber
end
def raise(fiber, *arguments)
# Raise an exception in the given fiber, first queueing the current fiber on the ready list.
#
# @parameter fiber [Fiber] The fiber to raise the exception in.
# @parameter arguments [Array] The arguments forwarded to `fiber.raise`.
# @returns [Object] Whatever `fiber.raise` returns once control comes back here.
def raise(fiber, *arguments)
  # The current fiber goes on the ready list wrapped in an Optional:
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  fiber.raise(*arguments)
ensure
  # The wrapper is nullified on the way out, presumably so a stale ready-list entry is not transferred to later.
  placeholder.nullify
end
def ready?
# Whether there is anything on the ready list.
#
# @returns [Boolean] True if at least one entry is queued.
def ready?
  return false if @ready.empty?

  true
end
def resume(fiber, *arguments)
# Transfer control to the given fiber, first queueing the current fiber on the ready list.
#
# @parameter fiber [Fiber] The fiber to transfer to.
# @parameter arguments [Array] The arguments forwarded to `fiber.transfer`.
# @returns [Object] Whatever `fiber.transfer` returns once control comes back here.
def resume(fiber, *arguments)
  # The current fiber goes on the ready list wrapped in an Optional:
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  fiber.transfer(*arguments)
ensure
  # The wrapper is nullified on the way out, presumably so a stale ready-list entry is not transferred to later.
  placeholder.nullify
end
def select(duration = nil)
# Run one pass of the event loop: resume ready fibers, wait for IO with `::IO.select`, and dispatch the resulting events to the waiters.
#
# @parameter duration [Numeric | Nil] The maximum time to block in seconds; nil is passed through to `::IO.select` unchanged.
# @returns [Integer] The number of distinct IO objects that had events, or 0 if the wait was interrupted by an exception.
def select(duration = nil)
  if pop_ready
    # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
    duration = 0
  end

  # Build the three interest lists from every registered waiter:
  readable = Array.new
  writable = Array.new
  priority = Array.new

  @waiting.each do |io, waiter|
    waiter.each do |fiber, events|
      if (events & IO::READABLE) > 0
        readable << io
      end

      if (events & IO::WRITABLE) > 0
        writable << io
      end

      if (events & IO::PRIORITY) > 0
        priority << io
      end
    end
  end

  # Don't block if something is already on the ready list:
  duration = 0 unless @ready.empty?
  error = nil

  # Idle time is only measured when a positive timeout was given; otherwise (nil, zero or negative) it is reset:
  if duration&.>(0)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  else
    @idle_duration = 0.0
  end

  # We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
  Thread.handle_interrupt(::Exception => :on_blocking) do
    # `wakeup` only signals the interrupt while this flag is set:
    @blocked = true
    readable, writable, priority = ::IO.select(readable, writable, priority, duration)
  rescue ::Exception => error
    # Requeue below...
  ensure
    @blocked = false
    if start_time
      end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @idle_duration = end_time - start_time
    end
  end

  if error
    # Requeue the error into the pending exception queue:
    Thread.current.raise(error)
    return 0
  end

  # Merge the three result lists into a single event mask per IO, keyed by identity:
  ready = Hash.new(0).compare_by_identity

  # Each list is nil when `::IO.select` timed out, hence the safe navigation:
  readable&.each do |io|
    ready[io] |= IO::READABLE
  end

  writable&.each do |io|
    ready[io] |= IO::WRITABLE
  end

  priority&.each do |io|
    ready[io] |= IO::PRIORITY
  end

  ready.each do |io, events|
    # The waiter chain is removed before dispatch; waiters yielded back by `dispatch` are re-registered.
    @waiting.delete(io).dispatch(events) do |waiter|
      # Re-schedule the waiting IO:
      waiter.tail = @waiting[io]
      @waiting[io] = waiter
    end
  end

  return ready.size
end
def transfer
# Hand control to the event loop.
#
# @returns [Object] Whatever value is passed back when control is transferred to the caller again.
def transfer
  event_loop = @loop

  event_loop.transfer
end
def wakeup
# Signal the interrupt if the selector is currently inside its blocking wait.
#
# @returns [Boolean] True if the interrupt was signalled, false if the selector was not blocked.
def wakeup
  # Nothing to interrupt unless `select` is inside its blocking call:
  return false unless @blocked

  @interrupt.signal

  true
end
def yield
# Hand control to the event loop, first queueing the current fiber on the ready list.
#
# @returns [Object] Whatever value is passed back when control is transferred to this fiber again.
def yield
  # The current fiber goes on the ready list wrapped in an Optional:
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  @loop.transfer
ensure
  # The wrapper is nullified on the way out, presumably so a stale ready-list entry is not transferred to later.
  placeholder.nullify
end