# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2021-2023, by Samuel Williams.
# Copyright, 2023, by Math Ieu.
require_relative '../interrupt'
require_relative '../support'
module IO::Event
module Selector
class Select
def initialize(loop)
@loop = loop
@waiting = Hash.new.compare_by_identity
@blocked = false
@ready = Queue.new
@interrupt = Interrupt.attach(self)
end
attr :loop
# If the event loop is currently sleeping, wake it up.
def wakeup
if @blocked
@interrupt.signal
return true
end
return false
end
def close
@interrupt.close
@loop = nil
@waiting = nil
end
Optional = Struct.new(:fiber) do
def transfer(*arguments)
fiber&.transfer(*arguments)
end
def alive?
fiber&.alive?
end
def nullify
self.fiber = nil
end
end
# Transfer from the current fiber to the event loop.
def transfer
@loop.transfer
end
# Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
def resume(fiber, *arguments)
optional = Optional.new(Fiber.current)
@ready.push(optional)
fiber.transfer(*arguments)
ensure
optional.nullify
end
# Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
def yield
optional = Optional.new(Fiber.current)
@ready.push(optional)
@loop.transfer
ensure
optional.nullify
end
# Append the given fiber into the ready list.
def push(fiber)
@ready.push(fiber)
end
# Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
def raise(fiber, *arguments)
optional = Optional.new(Fiber.current)
@ready.push(optional)
fiber.raise(*arguments)
ensure
optional.nullify
end
def ready?
!@ready.empty?
end
Waiter = Struct.new(:fiber, :events, :tail) do
def alive?
self.fiber&.alive?
end
# Dispatch the given events to the list of waiting fibers. If the fiber was not waiting for the given events, it is reactivated by calling the given block.
def dispatch(events, &reactivate)
# We capture the tail here, because calling reactivate might modify it:
tail = self.tail
if fiber = self.fiber
if fiber.alive?
revents = events & self.events
if revents.zero?
reactivate.call(self)
else
self.fiber = nil
fiber.transfer(revents)
end
else
self.fiber = nil
end
end
tail&.dispatch(events, &reactivate)
end
def invalidate
self.fiber = nil
end
def each(&block)
if fiber = self.fiber
yield fiber, self.events
end
self.tail&.each(&block)
end
end
def io_wait(fiber, io, events)
waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
@loop.transfer
ensure
waiter&.invalidate
end
def io_select(readable, writable, priority, timeout)
Thread.new do
IO.select(readable, writable, priority, timeout)
end.value
end
EAGAIN = -Errno::EAGAIN::Errno
EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
def again?(errno)
errno == EAGAIN or errno == EWOULDBLOCK
end
if Support.fiber_scheduler_v3?
# Ruby 3.3+, full IO::Buffer support.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
def io_read(fiber, io, buffer, length, offset = 0)
total = 0
Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.read(io, 0, offset)}
if result < 0
if again?(result)
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result == 0
break
else
total += result
break if total >= length
offset += result
end
end
end
return total
end
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset into the buffer to write from.
def io_write(fiber, io, buffer, length, offset = 0)
total = 0
Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.write(io, 0, offset)}
if result < 0
if again?(result)
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result == 0
break result
else
total += result
break if total >= length
offset += result
end
end
end
return total
end
elsif Support.fiber_scheduler_v2?
# Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.
def io_read(fiber, io, buffer, length, offset = 0)
total = 0
Selector.nonblock(io) do
maximum_size = buffer.size - offset
while maximum_size > 0
result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
if again?(result)
if length > 0
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result < 0
return result
else
total += result
offset += result
break if total >= length
end
maximum_size = buffer.size - offset
end
end
return total
end
def io_write(fiber, io, buffer, length, offset = 0)
total = 0
Selector.nonblock(io) do
maximum_size = buffer.size - offset
while maximum_size > 0
result = Fiber.blocking{buffer.write(io, maximum_size, offset)}
if again?(result)
if length > 0
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result < 0
return result
else
total += result
offset += result
break if total >= length
end
maximum_size = buffer.size - offset
end
end
return total
end
elsif Support.fiber_scheduler_v1?
# Ruby <= 3.1, limited IO::Buffer support.
def io_read(fiber, _io, buffer, length, offset = 0)
# We need to avoid any internal buffering, so we use a duplicated IO object:
io = IO.for_fd(_io.fileno, autoclose: false)
total = 0
maximum_size = buffer.size - offset
while maximum_size > 0
case result = blocking{io.read_nonblock(maximum_size, exception: false)}
when :wait_readable
if length > 0
self.io_wait(fiber, io, IO::READABLE)
else
return EWOULDBLOCK
end
when :wait_writable
if length > 0
self.io_wait(fiber, io, IO::WRITABLE)
else
return EWOULDBLOCK
end
when nil
break
else
buffer.set_string(result, offset)
size = result.bytesize
total += size
offset += size
break if size >= length
length -= size
end
maximum_size = buffer.size - offset
end
return total
rescue IOError => error
return -Errno::EBADF::Errno
rescue SystemCallError => error
return -error.errno
end
def io_write(fiber, _io, buffer, length, offset = 0)
# We need to avoid any internal buffering, so we use a duplicated IO object:
io = IO.for_fd(_io.fileno, autoclose: false)
total = 0
maximum_size = buffer.size - offset
while maximum_size > 0
chunk = buffer.get_string(offset, maximum_size)
case result = blocking{io.write_nonblock(chunk, exception: false)}
when :wait_readable
if length > 0
self.io_wait(fiber, io, IO::READABLE)
else
return EWOULDBLOCK
end
when :wait_writable
if length > 0
self.io_wait(fiber, io, IO::WRITABLE)
else
return EWOULDBLOCK
end
else
total += result
offset += result
break if result >= length
length -= result
end
maximum_size = buffer.size - offset
end
return total
rescue IOError => error
return -Errno::EBADF::Errno
rescue SystemCallError => error
return -error.errno
end
def blocking(&block)
fiber = Fiber.new(blocking: true, &block)
return fiber.resume(fiber)
end
end
def process_wait(fiber, pid, flags)
Thread.new do
Process::Status.wait(pid, flags)
end.value
end
private def pop_ready
unless @ready.empty?
count = @ready.size
count.times do
fiber = @ready.pop
fiber.transfer if fiber.alive?
end
return true
end
end
def select(duration = nil)
if pop_ready
# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
duration = 0
end
readable = Array.new
writable = Array.new
priority = Array.new
@waiting.each do |io, waiter|
waiter.each do |fiber, events|
if (events & IO::READABLE) > 0
readable << io
end
if (events & IO::WRITABLE) > 0
writable << io
end
if (events & IO::PRIORITY) > 0
priority << io
end
end
end
duration = 0 unless @ready.empty?
error = nil
# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
Thread.handle_interrupt(::Exception => :on_blocking) do
@blocked = true
readable, writable, priority = ::IO.select(readable, writable, priority, duration)
rescue ::Exception => error
# Requeue below...
ensure
@blocked = false
end
if error
# Requeue the error into the pending exception queue:
Thread.current.raise(error)
return 0
end
ready = Hash.new(0).compare_by_identity
readable&.each do |io|
ready[io] |= IO::READABLE
end
writable&.each do |io|
ready[io] |= IO::WRITABLE
end
priority&.each do |io|
ready[io] |= IO::PRIORITY
end
ready.each do |io, events|
@waiting.delete(io).dispatch(events) do |waiter|
# Re-schedule the waiting IO:
waiter.tail = @waiting[io]
@waiting[io] = waiter
end
end
return ready.size
end
end
end
end