class Async::Wrapper
Represents an asynchronous IO within a reactor.
def cancel_monitor
def cancel_monitor if @readable readable = @readable @readable = nil readable.resume(Cancelled.new) end if @writable writable = @writable @writable = nil writable.resume(Cancelled.new) end if @any any = @any @any = nil any.resume(Cancelled.new) end if @monitor @monitor.close @monitor = nil end end
def close
def close cancel_monitor @io.close end
def closed?
def closed? @io.closed? end
def dup
def dup self.class.new(@io.dup, @reactor) end
def initialize(io, reactor = nil)
-
reactor
(Reactor
) -- the reactor that is managing this wrapper, or not specified, it's looked up by way of {Task.current}. -
io
() -- the native object to wrap.
def initialize(io, reactor = nil) @io = io @reactor = reactor @monitor = nil @readable = nil @writable = nil @any = nil end
def interests
def interests if @any return :rw elsif @readable if @writable return :rw else return :r end elsif @writable return :w end return nil end
def reactor= reactor
Bind this wrapper to a different reactor. Assign nil to convert to an unbound wrapper (can be used from any reactor/task but with slightly increased overhead.)
def reactor= reactor return if @reactor.equal?(reactor) cancel_monitor @reactor = reactor end
def resume(*args)
def resume(*args) # It's possible that the monitor was closed before calling resume. return unless @monitor readiness = @monitor.readiness if @readable and (readiness == :r or readiness == :rw) @readable.resume(*args) end if @writable and (readiness == :w or readiness == :rw) @writable.resume(*args) end if @any @any.resume(*args) end end
def wait_any(timeout = nil)
-
duration
(Float
) -- timeout after the given duration if not `nil`.
def wait_any(timeout = nil) raise WaitError if @any self.reactor = Task.current.reactor begin @any = Fiber.current wait_for(timeout) ensure @any = nil @monitor.interests = interests if @monitor end end
def wait_for(timeout)
def wait_for(timeout) if @monitor @monitor.interests = interests else @monitor = @reactor.register(@io, interests, self) end # If the user requested an explicit timeout for this operation: if timeout @reactor.with_timeout(timeout) do Task.yield end else Task.yield end return true end
def wait_readable(timeout = nil)
def wait_readable(timeout = nil) raise WaitError if @readable self.reactor = Task.current.reactor begin @readable = Fiber.current wait_for(timeout) ensure @readable = nil @monitor.interests = interests if @monitor end end
def wait_writable(timeout = nil)
def wait_writable(timeout = nil) raise WaitError if @writable self.reactor = Task.current.reactor begin @writable = Fiber.current wait_for(timeout) ensure @writable = nil @monitor.interests = interests if @monitor end end