class Async::Task

A task represents the state associated with the execution of an asynchronous
block.

def self.current

Raises:
  • (RuntimeError) - if task was not {set!} for the current fiber.

Returns:
  • (Async::Task) -
# Fetch the task registered for the current fiber, insisting that one exists.
# @return [Async::Task] the task stored under `:async_task`.
# @raise [RuntimeError] if nothing (or a falsy value) is stored under `:async_task`.
def self.current
	task = Thread.current[:async_task]
	
	raise RuntimeError, "No async task available!" unless task
	
	task
end

def self.current?

Returns:
  • (Async::Task, nil) -
# Look up the task registered for the current fiber, without raising.
# @return [Async::Task, nil] the value stored under `:async_task`, or nil if none was set.
def self.current?
	Thread.current[:async_task]
end

def self.yield

Other tags:
    Yield: - result of the task if a block is given.

Raises:
  • (Exception) - if the result is an exception

Returns:
  • (Object) - result of the task
# Obtain a result either from the given block or by suspending the current fiber.
# With a block, the block's value is the result; without one, the fiber is
# suspended via `Fiber.yield` and the result is whatever it is resumed with.
# @yield computes the result directly instead of suspending the fiber.
# @return [Object] the result, when it is not an exception.
# @raise [Exception] the result itself, when it is an exception instance.
def self.yield
	outcome = block_given? ? yield : Fiber.yield
	
	raise outcome if outcome.is_a?(Exception)
	
	outcome
end

def async(*args, **options, &block)

# Create a child task on the same reactor, with this task as its parent, and run it immediately.
# @param args [Array] arguments passed to the child task's `run`.
# @param options [Hash] keyword options forwarded to `Task.new`.
# @yield the block the child task will execute.
# @return [Async::Task] the child task, after `run` has returned.
def async(*args, **options, &block)
	Task.new(@reactor, self, **options, &block).tap do |child|
		child.run(*args)
	end
end

def complete?

# Whether the task's status is `:complete`.
# @return [Boolean]
def complete?
	@status.equal?(:complete)
end

def current?

# Whether this exact task is the one registered under `:async_task` for the current fiber.
# @return [Boolean] true only for object identity, not mere equality.
def current?
	Thread.current[:async_task].equal?(self)
end

def fail!(exception = nil, propagate = true)

As an explicit choice, the user can start a task which doesn't propagate exceptions. This only applies to `StandardError` and derived exceptions. This allows tasks to internally capture their error state which is raised when invoking `Task#result` similar to how `Thread#join` works. This mode makes `Async::Task` behave more like a promise, and you would need to ensure that someone calls `Task#result` otherwise you might miss important errors.
This is a very tricky aspect of tasks to get right. I've modelled it after `Thread` but it's slightly different in that the exception can propagate back up through the reactor. If the user writes code which raises an exception, that exception should always be visible, i.e. cause a failure. If it's not visible, such code fails silently and can be very difficult to debug.
# Mark the task as failed, record the exception as its result, and either
# re-raise it or log it.
#
# The supplied exception is what gets raised or logged. The exception currently
# being handled (`$!`) is only a fallback when no exception is supplied, so the
# method behaves the same whether or not it is called from inside a `rescue`.
#
# @param exception [Exception, nil] the failure to record.
# @param propagate [Boolean] when true the failure is raised to the caller;
#   when false it is only logged.
# @raise [Exception] the failure, when `propagate` is true.
# @raise [RuntimeError] when `propagate` is true and there is neither a
#   supplied exception nor one being handled.
def fail!(exception = nil, propagate = true)
	@status = :failed
	@result = exception
	
	# Capture the failure now rather than reading `$!` lazily inside the logger block:
	failure = exception || $!
	
	if propagate
		raise failure if failure
		
		# Nothing to re-raise: keep the historical behaviour of a bare `raise`.
		raise
	elsif @finished.nil?
		# If no one has called wait, we log this as an error:
		logger.error(self) {failure}
	else
		logger.debug(self) {failure}
	end
end

def failed?

# Whether the task's status is `:failed`.
# @return [Boolean]
def failed?
	@status.equal?(:failed)
end

def finish!

Finish the current task, and all bound IO objects.
# Finish the task: call `consume`, then notify anyone waiting on its result.
# @return [Object, nil] whatever `@finished.signal` returns, or nil when nobody is waiting.
def finish!
	# Attempt to detach this node from the task tree first.
	consume
	
	# A condition only exists if the task was used as a future; wake its waiters with the result.
	@finished.signal(@result) if @finished
end

def finished?

Returns:
  • (Boolean) -
# Whether the task is finished: the superclass must report finished and the
# task's status must not be `:running`.
# @return [Boolean] (a falsy superclass answer is returned unchanged).
def finished?
	node_finished = super
	
	node_finished && @status != :running
end

def initialize(reactor, parent = Task.current?, logger: nil, &block)

Parameters:
  • parent (Async::Task) -- the parent task.
  • reactor (Async::Reactor) -- the reactor this task will run within.
