class Async::Task

A task represents the state associated with the execution of an asynchronous
block.

def self.current

Raises:
  • (RuntimeError) - if task was not {set!} for the current fiber.

Returns:
  • (Async::Task) -
# Fetch the task stored under `:async_task` for the current fiber.
#
# @return [Async::Task] the stored task.
# @raise [RuntimeError] if nothing truthy is stored under `:async_task`.
def self.current
	task = Thread.current[:async_task]
	return task if task
	
	raise RuntimeError, "No async task available!"
end

def self.current?

Returns:
  • (Async::Task, nil) -
# Non-raising lookup of the task stored under `:async_task`.
#
# @return [Async::Task, nil] the stored value, or nil when nothing is stored.
def self.current?
	fiber_locals = Thread.current
	fiber_locals[:async_task]
end

def self.yield

Other tags:
    Yield: - result of the task if a block is given.

Raises:
  • (Exception) - if the result is an exception

Returns:
  • (Object) - result of the task
# Obtain a result either from the given block or, when no block is given, by
# yielding the current fiber and taking whatever value it is resumed with.
#
# @yield optionally computes the result directly.
# @return [Object] the obtained result.
# @raise [Exception] the result itself, when it is an Exception instance.
def self.yield
	outcome =
		if block_given?
			yield
		else
			Fiber.yield
		end
	
	# An exception handed over as a value is raised in the caller's context.
	raise outcome if outcome.is_a?(Exception)
	
	outcome
end

def bind(io)

Returns:
  • (Wrapper) - The wrapped object.

Parameters:
  • io (IO) -- the native object to bind to this task.
# Wrap `io` via the reactor and remember the wrapper, keyed by `io.fileno`.
# A wrapper already stored for that file number is reused.
#
# @param io [#fileno] the native object to bind to this task.
# @return [Object] the stored result of `@reactor.wrap(io, self)`.
def bind(io)
	descriptor = io.fileno
	@ios[descriptor] ||= @reactor.wrap(io, self)
end

def close

Close all bound IO objects.
# Close every bound wrapper, drop the wrapper table, call `consume`, and
# signal any waiting condition with the task's result.
#
# @return [Object, nil] whatever `@condition.signal` returns, or nil when no
#   condition has been created.
def close
	@ios.each_value { |wrapper| wrapper.close }
	@ios = nil
	
	# Attempt to remove this node from the task tree.
	consume
	
	# If this task was being used as a future, signal completion here:
	@condition.signal(@result) if @condition
end

def finished?

Returns:
  • (Boolean) -
# Whether the superclass reports this node as finished and the task itself
# is no longer in the `:running` state.
#
# @return [Boolean] falsy as soon as the superclass check is falsy.
def finished?
	node_finished = super
	node_finished && @status != :running
end

def initialize(ios, reactor)

Returns:
  • (void) -

Parameters:
  • reactor (Async::Reactor) --
  • ios (Array) -- an array of `IO` objects such as `TCPServer`, `Socket`, etc.
# Create a task that will run the given block inside its own fiber.
#
# The fiber is only created here; it does not start until it is resumed.
#
# @param ios [Array] objects responding to `fileno`; each is wrapped via
#   `reactor.wrap` and passed to the block, followed by the task itself.
# @param reactor [Async::Reactor] used to wrap the ios; also the parent node
#   when no task is current.
# @yield [*wrappers, task] the body of the task.
def initialize(ios, reactor)
	# Nest under the task current for this fiber, if any; otherwise under the reactor.
	super(Task.current? || reactor)
	
	# Wrappers are stored by file number.
	@ios = ios.map { |io| [io.fileno, reactor.wrap(io, self)] }.to_h
	
	@reactor = reactor
	
	@status = :running
	@result = nil
	
	# Created lazily, the first time somebody waits on the result.
	@condition = nil
	
	@fiber = Fiber.new do
		set!
		
		begin
			@result = yield(*@ios.values, self)
			@status = :complete
			# Async.logger.debug("Task #{self} completed normally.")
		rescue Interrupt
			# An Interrupt is recorded as a status and is not re-raised.
			@status = :interrupted
			# Async.logger.debug("Task #{self} interrupted: #{$!}")
		rescue Exception => failure
			# Keep the failure as the result, then let it propagate out of the fiber.
			@result = failure
			@status = :failed
			# Async.logger.debug("Task #{self} failed: #{$!}")
			raise
		ensure
			# Async.logger.debug("Task #{self} closing: #{$!}")
			close
		end
	end
end

def register(io, interests)

Returns:
  • (NIO::Monitor) -

Parameters:
  • interests (Symbol) -- One of `:r`, `:w` or `:rw`.
  • io (IO) -- a native io object.
# Forward a registration request to the reactor this task belongs to.
#
# @param io [IO] a native io object.
# @param interests [Symbol] one of `:r`, `:w` or `:rw`.
# @return [Object] whatever `@reactor.register` returns.
def register(io, interests)
	reactor = @reactor
	reactor.register(io, interests)
end

def result

Returns:
  • (Object) -

Raises:
  • (RuntimeError) - if the task's fiber is the current fiber.
# Obtain the task's result, waiting on a condition while the task is running.
#
# @return [Object] the stored result once the task is no longer running, or
#   whatever `@condition.wait` returns while it still is.
# @raise [RuntimeError] if called from the task's own fiber.
def result
	if Fiber.current.equal?(@fiber)
		raise RuntimeError, "Cannot wait on own fiber"
	end
	
	# No longer running: hand the stored result to Task.yield.
	return Task.yield { @result } unless running?
	
	# Still running: create the condition on first use and wait on it.
	@condition ||= Condition.new
	@condition.wait
end

def run

Resume the execution of the task.
# Resume the execution of the task by resuming its fiber.
#
# @return [Object] the value produced by `Fiber#resume`.
def run
	fiber = @fiber
	fiber.resume
end

def running?

Returns:
  • (Boolean) -
# Whether the task's status is still `:running`.
#
# @return [Boolean]
def running?
	current_status = @status
	current_status == :running
end

def set!

Set the current fiber's `:async_task` to this task.
# Record this task as the current fiber's `:async_task`.
#
# @return [Async::Task] self.
def set!
	# `Thread#[]=` stores fiber-local values, despite living on Thread.
	fiber_locals = Thread.current
	fiber_locals[:async_task] = self
end

def stop

Returns:
  • (void) -
# Stop every child, then resume this task's fiber with an Interrupt if the
# fiber is still alive.
#
# @return [Object, nil] the value of `Fiber#resume`, or nil when the fiber
#   was no longer alive.
def stop
	@children.each { |child| child.stop }
	
	return unless @fiber.alive?
	
	# The Interrupt is passed as the resume value, not raised here.
	@fiber.resume(Interrupt.new("Stop right now!"))
end

def to_s

Other tags:
    Todo: - (picat) Add test for this method?
# String form of the task: the superclass's string followed by the current
# status in square brackets.
#
# @return [String]
def to_s
	description = super
	"#{description}[#{@status}]"
end

def with(io, *args)

Other tags:
    Yield: - a wrapped object.
# Wrap `io` for the duration of the block, then close both the wrapper and
# the native io.
#
# @param io [Object] the native object to wrap; closed when the block exits.
# @param args [Array] extra arguments passed to the block after the wrapper.
# @yield [wrapper, *args] a wrapped object.
# @return [Object] the value of the block.
def with(io, *args)
	wrapper = @reactor.wrap(io, self)
	yield wrapper, *args
ensure
	begin
		wrapper.close if wrapper
	ensure
		# Runs even when closing the wrapper raised, so the native io is not leaked.
		io.close if io
	end
end