class Async::Task
A task represents the state associated with the execution of an asynchronous
block.
def self.current
Raises:
-
(RuntimeError)
- if task was not {set!} for the current fiber.
Returns:
-
(Async::Task)
-
# Look up the task registered for the current fiber.
#
# @return [Async::Task] the value stored under the :async_task key of Thread.current.
# @raise [RuntimeError] if nothing is stored there, i.e. no task was {set!} for the current fiber.
def self.current
  task = Thread.current[:async_task]
  raise RuntimeError, "No async task available!" unless task

  task
end
def self.current?
Returns:
-
(Async::Task, nil)
-
# Look up the task registered for the current fiber without raising.
#
# @return [Async::Task, nil] whatever is stored under the :async_task key of
#   Thread.current, or nil when nothing is stored.
def self.current?
  fiber_locals = Thread.current
  fiber_locals[:async_task]
end
def self.yield
Yields:
- result of the task if a block is given.
Raises:
-
(Exception)
- if the result is an exception
Returns:
-
(Object)
- result of the task
# Obtain a result and surface it, raising when the result is an exception.
#
# With a block, the result is the block's value. Without a block, the current
# fiber is suspended via Fiber.yield and the result is whatever it is resumed with.
#
# @yield computes the result directly instead of suspending the fiber.
# @return [Object] the result.
# @raise [Exception] the result itself, if it is an Exception instance.
def self.yield
  outcome =
    if block_given?
      yield
    else
      Fiber.yield
    end

  raise outcome if outcome.is_a?(Exception)

  outcome
end
def async(*args, &block)
# Create a new task in this task's reactor, with this task as its parent,
# and start it immediately.
#
# @param args arguments forwarded to the new task's {run}.
# @param block the body of the new task.
# @return [Async::Task] the newly created task (already started).
def async(*args, &block)
  Task.new(@reactor, self, &block).tap do |child|
    child.run(*args)
  end
end
def finish!
# Finalize the task: detach it from the task tree and wake anything waiting
# on its result.
def finish!
  # Attempt to remove this node from the task tree.
  consume

  # If this task was being used as a future, signal completion here:
  @condition.signal(@result) if @condition
end
def finished?
Returns:
-
(Boolean)
-
# Whether the task is finished: the superclass must report finished and the
# task's own status must not be :running.
#
# @return [Boolean]
def finished?
  node_finished = super
  node_finished && @status != :running
end
def initialize(reactor, parent = Task.current?)
Parameters:
-
parent
(Async::Task) -- the parent task.
-
reactor
(Async::Reactor) -- the reactor this task will run within.
# Create a task bound to +reactor+ whose body is the given block.
#
# The block is wrapped in a fiber that is created here but not started; {run}
# resumes it. Inside the fiber the task registers itself via {set!}, calls the
# block with itself followed by the arguments of the first resume, and records
# the outcome in @status and @result:
#
# * normal return -> @result is the block's value, @status is :complete
# * Interrupt     -> @status is :interrupted (the exception is swallowed)
# * anything else -> @result is the exception, @status is :failed, and the
#   exception is re-raised out of the fiber
#
# {finish!} is always called when the fiber body ends.
#
# @param reactor [Async::Reactor] the reactor this task will run within.
# @param parent [Async::Task, nil] the parent task. Defaults to Task.current?;
#   when nil, the reactor is passed to the superclass as the parent instead.
# @yield [task, *args] the body of the task.
def initialize(reactor, parent = Task.current?)
  super(parent || reactor)

  @reactor = reactor

  @status = :initialized
  @result = nil

  @condition = nil

  # Collect every value given to the first resume. A plain |args| parameter
  # would keep only the first of several values and would splat apart a
  # single Array argument when forwarding it below.
  @fiber = Fiber.new do |*args|
    set!

    begin
      @result = yield(self, *args)
      @status = :complete
      # Async.logger.debug("Task #{self} completed normally.")
    rescue Interrupt
      @status = :interrupted
      # Async.logger.debug("Task #{self} interrupted: #{$!}")
    rescue Exception => error
      @result = error
      @status = :failed
      # Async.logger.debug("Task #{self} failed: #{$!}")
      raise
    ensure
      # Async.logger.debug("Task #{self} closing: #{$!}")
      finish!
    end
  end
end
def result
Returns:
-
(Object)
-
Raises:
-
(RuntimeError)
- if the task's fiber is the current fiber.
# Retrieve the result of the task.
#
# While the task is running, this waits on a lazily created condition and
# returns whatever that wait returns. Otherwise the stored result is passed
# through Task.yield, which raises it if it is an exception.
#
# @return [Object]
# @raise [RuntimeError] if the task's fiber is the current fiber.
def result
  raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)

  return Task.yield { @result } unless running?

  @condition ||= Condition.new
  @condition.wait
end
def run(*args)
# Start the task by resuming its fiber for the first time.
#
# @param args arguments passed to the fiber's resume.
# @return [Object] whatever the fiber's resume returns.
# @raise [RuntimeError] if the task's status is anything other than :initialized.
def run(*args)
  raise RuntimeError, "Task already running!" unless @status == :initialized

  @status = :running
  @fiber.resume(*args)
end
def running?
Returns:
-
(Boolean)
-
# Whether the task's status is currently :running.
#
# @return [Boolean]
def running?
  current_status = @status
  current_status == :running
end
def set!
# Register this task as the current task.
#
# @return [Async::Task] self (the assigned value).
def set!
  # This is actually fiber-local:
  fiber_locals = Thread.current
  fiber_locals[:async_task] = self
end
def stop
Returns:
-
(void)
-
# Stop all child tasks, then this task.
#
# Each child is stopped first. If this task's fiber is still alive, it is
# resumed with an Interrupt instance as the resume value.
#
# @return [Object, nil] the value of the fiber's resume, or nil when the fiber
#   is no longer alive.
def stop
  @children.each { |child| child.stop }

  return unless @fiber.alive?

  interrupt = Interrupt.new("Stop right now!")
  @fiber.resume(interrupt)
end
def to_s
# Human-readable summary built from the task's description and its status.
#
# @return [String]
def to_s
  summary = self.description
  "<#{summary} status=#{@status}>"
end