class IO::Event::Selector::Select
# Runs the given block inside a newly created blocking fiber and returns
# whatever resuming that fiber produces.
#
# NOTE(review): the receiver of this call was truncated in the source (only
# "ew(blocking: true, &block).resume" survived). It is reconstructed here as
# `Fiber.new` - confirm against the upstream file.
def blocking(&block)
  Fiber.new(blocking: true, &block).resume
end
# Shuts the selector down: closes @interrupt first, then drops the references
# held in @loop and @waiting. Returns nil.
def close
  @interrupt.close

  @loop = @waiting = nil
end
# Sets up the selector state.
#
# loop - the object control is handed back to via `transfer` (see #transfer).
#
# State created here:
#   @ready     - queue of entries to be transferred to by the next #select
#   @blocked   - true only while #select is inside ::IO.select
#   @waiting   - identity-keyed table of io => waiter
#   @interrupt - result of Interrupt.attach(self); signalled by #wakeup
def initialize(loop)
  @loop = loop

  @ready = Queue.new
  @blocked = false
  @waiting = {}.compare_by_identity

  # Attached last so that every other piece of state already exists.
  @interrupt = Interrupt.attach(self)
end
# Reads from io into buffer without blocking the event loop.
#
# fiber  - the fiber to suspend (through io_wait) while io is not ready.
# io     - object responding to read_nonblock(size, exception: false).
# buffer - object responding to size and set_string(string, offset).
# length - minimum number of bytes wanted. With 0, a single read attempt is
#          made and -EAGAIN is returned if io is not ready.
#
# Returns the number of bytes stored in buffer, or -EAGAIN (a constant
# defined outside this method) when length is 0 and io would block.
def io_read(fiber, io, buffer, length)
  offset = 0

  # Only loop while the buffer still has room. Previously, once the buffer
  # was full but `length` was not yet satisfied, read_nonblock(0) returned an
  # empty string forever and the loop never terminated.
  while (maximum_size = buffer.size - offset) > 0
    case result = blocking{io.read_nonblock(maximum_size, exception: false)}
    when :wait_readable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::READABLE)
    when :wait_writable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::WRITABLE)
    when nil
      # End of stream: report what has been read so far.
      break
    else
      buffer.set_string(result, offset)

      size = result.bytesize
      offset += size

      # Done once this chunk covers what was still requested.
      break if size >= length

      length -= size
    end
  end

  offset
end
# Registers fiber as waiting for events on io, then hands control to @loop
# until something transfers back.
#
# A new Waiter is created with the previously registered waiter for the same
# io (if any) as its third argument and becomes the @waiting[io] entry.
#
# Returns whatever @loop.transfer returns. The waiter is always invalidated
# on the way out, including when an exception propagates.
def io_wait(fiber, io, events)
  previous = @waiting[io]
  waiter = (@waiting[io] = Waiter.new(fiber, events, previous))

  @loop.transfer
ensure
  waiter&.invalidate
end
# Writes the contents of buffer to io without blocking the event loop.
#
# fiber  - the fiber to suspend (through io_wait) while io is not ready.
# io     - object responding to write_nonblock(string, exception: false).
# buffer - object responding to size and get_string(offset, length).
# length - minimum number of bytes to write. With 0, a single write attempt
#          is made and -EAGAIN is returned if io is not ready.
#
# Returns the number of bytes written, or -EAGAIN (a constant defined
# outside this method) when length is 0 and io would block.
def io_write(fiber, io, buffer, length)
  offset = 0

  # Only loop while there is data left in the buffer. Previously, once the
  # whole buffer had been written but `length` was not yet satisfied, an
  # empty chunk was written forever and the loop never terminated.
  while (maximum_size = buffer.size - offset) > 0
    chunk = buffer.get_string(offset, maximum_size)

    case result = blocking{io.write_nonblock(chunk, exception: false)}
    when :wait_readable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::READABLE)
    when :wait_writable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::WRITABLE)
    else
      offset += result

      # Done once this write covers what was still requested.
      break if result >= length

      length -= result
    end
  end

  offset
end
# Drains the entries that are currently in @ready, transferring to each one
# that is still alive.
#
# Only the entries present when the method starts are processed; anything
# pushed onto @ready while they run is left for the next call.
#
# Returns true if at least one entry was popped, nil if @ready was empty.
#
# NOTE(review): the body of this method lost tokens in the source
# ("@ready.empty? = @ready.size times do = @ready.pop .transfer if
# fiber.alive? true"). This is a reconstruction from the surviving tokens -
# confirm against the upstream file.
def pop_ready
  return if @ready.empty?

  # Size is sampled once so re-queued entries are not run in this pass.
  @ready.size.times do
    fiber = @ready.pop
    fiber.transfer if fiber.alive?
  end

  true
