class IO::Event::Selector::Select
def close
def close @interrupt.close @loop = nil @readable = nil @writable = nil end
def initialize(loop)
def initialize(loop) @loop = loop @readable = Hash.new.compare_by_identity @writable = Hash.new.compare_by_identity @blocked = false @ready = Queue.new @interrupt = Interrupt.attach(self) end
def io_read(fiber, io, buffer, length)
def io_read(fiber, io, buffer, length) offset = 0 while length > 0 # The maximum size we can read: maximum_size = buffer.size - offset case result = io.read_nonblock(maximum_size, exception: false) when :wait_readable self.io_wait(fiber, io, READABLE) when :wait_writable self.io_wait(fiber, io, WRITABLE) else break unless result buffer.copy(result, offset) offset += result.bytesize length -= result.bytesize end end return offset end
def io_wait(fiber, io, events)
def io_wait(fiber, io, events) remove_readable = remove_writable = false if (events & READABLE) > 0 or (events & PRIORITY) > 0 @readable[io] = fiber remove_readable = true end if (events & WRITABLE) > 0 @writable[io] = fiber remove_writable = true end @loop.transfer ensure @readable.delete(io) if remove_readable @writable.delete(io) if remove_writable end
def io_write(fiber, io, buffer, length)
def io_write(fiber, io, buffer, length) offset = 0 while length > 0 # From offset until the end: chunk = buffer.to_str(offset, length) case result = io.write_nonblock(chunk, exception: false) when :wait_readable self.io_wait(fiber, io, READABLE) when :wait_writable self.io_wait(fiber, io, WRITABLE) else offset += result length -= result end end return offset end
def pop_ready
def pop_ready @ready.empty? = @ready.size times do = @ready.pop .transfer if fiber.alive? true
def process_wait(fiber, pid, flags)
def process_wait(fiber, pid, flags) r, w = IO.pipe thread = Thread.new do Process::Status.wait(pid, flags) ensure w.close end self.io_wait(fiber, r, READABLE) return thread.value ensure r.close w.close thread&.kill end
def push(fiber)
def push(fiber) @ready.push(fiber) end
def raise(fiber, *arguments)
def raise(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.raise(*arguments) ensure optional.nullify end
def ready?
def ready? !@ready.empty? end
def resume(fiber, *arguments)
def resume(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.transfer(*arguments) ensure optional.nullify end
def select(duration = nil)
def select(duration = nil) if pop_ready # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop: duration = 0 end # The GVL ensures this is sufficiently synchronised for `#wakeup` to work correctly. @blocked = true readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration) @blocked = false ready = Hash.new(0) readable&.each do |io| fiber = @readable.delete(io) ready[fiber] |= READABLE end writable&.each do |io| fiber = @writable.delete(io) ready[fiber] |= WRITABLE end ready.each do |fiber, events| fiber.transfer(events) if fiber.alive? end return ready.size end
def transfer
def transfer @loop.transfer end
def wakeup
def wakeup if @blocked @interrupt.signal end end
def yield
def yield optional = Optional.new(Fiber.current) @ready.push(optional) @loop.transfer ensure optional.nullify end