class IO::Event::Selector::Select
# Runs the given block inside a freshly created fiber that is flagged as
# blocking (`blocking: true`) and returns whatever resuming that fiber yields
# back.
#
# NOTE(review): the body of this method was truncated in the source text
# (`ew(blocking: true, &block).resume` with no closing `end`). It has been
# reconstructed as `Fiber.new(...)`; confirm against the upstream file.
#
# @yield the operation to execute in the blocking fiber.
# @return [Object] the value produced by the block.
def blocking(&block)
  Fiber.new(blocking: true, &block).resume
end
# Shuts the selector down: closes the interrupt object and drops the
# references to the loop and to both wait tables.
#
# @return [nil]
def close
  @interrupt.close

  # Release everything this selector was holding on to.
  @loop = @readable = @writable = nil
end
# Builds a selector bound to the given loop fiber.
#
# State set up here:
# - `@readable` / `@writable`: identity-keyed hashes (io => waiting fiber).
# - `@blocked`: false until `select` is about to call `IO.select`.
# - `@ready`: queue of fibers waiting to be transferred to.
# - `@interrupt`: whatever `Interrupt.attach(self)` returns; it is used by
#   `wakeup` (`signal`) and `close` (`close`).
#
# @param loop the object that `transfer`/`yield`/`io_wait` transfer back to.
def initialize(loop)
  @loop = loop

  # Keys are compared by object identity rather than by #hash / #eql?.
  @readable = {}.compare_by_identity
  @writable = {}.compare_by_identity

  @blocked = false
  @ready = Queue.new

  # Attached last, after the rest of the state has been assigned.
  @interrupt = Interrupt.attach(self)
end
# Reads from `io` into `buffer` until at least `length` bytes have been
# transferred, end-of-file is reached, or the buffer has no room left.
#
# Changes relative to the previous text:
# - the received String is stored with `set_string`; `IO::Buffer#copy`
#   expects another buffer as its source, not a String.
# - the loop stops once the buffer is full, instead of issuing zero-byte
#   reads forever when `length` exceeds the buffer size.
#
# @param fiber the fiber to register while waiting for readiness.
# @param io the object to read from (must respond to `read_nonblock`).
# @param buffer the destination (must respond to `size` and `set_string`).
# @param length [Integer] minimum number of bytes wanted; when it is not
#   positive the method does not wait and returns `-EAGAIN` if the io is
#   not ready.
# @return [Integer] number of bytes stored in the buffer, or `-EAGAIN`.
def io_read(fiber, io, buffer, length)
  offset = 0

  while true
    maximum_size = buffer.size - offset

    # Nothing more can be stored: stop rather than spin on empty reads.
    break if maximum_size <= 0

    case result = blocking { io.read_nonblock(maximum_size, exception: false) }
    when :wait_readable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::READABLE)
    when :wait_writable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::WRITABLE)
    else
      # `read_nonblock(..., exception: false)` returns nil at end-of-file.
      break unless result

      buffer.set_string(result, offset)

      size = result.bytesize
      offset += size
      break if size >= length

      length -= size
    end
  end

  offset
end
# Registers `fiber` as waiting on `io` for the requested events, transfers to
# the loop, and unregisters whatever this call registered once control comes
# back (normally or via an exception).
#
# @param fiber the fiber stored in the wait tables.
# @param io the io used as the (identity) key.
# @param events [Integer] bit mask of IO::READABLE / IO::PRIORITY /
#   IO::WRITABLE. Priority is tracked in the readable table.
# @return [Object] the value `@loop.transfer` returns when this fiber is
#   transferred back to.
def io_wait(fiber, io, events)
  wants_read = (events & (IO::READABLE | IO::PRIORITY)) > 0
  wants_write = (events & IO::WRITABLE) > 0

  @readable[io] = fiber if wants_read
  @writable[io] = fiber if wants_write

  @loop.transfer
ensure
  # Only remove the entries this call added.
  @readable.delete(io) if wants_read
  @writable.delete(io) if wants_write
end
# Writes the contents of `buffer` to `io` until at least `length` bytes have
# been written or the buffer has been fully consumed.
#
# Changes relative to the previous text:
# - the chunk is extracted with `get_string`, the String accessor that
#   `IO::Buffer` provides, instead of `to_str(offset, size)`.
# - the loop stops once the whole buffer has been written, instead of
#   issuing empty writes forever when `length` exceeds the buffer size.
#
# @param fiber the fiber to register while waiting for readiness.
# @param io the object to write to (must respond to `write_nonblock`).
# @param buffer the source (must respond to `size` and `get_string`).
# @param length [Integer] minimum number of bytes to write; when it is not
#   positive the method does not wait and returns `-EAGAIN` if the io is
#   not ready.
# @return [Integer] number of bytes written, or `-EAGAIN`.
def io_write(fiber, io, buffer, length)
  offset = 0

  while true
    maximum_size = buffer.size - offset

    # Everything has been written: stop rather than spin on empty writes.
    break if maximum_size <= 0

    chunk = buffer.get_string(offset, maximum_size)

    case result = blocking { io.write_nonblock(chunk, exception: false) }
    when :wait_readable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::READABLE)
    when :wait_writable
      return -EAGAIN unless length > 0

      io_wait(fiber, io, IO::WRITABLE)
    else
      offset += result
      break if result >= length

      length -= result
    end
  end

  offset