end
# Waits for a child process without blocking the event loop.
#
# The blocking Process::Status.wait call runs on a helper thread. That thread
# closes the write end of a pipe when it finishes, and the calling fiber
# waits (through io_wait) for the read end to become readable.
#
# fiber - the fiber to suspend while waiting.
# pid   - process id passed to Process::Status.wait.
# flags - wait flags passed to Process::Status.wait.
#
# Returns the value of the helper thread, i.e. the result of
# Process::Status.wait.
def process_wait(fiber, pid, flags)
  r, w = IO.pipe

  thread = Thread.new do
    Process::Status.wait(pid, flags)
  ensure
    w.close
  end

  io_wait(fiber, r, IO::READABLE)

  thread.value
ensure
  # All three may still be nil if setup failed part-way (e.g. IO.pipe raised);
  # guard each so the cleanup does not mask the original error.
  r&.close
  w&.close
  thread&.kill
end
# Appends fiber to the @ready queue so that a later pass over the queue
# (see #select) transfers to it. Returns the queue.
def push(fiber)
  @ready << fiber
end
# Raises an exception inside fiber, forwarding arguments to Fiber#raise.
#
# Before switching, the current fiber is wrapped in an Optional and queued on
# @ready. The Optional is nullified when this method is left, whether
# normally or through an exception.
#
# Returns whatever fiber.raise returns.
def raise(fiber, *arguments)
  marker = Optional.new(Fiber.current)
  @ready << marker

  fiber.raise(*arguments)
ensure
  marker.nullify
end
# Reports whether the @ready queue currently holds at least one entry.
def ready?
  return false if @ready.empty?

  true
end
# Transfers control to fiber, passing arguments along.
#
# Before switching, the current fiber is wrapped in an Optional and queued on
# @ready. The Optional is nullified when this method is left, whether
# normally or through an exception.
#
# Returns whatever fiber.transfer returns.
def resume(fiber, *arguments)
  marker = Optional.new(Fiber.current)
  @ready << marker

  fiber.transfer(*arguments)
ensure
  marker.nullify
end
# Runs one pass of the selector.
#
# 1. Drains the ready queue via pop_ready.
# 2. Collects every io in @waiting whose waiters ask for readability or
#    writability.
# 3. Calls ::IO.select with those sets, for at most duration seconds
#    (nil means no timeout is passed).
# 4. Removes each io that became ready from @waiting and transfers to its
#    waiter with the combined event mask.
#
# Returns the number of io objects that became ready.
def select(duration = nil)
  # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
  duration = 0 if pop_ready

  readable = []
  writable = []

  @waiting.each do |io, waiter|
    waiter.each do |_fiber, events|
      readable << io if (events & IO::READABLE).positive?
      writable << io if (events & IO::WRITABLE).positive?
    end
  end

  # @blocked is what #wakeup checks before signalling the interrupt.
  @blocked = true
  # Anything queued in the meantime must not wait for the timeout.
  duration = 0 unless @ready.empty?
  readable, writable, _ = ::IO.select(readable, writable, nil, duration)
  @blocked = false

  # ::IO.select returns nil on timeout, leaving both lists nil.
  ready = Hash.new(0)
  readable&.each { |io| ready[io] |= IO::READABLE }
  writable&.each { |io| ready[io] |= IO::WRITABLE }

  ready.each { |io, events| @waiting.delete(io).transfer(events) }

  ready.size
end
# Hands control to @loop and returns whatever that transfer returns.
def transfer
  return @loop.transfer
end
# Signals @interrupt, but only while @blocked is set (which #select does
# around its ::IO.select call).
#
# Returns true if the interrupt was signalled, false otherwise.
def wakeup
  return false unless @blocked

  @interrupt.signal
  true
end
# Gives up control to @loop while keeping the current fiber scheduled.
#
# The current fiber is wrapped in an Optional and queued on @ready before
# transferring to @loop. The Optional is nullified when this method is left,
# whether normally or through an exception.
#
# Returns whatever @loop.transfer returns.
def yield
  marker = Optional.new(Fiber.current)
  @ready << marker

  @loop.transfer
ensure
  marker.nullify
end