class Async::Task
A task represents the state associated with the execution of an asynchronous block.
def self.current
Raises:
- (RuntimeError) - if task was not {set!} for the current fiber.
Returns:
- (Async::Task)
# Look up the task registered for the currently executing fiber.
#
# @return [Async::Task] the value stored under `:async_task`.
# @raise [RuntimeError] if nothing (nil/false) is stored for the current fiber.
def self.current
  task = Thread.current[:async_task]
  return task if task

  raise RuntimeError, "No async task available!"
end
def self.current?
-
(Async::Task, nil)
-
# Non-raising lookup of the task registered for the currently executing fiber.
#
# @return [Async::Task, nil] the value stored under `:async_task`, or nil
#   when nothing has been stored.
def self.current?
  fiber_locals = Thread.current
  fiber_locals[:async_task]
end
def self.yield
- Yield: - result of the task if a block is given.
Raises:
-
(Exception)
- if the result is an exception
Returns:
-
(Object)
- result of the task
# Obtain a result either from the given block or by suspending the calling
# fiber, and raise it if it turns out to be an exception.
#
# @yield optional block; when given, its value is used as the result.
#   Without a block the calling fiber is suspended with `Fiber.yield` and the
#   value it is resumed with becomes the result.
# @return [Object] the result.
# @raise [Exception] the result itself, if it is an exception instance.
def self.yield
  outcome = block_given? ? yield : Fiber.yield

  raise outcome if outcome.is_a?(Exception)

  outcome
end
def async(*args, **options, &block)
# Create a new task on this task's reactor with this task as its parent, and
# run it straight away.
#
# @param args [Array] arguments forwarded to the new task's {run}.
# @param options [Hash] keyword options forwarded to `Task.new`.
# @yield the block the new task will execute.
# @return [Async::Task] the newly created task.
def async(*args, **options, &block)
  Task.new(@reactor, self, **options, &block).tap do |child|
    child.run(*args)
  end
end
def complete?
# Whether the task's status is `:complete`.
#
# @return [Boolean]
def complete?
  @status == :complete
end
def current?
# Whether this very object is the task registered for the current fiber.
#
# @return [Boolean] true only on object identity, not mere equality.
def current?
  active = Thread.current[:async_task]
  equal?(active)
end
def fail!(exception = nil, propagate = true)
This is a very tricky aspect of tasks to get right. I've modelled it after `Thread` but it's slightly different in that the exception can propagate back up through the reactor. If the user writes code which raises an exception, that exception should always be visible, i.e. cause a failure. If it's not visible, such code fails silently and can be very difficult to debug.
# Mark the task as failed and either re-raise or log the failure.
#
# Intended to be called while an exception is being handled: both the bare
# `raise` and the log messages rely on `$!`.
#
# @param exception [Exception, nil] stored as the task's result.
# @param propagate [Boolean] when true, re-raise the exception currently
#   being handled instead of logging it.
# @raise [Exception] the in-flight exception (`$!`) when `propagate` is true.
def fail!(exception = nil, propagate = true)
  @status = :failed
  @result = exception

  # A bare raise re-raises the exception currently being handled.
  raise if propagate

  if @finished.nil?
    # If no one has called wait, we log this as an error:
    logger.error(self) {$!}
  else
    logger.debug(self) {$!}
  end
end
def failed?
# Whether the task's status is `:failed`.
#
# @return [Boolean]
def failed?
  @status == :failed
end
def finish!
# Wrap up the task once its block has stopped executing.
#
# Calls `consume` (an attempt to remove this node from the task tree) and,
# if a completion condition has been created, signals it with the task's
# result.
def finish!
  # Attempt to remove this node from the task tree.
  consume

  # If this task was being used as a future, signal completion here:
  @finished.signal(@result) if @finished
end
def finished?
-
(Boolean)
-
# Whether the task is finished: the superclass must report finished and the
# task must not be in the `:running` state.
#
# @return [Boolean] the superclass's falsy answer is passed through as-is.
def finished?
  inherited = super
  return inherited unless inherited

  @status != :running
end
def initialize(reactor, parent = Task.current?, logger: nil, &block)
Parameters:
- parent (Async::Task) -- the parent task.
- reactor (Async::Reactor) -- the reactor this task will run within.
# Create a new task in the `:initialized` state; it does not start until
# {run} is called.
#
# @param reactor [Async::Reactor] the reactor this task will run within.
# @param parent [Async::Task, nil] the parent task; defaults to the task
#   registered for the current fiber, if any.
# @param logger [Object, nil] optional logger for this task.
# @yield the block the task's fiber will execute.
def initialize(reactor, parent = Task.current?, logger: nil, &block)
  # Without a parent task, the reactor is passed to the superclass instead.
  super(parent || reactor)

  @reactor = reactor
  @logger = logger

  @status = :initialized
  @result = nil
  @finished = nil

  @fiber = make_fiber(&block)
