class IO::Event::Selector::Select

A pure-Ruby implementation of the event selector.

def again?(errno)

Whether the given error code indicates that the operation should be retried.
def again?(errno)
EAGAIN or errno == EWOULDBLOCK

def blocking(&block)

def blocking(&block)
	fiber = Fiber.new(blocking: true, &block)
	return fiber.resume(fiber)
end

def close

Close the selector and release any resources.
def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

def initialize(loop)

Initialize the selector with the given event loop fiber.
def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
	
	@idle_duration = 0.0
end

def io_read(fiber, io, buffer, length, offset = 0)

@parameter offset [Integer] The offset into the buffer to read to.
@parameter length [Integer] The minimum number of bytes to read.

Read from the given IO to the buffer.
def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def io_read(fiber, io, buffer, length, offset = 0)

Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.
def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		maximum_size = buffer.size - offset
		while maximum_size > 0
			result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
			
			maximum_size = buffer.size - offset
		end
	end
	
	return total
end

def io_read(fiber, _io, buffer, length, offset = 0)

Ruby <= 3.1, limited IO::Buffer support.
def io_read(fiber, _io, buffer, length, offset = 0)
	# We need to avoid any internal buffering, so we use a duplicated IO object:
	io = IO.for_fd(_io.fileno, autoclose: false)
	
	total = 0
	
	maximum_size = buffer.size - offset
	while maximum_size > 0
		case result = blocking{io.read_nonblock(maximum_size, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return EWOULDBLOCK
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return EWOULDBLOCK
			end
		when nil
			break
		else
			buffer.set_string(result, offset)
			
			size = result.bytesize
			total += size
			offset += size
			break if size >= length
			length -= size
		end
		
		maximum_size = buffer.size - offset
	end
	
	return total
rescue IOError => error
	return -Errno::EBADF::Errno
rescue SystemCallError => error
	return -error.errno
end

def io_select(readable, writable, priority, timeout)

@parameter priority [Array(IO)] The list of IO objects to wait for priority events.
@parameter writable [Array(IO)] The list of IO objects to wait for writability.
@parameter readable [Array(IO)] The list of IO objects to wait for readability.

Wait for multiple IO objects to become readable or writable.
def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

def io_wait(fiber, io, events)

@parameter events [Integer] The events to wait for.
@parameter io [IO] The IO object to wait on.
@parameter fiber [Fiber] The fiber that is waiting.

Wait for the given IO to become readable or writable.
def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer
ensure
	waiter&.invalidate
end

def io_write(fiber, io, buffer, length, offset = 0)

@parameter offset [Integer] The offset into the buffer to write from.
@parameter length [Integer] The minimum number of bytes to write.

Write to the given IO from the buffer.
def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

def io_write(fiber, io, buffer, length, offset = 0)

def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		maximum_size = buffer.size - offset
		while maximum_size > 0
			result = Fiber.blocking{buffer.write(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
			
			maximum_size = buffer.size - offset
		end
	end
	
	return total
end

def io_write(fiber, _io, buffer, length, offset = 0)

def io_write(fiber, _io, buffer, length, offset = 0)
	# We need to avoid any internal buffering, so we use a duplicated IO object:
	io = IO.for_fd(_io.fileno, autoclose: false)
	
	total = 0
	
	maximum_size = buffer.size - offset
	while maximum_size > 0
		chunk = buffer.get_string(offset, maximum_size)
		case result = blocking{io.write_nonblock(chunk, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return EWOULDBLOCK
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return EWOULDBLOCK
			end
		else
			total += result
			offset += result
			break if result >= length
			length -= result
		end
		
		maximum_size = buffer.size - offset
	end
	
	return total
rescue IOError => error
	return -Errno::EBADF::Errno
rescue SystemCallError => error
	return -error.errno
end

def pop_ready

def pop_ready
@ready.empty?
= @ready.size
times do
 = @ready.pop
.transfer if fiber.alive?
 true

def process_wait(fiber, pid, flags)

def process_wait(fiber, pid, flags)
	Thread.new do
		Process::Status.wait(pid, flags)
	end.value
end

def push(fiber)

Append the given fiber into the ready list.
def push(fiber)
	@ready.push(fiber)
end

def raise(fiber, *arguments)

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
def raise(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments)
ensure
	optional.nullify
end

def ready?

@returns [Boolean] Whether the ready list is not empty, i.e. there are fibers ready to be resumed.
def ready?
	!@ready.empty?
end

def resume(fiber, *arguments)

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

def select(duration = nil)

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	@waiting.each do |io, waiter|
		waiter.each do |fiber, events|
			if (events & IO::READABLE) > 0
				readable << io
			end
			
			if (events & IO::WRITABLE) > 0
				writable << io
			end
			
			if (events & IO::PRIORITY) > 0
				priority << io
			end
		end
	end
	
	duration = 0 unless @ready.empty?
	error = nil
	
	if duration&.>(0)
		start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	else
		@idle_duration = 0.0
	end
	
	# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
	Thread.handle_interrupt(::Exception => :on_blocking) do
		@blocked = true
		readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	rescue ::Exception => error
		# Requeue below...
	ensure
		@blocked = false
		if start_time
			end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
			@idle_duration = end_time - start_time
		end
	end
	
	if error
		# Requeue the error into the pending exception queue:
		Thread.current.raise(error)
		return 0
	end
	
	ready = Hash.new(0).compare_by_identity
	
	readable&.each do |io|
		ready[io] |= IO::READABLE
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY
	end
	
	ready.each do |io, events|
		@waiting.delete(io).dispatch(events) do |waiter|
			# Re-schedule the waiting IO:
			waiter.tail = @waiting[io]
			@waiting[io] = waiter
		end
	end
	
	return ready.size
end

def transfer

Transfer from the current fiber to the event loop.
def transfer
	@loop.transfer
end

def wakeup

Wake up the event loop if it is currently sleeping.
def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

def yield

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end