class Async::Wrapper

Represents an asynchronous IO within a reactor.

def cancel_monitor

def cancel_monitor
	if @readable
		readable = @readable
		@readable = nil
		
		readable.resume(Cancelled.new)
	end
	
	if @writable
		writable = @writable
		@writable = nil
		
		writable.resume(Cancelled.new)
	end
	
	if @any
		any = @any
		@any = nil
		
		any.resume(Cancelled.new)
	end
	
	if @monitor
		@monitor.close
		@monitor = nil
	end
end

def close

Close the io and monitor.
def close
	cancel_monitor
	
	@io.close
end

def closed?

def closed?
	@io.closed?
end

def initialize(io, reactor = nil)

Parameters:
  • reactor (Reactor) -- the reactor that is managing this wrapper, or not specified, it's looked up by way of {Task.current}.
  • io () -- the native object to wrap.
def initialize(io, reactor = nil)
	@io = io
	
	@reactor = reactor
	@monitor = nil
	
	@readable = nil
	@writable = nil
	@any = nil
end

def interests

What an abomination.
def interests
	if @any
		return :rw
	elsif @readable
		if @writable
			return :rw
		else
			return :r
		end
	elsif @writable
		return :w
	end
	
	return nil
end

def reactor= reactor

Binding to a reactor is purely a performance consideration. Generally, I don't like APIs that exist only due to optimisations. This is borderline, so consider this functionality semi-private.
Bind this wrapper to a different reactor. Assign nil to convert to an unbound wrapper (can be used from any reactor/task but with slightly increased overhead.)
def reactor= reactor
	return if @reactor.equal?(reactor)
	
	cancel_monitor
	
	@reactor = reactor
end

def resume(*args)

def resume(*args)
	readiness = @monitor.readiness
	
	if @readable and (readiness == :r or readiness == :rw)
		@readable.resume(*args)
	end
	
	if @writable and (readiness == :w or readiness == :rw)
		@writable.resume(*args)
	end
	
	if @any
		@any.resume(*args)
	end
end

def wait_any(duration = nil)

Parameters:
  • duration (Float) -- timeout after the given duration if not `nil`.
def wait_any(duration = nil)
	raise WaitError if @any
	
	self.reactor = Task.current.reactor
	
	begin
		@any = Fiber.current
		wait_for(duration)
	ensure
		@any = nil
		@monitor.interests = interests if @monitor
	end
end

def wait_for(duration)

def wait_for(duration)
	if @monitor
		@monitor.interests = interests
	else
		@monitor = @reactor.register(@io, interests, self)
	end
	
	# If the user requested an explicit timeout for this operation:
	if duration
		@reactor.timeout(duration) do
			begin
				Task.yield
			rescue Async::TimeoutError
				return false
			end
		end
	else
		Task.yield
	end
	
	return true
end

def wait_readable(duration = nil)

Wait for the io to become readable.
def wait_readable(duration = nil)
	raise WaitError if @readable
	
	self.reactor = Task.current.reactor
	
	begin
		@readable = Fiber.current
		wait_for(duration)
	ensure
		@readable = nil
		@monitor.interests = interests if @monitor
	end
end

def wait_writable(duration = nil)

Wait for the io to become writable.
def wait_writable(duration = nil)
	raise WaitError if @writable
	
	self.reactor = Task.current.reactor
	
	begin
		@writable = Fiber.current
		wait_for(duration)
	ensure
		@writable = nil
		@monitor.interests = interests if @monitor
	end
end