# Create a task bound to a reactor. The task does not start executing here.
# @param reactor [Async::Reactor] the reactor this task will run within.
# @param parent [Async::Task, nil] the parent task; defaults to `Task.current?`.
#   When nil, the reactor is passed to the superclass initializer instead.
# @param logger [Object, nil] optional logger stored for this task.
# @yield the block the task's fiber will execute (see `make_fiber`).
def initialize(reactor, parent = Task.current?, logger: nil, &block)
	super(parent || reactor)
	
	@reactor = reactor
	@logger = logger
	
	# Lifecycle state: nothing has run yet, so there is no result and no waiter condition.
	@status = :initialized
	@result = @finished = nil
	
	@fiber = make_fiber(&block)
end

def logger

# The logger for this task, falling back to (and memoizing) the parent's logger.
# @return [Object, nil] the logger, or nil when neither this task nor its parent has one.
def logger
	return @logger if @logger
	
	@logger = @parent&.logger
end

def make_fiber(&block)

# Build the fiber that executes the task's block and records its outcome.
# The fiber is only created here; nothing runs until it is resumed.
# @yield [task, *args] the task's block, called with this task followed by the
#   arguments the fiber is first resumed with.
# @return [Fiber] the fiber wrapping the block.
def make_fiber(&block)
	Fiber.new do |*args|
		# Register this task as the current one before running any user code.
		set!
		
		begin
			@result = yield(self, *args)
			@status = :complete
			# logger.debug("Task #{self} completed normally.")
		rescue Stop
			# The task was asked to stop: just record the stopped status.
			stop!
		rescue StandardError => error
			# Ordinary errors are recorded without being re-raised (propagate = false).
			fail!(error, false)
		rescue Exception => exception
			# Anything outside StandardError is recorded and re-raised (propagate = true).
			fail!(exception, true)
		ensure
			# logger.debug("Task #{self} closing: #{$!}")
			# Runs on every exit path, whether the block completed, stopped or failed.
			finish!
		end
	end
end

def run(*args)

Begin the execution of the task.
# Begin the execution of the task by resuming its fiber.
# @param args [Array] arguments passed to the fiber on its first resume.
# @return [Object] whatever the fiber's `resume` returns.
# @raise [RuntimeError] if the task's status is anything other than `:initialized`.
def run(*args)
	unless @status == :initialized
		raise RuntimeError, "Task already running!"
	end
	
	@status = :running
	@fiber.resume(*args)
end

def running?

Returns:
  • (Boolean) -
# Whether the task's status is `:running`.
# @return [Boolean]
def running?
	@status.equal?(:running)
end

def set!

Set the current fiber's `:async_task` to this task.
# Set the current fiber's `:async_task` to this task.
# @return [Async::Task] this task (the value of the assignment).
def set!
	# This is actually fiber-local:
	Thread.current[:async_task] = self
end

def stop

Returns:
  • (void) -
# Stop this task if it is running, and always ask its children to stop too.
#
# A running task is moved to `:stopping`. If it is the current task, `Stop` is
# raised from here; otherwise a `Stop` instance is passed into its fiber via
# `resume`, provided the fiber is still alive.
# @return [void]
# @raise [Stop] when the task being stopped is the current task.
def stop
	# Already on its way down: don't try to stop it a second time.
	return true if stopping?
	return unless running?
	
	@status = :stopping
	
	raise Stop, "Stopping current fiber!" if current?
	
	@fiber.resume(Stop.new) if @fiber.alive?
ensure
	# Children are stopped on every path, including the early returns and the raise above.
	@children&.each(&:stop)
end

def stop!

# Mark this task as stopped by setting its status to `:stopped`.
# Only the status changes here; the fiber and any child tasks are not touched.
def stop!
	@status = :stopped
end

def stopped?

# Whether the task's status is `:stopped`.
# @return [Boolean]
def stopped?
	@status.equal?(:stopped)
end

def stopping?

# Whether the task's status is `:stopping`, i.e. a stop was requested but has not completed.
# @return [Boolean]
def stopping?
	@status.equal?(:stopping)
end

def to_s

# Human-readable summary of the task: its description followed by its current status.
# @return [String] e.g. `<description status>`.
def to_s
	format("<%s %s>", description, @status)
end

def wait

Returns:
  • (Object) - the final expression/result of the task's block.

Raises:
  • (RuntimeError) - if the task's fiber is the current fiber.
# Retrieve the task's result, waiting for it if the task is still running.
#
# While the task is running, the caller waits on a condition that is created on
# first use. Otherwise the stored result is handed to `Task.yield`.
# @return [Object] the final expression/result of the task's block.
# @raise [RuntimeError] if the task's fiber is the current fiber.
def wait
	if Fiber.current.equal?(@fiber)
		raise RuntimeError, "Cannot wait on own fiber"
	end
	
	# Not running: hand the stored result straight to Task.yield.
	return Task.yield { @result } unless running?
	
	@finished ||= Condition.new
	@finished.wait
end

def yield

Yield back to the reactor and allow other fibers to execute.
# Yield back to the reactor and allow other fibers to execute.
# Delegates to `reactor.yield` and returns whatever that call returns.
# NOTE(review): relies on a `reactor` reader that is not visible in this section — confirm it is defined.
def yield
	reactor.yield
end