# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'fiber'
require 'forwardable'

require_relative 'node'
require_relative 'condition'

module Async
  # Raised when a task is explicitly stopped.
  class Stop < Exception
  end

  # A task represents the state associated with the execution of an asynchronous
  # block.
  class Task < Node
    extend Forwardable

    # Yield the underlying `result` for the task. If the result
    # is an Exception, then that result will be raised as an
    # exception.
    # @return [Object] result of the task
    # @raise [Exception] if the result is an exception
    # @yield if a block is given, its return value is used as the result
    #   instead of suspending the current fiber with `Fiber.yield`.
    def self.yield
      result =
        if block_given?
          yield
        else
          Fiber.yield
        end

      raise result if result.is_a?(Exception)

      result
    end

    # Create a new task.
    # @param reactor [Async::Reactor] the reactor this task will run within.
    # @param parent [Async::Task] the parent task.
    # @yield [task, *args] the body of the task; it receives the task itself
    #   followed by the arguments given to {#run}.
    def initialize(reactor, parent = Task.current?)
      # Attach to the parent task if there is one, otherwise directly to the reactor.
      super(parent || reactor)

      @reactor = reactor

      @status = :initialized
      @result = nil

      # Created lazily by {#result} when another fiber waits on this task.
      @finished = nil

      @fiber = Fiber.new do |*args|
        set!

        begin
          @result = yield(self, *args)
          @status = :complete
          # Async.logger.debug("Task #{self} completed normally.")
        rescue Stop
          @status = :stop
          # Async.logger.debug("Task #{self} stopped: #{$!}")
        rescue Exception => error
          # Keep the error as the result so that waiters see it raised, then
          # propagate it to whoever resumed this fiber.
          @result = error
          @status = :failed
          # Async.logger.debug("Task #{self} failed: #{$!}")
          raise
        ensure
          # Async.logger.debug("Task #{self} closing: #{$!}")
          finish!
        end
      end
    end

    def to_s
      "<#{self.description} status=#{@status}>"
    end

    # @attr reactor [Reactor] The reactor the task was created within.
    attr_reader :reactor
    def_delegators :@reactor, :timeout, :sleep

    # Yield back to the reactor and allow other fibers to execute.
    def yield
      reactor.yield
    end

    # @attr fiber [Fiber] The fiber which is being used for the execution of this task.
    attr_reader :fiber
    def_delegators :@fiber, :alive?

    # @attr status [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stop`, or `:failed`.
    attr_reader :status

    # Resume the execution of the task.
    # @param args [Array] arguments passed to the task's block after the task itself.
    # @raise [RuntimeError] if the task has already been started.
    def run(*args)
      raise RuntimeError, "Task already running!" unless @status == :initialized

      @status = :running
      @fiber.resume(*args)
    end

    # Create a child task of this task on the same reactor and start it immediately.
    # @param args [Array] arguments passed to the child task's block.
    # @return [Async::Task] the child task.
    def async(*args, &block)
      task = Task.new(@reactor, self, &block)
      task.run(*args)

      task
    end

    # Retrieve the current result of the task. Will cause the caller to wait until result is available.
    # @raise [RuntimeError] if the task's fiber is the current fiber.
    # @raise [Exception] if the result of the task is an exception.
    # @return [Object]
    def result
      raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)

      if running?
        # Wait until {#finish!} signals the condition with the result.
        @finished ||= Condition.new
        @finished.wait
      else
        Task.yield { @result }
      end
    end

    alias wait result

    # Stop the task and all of its children.
    #
    # Only a running task is interrupted: a fiber that was never started is
    # still `alive?`, so resuming it with a {Stop} would start the task body
    # with the exception as its argument rather than stop it.
    # @return [void]
    def stop
      @children.each(&:stop)

      return unless running?

      if Fiber.current.equal?(@fiber)
        # A fiber cannot resume itself, so unwind from here instead.
        raise Stop, "Stopping current fiber!"
      elsif @fiber.alive?
        @fiber.resume(Stop.new)
      end
    end

    # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
    # @return [Async::Task]
    # @raise [RuntimeError] if task was not {set!} for the current fiber.
    def self.current
      Thread.current[:async_task] || raise(RuntimeError, "No async task available!")
    end

    # Check if there is a task defined for the current fiber.
    # @return [Async::Task, nil]
    def self.current?
      Thread.current[:async_task]
    end

    # Check if the task is running.
    # @return [Boolean]
    def running?
      @status == :running
    end

    # Whether we can remove this node from the reactor graph.
    # @return [Boolean]
    def finished?
      super && @status != :running
    end

    private

    # Finish the current task: try to detach it from the task tree and wake
    # any fibers waiting on its result.
    def finish!
      # Attempt to remove this node from the task tree.
      consume

      # If this task was being used as a future, signal completion here:
      @finished.signal(@result) if @finished
    end

    # Set the current fiber's `:async_task` to this task.
    def set!
      # This is actually fiber-local:
      Thread.current[:async_task] = self
    end
  end
end