lib/io/event/selector/select.rb



# Copyright, 2021, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative '../interrupt'

module IO::Event
	module Selector
		class Select
			def initialize(loop)
				@loop = loop
				
				@readable = Hash.new.compare_by_identity
				@writable = Hash.new.compare_by_identity
				
				@blocked = false
				
				@ready = Queue.new
				@interrupt = Interrupt.attach(self)
			end
			
			attr :loop
			
			# If the event loop is currently blocked, 
			def wakeup
				if @blocked
					@interrupt.signal
					
					return true
				end
				
				return false
			end
			
			def close
				@interrupt.close
				
				@loop = nil
				@readable = nil
				@writable = nil
			end
			
			Optional = Struct.new(:fiber) do
				def transfer(*arguments)
					fiber&.transfer(*arguments)
				end
				
				def alive?
					fiber&.alive?
				end
				
				def nullify
					self.fiber = nil
				end
			end
			
			# Transfer from the current fiber to the event loop.
			def transfer
				@loop.transfer
			end
			
			# Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
			def resume(fiber, *arguments)
				optional = Optional.new(Fiber.current)
				@ready.push(optional)
				
				fiber.transfer(*arguments)
			ensure
				optional.nullify
			end
			
			# Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
			def yield
				optional = Optional.new(Fiber.current)
				@ready.push(optional)
				
				@loop.transfer
			ensure
				optional.nullify
			end
			
			# Append the given fiber into the ready list.
			def push(fiber)
				@ready.push(fiber)
			end
			
			# Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
			def raise(fiber, *arguments)
				optional = Optional.new(Fiber.current)
				@ready.push(optional)
				
				fiber.raise(*arguments)
			ensure
				optional.nullify
			end
			
			def ready?
				!@ready.empty?
			end
			
			def io_wait(fiber, io, events)
				remove_readable = remove_writable = false
				
				if (events & IO::READABLE) > 0 or (events & IO::PRIORITY) > 0
					@readable[io] = fiber
					remove_readable = true
				end
				
				if (events & IO::WRITABLE) > 0
					@writable[io] = fiber
					remove_writable = true
				end
				
				@loop.transfer
			ensure
				@readable.delete(io) if remove_readable
				@writable.delete(io) if remove_writable
			end
			
			if IO.const_defined?(:Buffer)
				EAGAIN = Errno::EAGAIN::Errno
				
				def io_read(fiber, io, buffer, length)
					offset = 0
					
					while true
						maximum_size = buffer.size - offset
						
						case result = blocking{io.read_nonblock(maximum_size, exception: false)}
						when :wait_readable
							if length > 0
								self.io_wait(fiber, io, IO::READABLE)
							else
								return -EAGAIN
							end
						when :wait_writable
							if length > 0
								self.io_wait(fiber, io, IO::WRITABLE)
							else
								return -EAGAIN
							end
						else
							break unless result
							
							buffer.copy(result, offset)
							
							size = result.bytesize
							offset += size
							break if size >= length
							length -= size
						end
					end
					
					return offset
				end
				
				def io_write(fiber, io, buffer, length)
					offset = 0
					
					while true
						maximum_size = buffer.size - offset
						
						chunk = buffer.to_str(offset, maximum_size)
						case result = blocking{io.write_nonblock(chunk, exception: false)}
						when :wait_readable
							if length > 0
								self.io_wait(fiber, io, IO::READABLE)
							else
								return -EAGAIN
							end
						when :wait_writable
							if length > 0
								self.io_wait(fiber, io, IO::WRITABLE)
							else
								return -EAGAIN
							end
						else
							offset += result
							break if result >= length
							length -= result
						end
					end
					
					return offset
				end
			end
			
			def process_wait(fiber, pid, flags)
				r, w = IO.pipe
				
				thread = Thread.new do
					Process::Status.wait(pid, flags)
				ensure
					w.close
				end
				
				self.io_wait(fiber, r, IO::READABLE)
				
				return thread.value
			ensure
				r.close
				w.close
				thread&.kill
			end
			
			private def pop_ready
				unless @ready.empty?
					count = @ready.size
					
					count.times do
						fiber = @ready.pop
						fiber.transfer if fiber.alive?
					end
					
					return true
				end
			end
			
			def select(duration = nil)
				if pop_ready
					# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
					duration = 0
				end
				
				@blocked = true
				duration = 0 unless @ready.empty?
				readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration)
				@blocked = false
				
				ready = Hash.new(0)
				
				readable&.each do |io|
					fiber = @readable.delete(io)
					ready[fiber] |= IO::READABLE
				end
				
				writable&.each do |io|
					fiber = @writable.delete(io)
					ready[fiber] |= IO::WRITABLE
				end
				
				ready.each do |fiber, events|
					fiber.transfer(events) if fiber.alive?
				end
				
				return ready.size
			end
			
			private def blocking(&block)
				Fiber.new(blocking: true, &block).resume
			end
		end
	end
end