class Async::Task
@public Since `stable-v1`.
Encapsulates the state of a running task and its result.
def self.current
@returns [Task]
Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
# Look up the task registered for the current fiber.
#
# @returns [Task] The value stored under `:async_task` in `Thread.current`.
# @raises [RuntimeError] If nothing (or a falsy value) is stored there.
def self.current
  task = Thread.current[:async_task]
  raise RuntimeError, "No async task available!" unless task

  task
end
def self.current?
Check if there is a task defined for the current fiber.
# Check whether a task is registered for the current fiber.
#
# Note that this returns the stored object itself rather than a strict boolean.
#
# @returns [Task | Nil] The value stored under `:async_task`, or `nil` when unset.
def self.current?
  fiber_locals = Thread.current
  fiber_locals[:async_task]
end
def self.yield
- With no replacement.
# Hand control back to the fiber scheduler by calling its `transfer` method.
#
# @returns Whatever `Fiber.scheduler.transfer` returns.
def self.yield
  scheduler = Fiber.scheduler
  scheduler.transfer
end
def alive?
# Whether the underlying fiber is still alive.
#
# @returns [Boolean | Nil] The result of `@fiber.alive?`, or `nil` when no fiber is assigned.
def alive?
  fiber = @fiber
  fiber.nil? ? nil : fiber.alive?
end
def async(*arguments, **options, &block)
# Create a child task of this task and start it immediately.
#
# @parameter arguments [Array] Passed through to the child's `run`.
# @parameter options [Hash] Passed through to `Task.new`.
# @returns [Task] The newly created child task, after `run` has been invoked on it.
def async(*arguments, **options, &block)
  Task.new(self, **options, &block).tap do |child|
    child.run(*arguments)
  end
end
def backtrace(*arguments)
# Fetch the backtrace of the underlying fiber.
#
# @parameter arguments [Array] Forwarded unchanged to the fiber's `backtrace`.
# @returns The fiber's backtrace, or `nil` when no fiber is assigned.
def backtrace(*arguments)
  fiber = @fiber
  return nil if fiber.nil?

  fiber.backtrace(*arguments)
end
def complete?
# Whether the task's status is `:complete`.
#
# @returns [Boolean]
def complete?
  @status == :complete
end
def current?
# Whether this task is the one registered for the current fiber.
#
# Compares by object identity against the value stored under `:async_task`.
#
# @returns [Boolean]
def current?
  registered = Thread.current[:async_task]
  equal?(registered)
end
def fail!(exception = false, propagate = true)
# Mark the task as failed and record the exception as its result.
#
# @parameter exception [Exception | false] The failure; stored in `@result` as-is.
# @parameter propagate [Boolean] When true (and an exception is given), re-raise it to the caller.
# @raises [Exception] The given exception, when `propagate` is truthy.
def fail!(exception = false, propagate = true)
  @status = :failed
  @result = exception

  # Nothing to raise or report when no exception was supplied.
  return unless exception

  raise exception if propagate

  if @finished.nil?
    # If no one has called wait, we log this as a warning:
    Console.logger.warn(self, "Task may have ended with unhandled exception.", exception)
  else
    Console.logger.debug(self, exception)
  end
end
def failed?
# Whether the task's status is `:failed`.
#
# @returns [Boolean]
def failed?
  @status == :failed
end
def finish!
# Finalize the task once its fiber has run to the end.
#
# @returns The result of `@finished.signal`, or `nil` when nobody is waiting.
def finish!
  # Drop the reference so the fiber can be recycled.
  @fiber = nil

  # Attempt to remove this node from the task tree.
  consume

  # If this task was being used as a future, signal completion with the result.
  @finished.signal(@result) if @finished
end
def finished?
Whether we can remove this node from the reactor graph.
# Whether this node can be removed from the reactor graph.
#
# True only when the inherited check passes and no fiber is assigned any more.
#
# @returns [Boolean] (or the falsy value returned by the inherited check)
def finished?
  inherited = super
  inherited && @fiber.nil?
end
def initialize(parent = Task.current?, finished: nil, **options, &block)
@parameter reactor [Reactor] the reactor this task will run within.
Create a new task.
# Create a new task in the `:initialized` state; the block is not run until `run` is called.
#
# @parameter parent [Task | Nil] Passed to the superclass initializer; defaults to `Task.current?`.
# @parameter finished [Object | Nil] Stored as `@finished` without modification.
# @parameter options [Hash] Passed to the superclass initializer.
def initialize(parent = Task.current?, finished: nil, **options, &block)
  super(parent, **options)

  @block = block
  @finished = finished

  @result = nil
  @status = :initialized

  # No fiber exists until the task is scheduled.
  @fiber = nil
end
def reactor
# Return the root of the tree this task belongs to.
#
# @returns Whatever `root` returns (presumably the reactor; `root` is defined outside this block).
def reactor
  root