end
# Transfers to every fiber that was in the ready queue when this method was
# called, skipping entries that are no longer alive.
#
# NOTE(review): the body of this method was garbled in the source text
# (`@ready.empty? = @ready.size times do = @ready.pop .transfer if
# fiber.alive? true`). It has been reconstructed from the surviving tokens
# and from how `select` uses the result (`if pop_ready`); confirm against
# the upstream file.
#
# @return [true, nil] true when the queue was non-empty, nil otherwise.
def pop_ready
  return if @ready.empty?

  # Snapshot the size: entries pushed while the fibers below run are left
  # in the queue for the next call.
  count = @ready.size

  count.times do
    fiber = @ready.pop
    fiber.transfer if fiber.alive?
  end

  true
end
# Waits for a child process without blocking the calling fiber: a helper
# thread performs `Process::Status.wait` and closes the write end of a pipe
# when it finishes, while this fiber waits for the read end to become
# readable.
#
# The cleanup is nil-safe: if `IO.pipe` itself raises, the pipe ends are nil
# and the original error propagates instead of a NoMethodError from the
# `ensure` clause.
#
# @param fiber the fiber to register while waiting.
# @param pid [Integer] passed through to `Process::Status.wait`.
# @param flags [Integer] passed through to `Process::Status.wait`.
# @return the value of the helper thread, i.e. the result of
#   `Process::Status.wait(pid, flags)`.
def process_wait(fiber, pid, flags)
  reader, writer = IO.pipe

  thread = Thread.new do
    Process::Status.wait(pid, flags)
  ensure
    # Closing the write end makes the read end readable (end-of-file).
    writer.close
  end

  io_wait(fiber, reader, IO::READABLE)

  thread.value
ensure
  reader&.close
  writer&.close
  thread&.kill
end
# Appends the given fiber to the ready queue.
#
# @param fiber the entry to enqueue.
# @return the ready queue itself.
def push(fiber)
  @ready << fiber
end
# Raises an exception inside the given fiber. Before doing so, the calling
# fiber is wrapped in an Optional and appended to the ready queue; when
# control returns here, that wrapper is nullified.
# NOTE(review): presumably nullifying turns the queued entry into a no-op so
# the caller is not transferred to a second time; confirm against Optional.
#
# @param fiber the fiber that receives the exception.
# @param arguments forwarded unchanged to `fiber.raise`.
# @return whatever `fiber.raise` returns.
def raise(fiber, *arguments)
  waiting = Optional.new(Fiber.current)
  @ready << waiting

  fiber.raise(*arguments)
ensure
  waiting.nullify
end
# Reports whether the ready queue currently holds any entries.
#
# @return [Boolean] false when the queue is empty, true otherwise.
def ready?
  return false if @ready.empty?

  true
end
# Transfers control to the given fiber. Before doing so, the calling fiber is
# wrapped in an Optional and appended to the ready queue; when control
# returns here, that wrapper is nullified.
# NOTE(review): presumably nullifying turns the queued entry into a no-op so
# the caller is not transferred to a second time; confirm against Optional.
#
# @param fiber the fiber to transfer to.
# @param arguments forwarded unchanged to `fiber.transfer`.
# @return whatever `fiber.transfer` returns.
def resume(fiber, *arguments)
  waiting = Optional.new(Fiber.current)
  @ready << waiting

  fiber.transfer(*arguments)
ensure
  waiting.nullify
end
# Runs one iteration of the selector: drains the ready queue, waits with
# `IO.select` for registered ios, then transfers to each fiber whose io
# became ready, passing it the accumulated event mask.
#
# `@blocked` is reset in an `ensure` clause, so an exception raised by
# `IO.select` (for example on a closed io) no longer leaves the selector
# flagged as blocked.
#
# @param duration [Numeric, nil] maximum time to wait; nil waits
#   indefinitely. Forced to 0 when ready fibers were run or are pending.
# @return [Integer] number of distinct fibers that were found ready.
def select(duration = nil)
  # If we have popped items from the ready list, they may influence the
  # duration calculation, so we don't delay the event loop:
  duration = 0 if pop_ready

  # NOTE(review): the flag is raised before the ready queue is re-checked,
  # as in the original; presumably so that a concurrent push + wakeup
  # cannot be missed. Keep this order.
  @blocked = true

  begin
    duration = 0 unless @ready.empty?
    readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration)
  ensure
    @blocked = false
  end

  # fiber => bit mask of the events that fired for it.
  ready = Hash.new(0)

  readable&.each do |io|
    fiber = @readable.delete(io)
    ready[fiber] |= IO::READABLE
  end

  writable&.each do |io|
    fiber = @writable.delete(io)
    ready[fiber] |= IO::WRITABLE
  end

  ready.each do |fiber, events|
    fiber.transfer(events) if fiber.alive?
  end

  ready.size
end
# Hands control from the calling fiber over to the loop.
#
# @return [Object] the value `@loop.transfer` returns once the caller is
#   transferred back to.
def transfer
  event_loop = @loop
  event_loop.transfer
end
# Signals the interrupt object if the selector is currently flagged as
# blocked (the flag is set around the `IO.select` call in `select`).
#
# @return [Boolean] true when a signal was sent, false when the selector was
#   not blocked and nothing was done.
def wakeup
  return false unless @blocked

  @interrupt.signal
  true
end
# Gives control back to the loop. Before doing so, the calling fiber is
# wrapped in an Optional and appended to the ready queue; when control
# returns here, that wrapper is nullified.
# NOTE(review): presumably nullifying turns the queued entry into a no-op so
# the caller is not transferred to a second time; confirm against Optional.
#
# @return [Object] the value `@loop.transfer` returns once the caller is
#   transferred back to.
def yield
  waiting = Optional.new(Fiber.current)
  @ready << waiting

  @loop.transfer
ensure
  waiting.nullify
end