class Async::Task
A task represents the state associated with the execution of an asynchronous
block.
def self.current
-
(RuntimeError)
- if task was not {set!} for the current fiber.
Returns:
-
(Async::Task)
-
# Looks up the task registered for the current fiber.
#
# @return [Async::Task] the task previously stored by {#set!}.
# @raise [RuntimeError] if no task was stored for the current fiber.
def self.current
  # Thread#[] storage is fiber-local, so every fiber sees its own task.
  Thread.current[:async_task] || raise(RuntimeError, "No async task available!")
end
def self.current?
-
(Async::Task, nil)
-
# Looks up the task registered for the current fiber without raising.
#
# @return [Async::Task, nil] the stored task, or nil when none was stored.
def self.current?
  Thread.current[:async_task]
end
def self.yield
- Yield: - result of the task if a block is given.
Raises:
-
(Exception)
- if the result is an exception
Returns:
-
(Object)
- result of the task
# Obtains a result either from the given block or, when no block is given,
# by suspending the current fiber until it is resumed with a value.
#
# @yield computes the result when a block is given.
# @return [Object] the result.
# @raise [Exception] if the result is itself an exception instance.
def self.yield
  result = block_given? ? yield : Fiber.yield

  # An exception handed over as a value is raised rather than returned.
  raise result if result.is_a?(Exception)

  result
end
def bind(io)
-
(Wrapper)
- The wrapped object.
Parameters:
-
io
() -- the native object to bind to this task.
# Wraps +io+ through the reactor and remembers the wrapper under the io's
# file descriptor, so binding the same descriptor again returns the wrapper
# that was created first.
#
# @param io the native object to bind to this task.
# @return [Wrapper] the wrapped object.
def bind(io)
  @ios[io.fileno] ||= @reactor.wrap(io, self)
end
def close
# Releases everything this task holds: closes each wrapped io, detaches the
# task from the task tree and wakes up anything waiting on the result.
def close
  @ios.each_value(&:close)
  @ios = nil

  # Attempt to remove this node from the task tree.
  consume

  # If this task was being used as a future, signal completion here:
  @condition.signal(@result) if @condition
end
def finished?
-
(Boolean)
-
# Whether the task is finished: the superclass must report finished and the
# task's own status must no longer be +:running+.
#
# @return [Boolean]
def finished?
  super && @status != :running
end
def initialize(ios, reactor)
-
(void)
-
Parameters:
-
reactor
(Async::Reactor
) -- -
ios
(Array
) -- an array of `IO` objects such as `TCPServer`, `Socket`, etc.
# Creates a task that will run the given block inside its own fiber.
#
# The fiber is only created here; nothing in this method resumes it.
#
# @param ios [Array] an array of `IO` objects such as `TCPServer`, `Socket`, etc.
# @param reactor [Async::Reactor] the reactor used to wrap the ios.
# @yield [*wrappers, task] the wrapped ios followed by this task.
# @return [void]
def initialize(ios, reactor)
  # Nest under the task registered for the current fiber when there is one,
  # otherwise attach directly to the reactor.
  super(Task.current? || reactor)

  # Wrapped ios, keyed by file descriptor.
  @ios = ios.map { |io| [io.fileno, reactor.wrap(io, self)] }.to_h

  @reactor = reactor

  @status = :running
  @result = nil

  @condition = nil

  @fiber = Fiber.new do
    set!

    begin
      @result = yield(*@ios.values, self)
      @status = :complete
      # Async.logger.debug("Task #{self} completed normally.")
    rescue Interrupt
      @status = :interrupted
      # Async.logger.debug("Task #{self} interrupted: #{$!}")
    rescue Exception => error
      # Record the failure as the result, then let it propagate.
      @result = error
      @status = :failed
      # Async.logger.debug("Task #{self} failed: #{$!}")
      raise
    ensure
      # Async.logger.debug("Task #{self} closing: #{$!}")
      close
    end
  end
end
def register(io, interests)
-
(NIO::Monitor)
-
Parameters:
-
interests
(Symbol
) -- One of `:r`, `:w` or `:rw`. -
io
(IO
) -- a native io object.
# Registers +io+ with the reactor for the given interests.
#
# @param io [IO] a native io object.
# @param interests [Symbol] One of `:r`, `:w` or `:rw`.
# @return [NIO::Monitor] whatever the reactor's +register+ returns.
def register(io, interests)
  @reactor.register(io, interests)
end
def result
-
(Object)
-
Raises:
-
(RuntimeError)
- if the task's fiber is the current fiber.
# Retrieves the result of the task, waiting for it if the task is still
# running.
#
# @return [Object] the value produced by the task.
# @raise [RuntimeError] if the task's fiber is the current fiber.
def result
  raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)

  if running?
    # Create the condition on first use; #close signals it with the result.
    @condition ||= Condition.new
    @condition.wait
  else
    # Going through Task.yield raises the stored result if it is an exception.
    Task.yield { @result }
  end
end
def run
# Resumes the task's fiber.
#
# @return [Object] whatever the fiber yields or returns on this resume.
def run
  @fiber.resume
end
def running?
-
(Boolean)
-
# Whether the task's status is still +:running+.
#
# @return [Boolean]
def running?
  @status == :running
end
def set!
# Registers this task as the current task.
def set!
  # This is actually fiber-local:
  Thread.current[:async_task] = self
end
def stop
-
(void)
-
# Stops every child task, then interrupts this task's fiber if it is still
# alive.
#
# @return [void]
def stop
  @children.each(&:stop)

  return unless @fiber.alive?

  # The exception is handed to the fiber as the resume value, not raised here.
  @fiber.resume(Interrupt.new("Stop right now!"))
end
def to_s
- Todo: - (picat) Add test for this method?
# Renders the superclass description followed by the task status in
# square brackets.
#
# @return [String]
def to_s
  "#{super}[#{@status}]"
end
def with(io, *args)
- Yield: - a wrapped object.
# Wraps +io+ for the duration of the block, then closes both the wrapper and
# the underlying io.
#
# @param io the native io object to wrap; it is closed when the block exits.
# @param args extra arguments passed through to the block after the wrapper.
# @yield [wrapper, *args] a wrapped object.
# @return [Object] the value of the block.
def with(io, *args)
  wrapper = @reactor.wrap(io, self)
  yield wrapper, *args
ensure
  begin
    wrapper.close if wrapper
  ensure
    # Close the io even when closing the wrapper raised, so it is not leaked.
    io.close if io
  end
end