end
def run(*arguments)
# Begin executing the task's block; only permitted from the `:initialized` state.
#
# @parameter arguments [Array] Passed to the block after the task itself.
# @returns Whatever `schedule` returns.
# @raises [RuntimeError] If the task has already left the `:initialized` state.
def run(*arguments)
  raise RuntimeError, "Task already running!" unless @status == :initialized

  @status = :running

  schedule { @block.call(self, *arguments) }
end
def running?
Check if the task is running.
# Whether the task's status is `:running`.
#
# @returns [Boolean]
def running?
  @status == :running
end
def schedule(&block)
# Wrap the given block in a new fiber, store it in `@fiber`, and resume it via the root node.
#
# Inside the fiber the task registers itself, runs the block, and records the outcome:
# a normal return stores the result and marks the task `:complete`; `Stop` calls `stop!`;
# a `StandardError` is recorded without propagating; any other `Exception` is recorded and
# re-raised. `finish!` always runs afterwards.
#
# @returns Whatever `root.resume` returns.
def schedule(&block)
  @fiber = Fiber.new do
    set!

    begin
      @result = yield
      @status = :complete
    rescue Stop
      stop!
    rescue StandardError => failure
      # Ordinary errors are stored on the task rather than raised through the scheduler.
      fail!(failure, false)
    rescue Exception => fatal
      # Anything outside StandardError is recorded and then propagated.
      fail!(fatal, true)
    ensure
      finish!
    end
  end

  root.resume(@fiber)
end
def set!
# Register this task as the current one.
#
# `Thread#[]=` storage is fiber-local, so the registration applies to the fiber that calls this.
#
# @returns [Task] self
def set!
  fiber_locals = Thread.current
  fiber_locals[:async_task] = self
end
def sleep(duration = nil)
- Prefer {Kernel#sleep} except when compatibility with `stable-v1` is required.
# Sleep by delegating to the inherited `sleep` implementation.
#
# @parameter duration [Numeric | Nil] Passed through unchanged.
# @returns Whatever the inherited `sleep` returns.
def sleep(duration = nil)
  super(duration)
end
def stop(later = false)
# Stop the task, choosing the mechanism according to its current state.
#
# - Already stopped: nothing happens.
# - Not running: transition straight to stopped via `stop!`, because children may still be running.
# - Running and current: raise `Stop` here, or queue a `Stop::Later` when `later` is truthy.
# - Running in another, still-alive fiber: ask the scheduler to raise `Stop` in it, falling back
#   to a queued `Stop::Later` if that raises `FiberError`.
#
# @parameter later [Boolean] Defer stopping the current task instead of raising immediately.
# @raises [Stop] When called on the current task with `later` falsy.
def stop(later = false)
  # If we already stopped this task, don't try to stop it again.
  return if stopped?

  # We are not running, but children might be, so transition directly into the stopped state.
  return stop! unless running?

  if current?
    raise Stop, "Stopping current task!" unless later

    Fiber.scheduler.push(Stop::Later.new(self))
  elsif @fiber&.alive?
    begin
      Fiber.scheduler.raise(@fiber, Stop)
    rescue FiberError
      Fiber.scheduler.push(Stop::Later.new(self))
    end
  end
end
def stop!
# Move the task into the `:stopped` state and stop its children.
#
# @returns Whatever `stop_children(true)` returns.
def stop!
  @status = :stopped

  stop_children(true)
end
def stopped?
# Whether the task's status is `:stopped`.
#
# @returns [Boolean]
def stopped?
  @status == :stopped
end
def to_s
# Human-readable summary combining the task's description and status.
#
# @returns [String] Of the form `#<description (status)>`.
def to_s
  label = description
  "#<#{label} (#{@status})>"
end
def wait
@raises [RuntimeError] If the task's fiber is the current fiber.
Retrieve the current result of the task. Will cause the caller to wait until result is available.
# Retrieve the task's result, blocking the caller while the task is still running.
#
# @returns The stored result, when it is not an exception.
# @raises [RuntimeError] If called from the task's own fiber.
# @raises [Exception] The stored result, when the task ended with an exception.
def wait
  raise "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)

  if running?
    # Lazily create the condition; `finish!` signals it if present.
    @finished ||= Condition.new
    @finished.wait
  end

  case (outcome = @result)
  when Exception then raise outcome
  else outcome
  end
end
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
# Run the block under a timeout by delegating to the fiber scheduler's `with_timeout`.
#
# @parameter duration [Numeric] Forwarded to the scheduler.
# @parameter exception [Class] Forwarded to the scheduler; defaults to `TimeoutError`.
# @parameter message [String] Forwarded to the scheduler.
# @returns Whatever the scheduler's `with_timeout` returns.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
  scheduler = Fiber.scheduler
  scheduler.with_timeout(duration, exception, message, &block)
end
def yield
# Yield control by delegating to the fiber scheduler's `yield`.
#
# @returns Whatever `Fiber.scheduler.yield` returns.
def yield
  scheduler = Fiber.scheduler
  scheduler.yield
end