end
def logger
# The logger for this task, falling back to (and caching) the parent's
# logger when none was set.
#
# @return [Object, nil] nil when neither this task nor its parent has one.
def logger
  return @logger if @logger

  @logger = @parent&.logger
end
def make_fiber(&block)
# Build the fiber that executes the task's block.
#
# When resumed, the fiber registers the task via {set!}, yields the task and
# the resume arguments to the block, and stores the block's value as the
# result. Whatever the outcome, {finish!} is called last.
#
# @yield [task, *args] the task itself followed by the resume arguments.
# @return [Fiber] a fiber that has not been started.
def make_fiber(&block)
  Fiber.new do |*arguments|
    set!

    begin
      @result = yield(self, *arguments)
      @status = :complete
      # logger.debug("Task #{self} completed normally.")
    rescue Stop
      stop!
    rescue StandardError => failure
      # Ordinary errors are recorded but not re-raised.
      fail!(failure, false)
    rescue Exception => fatal
      # Anything outside StandardError is recorded and propagated.
      fail!(fatal, true)
    ensure
      # logger.debug("Task #{self} closing: #{$!}")
      finish!
    end
  end
end
def run(*args)
# Start the task by resuming its fiber for the first time.
#
# @param args [Array] arguments passed to the fiber on resume.
# @return [Object] whatever resuming the fiber returns.
# @raise [RuntimeError] if the task is not in the `:initialized` state.
def run(*args)
  raise RuntimeError, "Task already running!" unless @status == :initialized

  @status = :running
  @fiber.resume(*args)
end
def running?
-
(Boolean)
-
# Whether the task's status is `:running`.
#
# @return [Boolean]
def running?
  @status == :running
end
def set!
# Register this task as the one belonging to the current fiber, so that the
# class-level lookups reading `:async_task` can find it.
def set!
  # Despite living on Thread, this storage is actually fiber-local:
  Thread.current[:async_task] = self
end
def stop
-
(void)
-
# Stop this task and, in every case, all of its children.
#
# A running task is moved to `:stopping`. If it is the current task, `Stop`
# is raised in place; otherwise its fiber, if still alive, is resumed with a
# `Stop` instance. Tasks in any other state are left alone.
#
# @return [true, Object, nil] true if the task was already stopping;
#   otherwise the value of resuming the fiber, or nil.
# @raise [Stop] when called on the current task while it is running.
def stop
  # If we are already stopping this task... don't try to stop it again.
  return true if stopping?

  if running?
    @status = :stopping

    raise Stop, "Stopping current fiber!" if current?

    @fiber.resume(Stop.new) if @fiber.alive?
  end
ensure
  # Children are stopped regardless of how the body above exits.
  @children&.each(&:stop)
end
def stop!
# Record that the task has stopped by setting its status to `:stopped`.
def stop!
  @status = :stopped
end
def stopped?
# Whether the task's status is `:stopped`.
#
# @return [Boolean]
def stopped?
  @status == :stopped
end
def stopping?
# Whether the task's status is `:stopping`, i.e. a stop has been requested
# but not yet recorded as `:stopped`.
#
# @return [Boolean]
def stopping?
  @status == :stopping
end
def to_s
# Human-readable summary: the task's description followed by its status.
#
# @return [String] of the form `<description status>`.
def to_s
  label = description
  "<#{label} #{@status}>"
end
def wait
-
(Object)
- the final expression/result of the task's block.
Raises:
-
(RuntimeError)
- if the task's fiber is the current fiber.
# Retrieve the task's result, waiting for it if the task is still running.
#
# While the task is running, a completion condition is created on first use
# and waited on. Otherwise the stored result is handed to `Task.yield`.
#
# @return [Object] the final expression/result of the task's block.
# @raise [RuntimeError] if the task's fiber is the current fiber.
def wait
  if Fiber.current.equal?(@fiber)
    raise RuntimeError, "Cannot wait on own fiber"
  end

  # Not running: the stored result is available right now.
  return Task.yield { @result } unless running?

  @finished ||= Condition.new
  @finished.wait
end
def yield
# Delegate to the reactor's `yield`.
# NOTE(review): presumably this hands control back to the reactor so other
# tasks can run — confirm against the reactor's implementation.
#
# @return [Object] whatever the reactor's `yield` returns.
def yield
  reactor.yield
end