# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'logger'
require_relative 'task'
require_relative 'wrapper'

require 'nio'
require 'timers'
require 'forwardable'

module Async
  # Error instance that Reactor#timeout hands to a fiber when its timer fires.
  class TimeoutError < RuntimeError
  end

  # Event loop built on an NIO::Selector (IO readiness) and a Timers::Group
  # (timed callbacks).
  #
  # NOTE(review): `@children` is not assigned in this file; it is presumably
  # maintained by the Node superclass - confirm there.
  class Reactor < Node
    extend Forwardable

    # Start the given block as a task.
    #
    # When `Task.current?` returns a task, the block is started on that task's
    # reactor and the result of `#async` is returned. Otherwise a new reactor is
    # created, run, closed (even if running raised) and returned.
    #
    # @param args forwarded to `#async` / `#run`.
    # @return whatever `#async` returns in the first case, else the new reactor.
    def self.run(*args, &block)
      current = Task.current?

      # Already inside a task: reuse its reactor instead of creating another.
      return current.reactor.async(*args, &block) if current

      reactor = new

      begin
        reactor.run(*args, &block)
      ensure
        reactor.close
      end

      reactor
    end

    # @param wrappers object indexed with `[io]` by `#wrap` to pick a wrapper class.
    def initialize(wrappers: IO)
      super(nil)

      @wrappers = wrappers

      @selector = NIO::Selector.new
      @timers = Timers::Group.new

      # The reactor is considered stopped until `#run` is entered.
      @stopped = true
    end

    attr_reader :wrappers
    attr_reader :stopped

    # `every` and `after` are forwarded to the timers group.
    def_delegators(:@timers, :every, :after)

    # Build a wrapper for `io` bound to `task`, using the class found via `@wrappers[io]`.
    def wrap(io, task)
      wrapper_class = @wrappers[io]

      wrapper_class.new(io, task)
    end

    # Start a task (via `#async`) which calls `task.with(io, &block)`.
    def with(io, &block)
      async { |task| task.with(io, &block) }
    end

    # Create a task for the block and run it immediately.
    #
    # @param ios passed (as an array) to `Task.new` together with this reactor.
    # @return the new task.
    def async(*ios, &block)
      task = Task.new(ios, self, &block)

      # I want to take a moment to explain the logic of this.
      # When calling an async block, we deterministically execute it until the
      # first blocking operation. We don't *have* to do this - we could schedule
      # it for later execution, but it's useful to:
      # - Fail at the point of call where possible.
      # - Execute deterministically where possible.
      # - Avoid overhead if no blocking operation is performed.
      task.run

      # Async.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
      task
    end

    # Forward all arguments to the selector's `register`.
    def register(*args)
      @selector.register(*args)
    end

    # Request that the run loop exits; the flag is checked between iterations.
    def stop
      @stopped = true
    end

    # Run the event loop.
    #
    # @param args forwarded to `#async` when a block is given.
    # @raise [RuntimeError] if the reactor has been closed.
    # @return [Reactor, nil] `self` once `@stopped` is set; `nil` when the loop
    #   exits early because there are no children and no timers.
    def run(*args, &block)
      raise RuntimeError, 'Reactor has been closed' if closed?

      @stopped = false

      # Allow the user to kick off the initial async tasks.
      async(*args, &block) if block_given?

      until @stopped
        @timers.wait do |interval|
          # - nil: no timers
          # - -ve: timers expired already
          # - 0: timers ready to fire
          # - +ve: timers waiting to fire
          interval = 0 if interval && interval < 0

          Async.logger.debug { "[#{self} Pre] Updating #{@children.count} children..." }
          Async.logger.debug { @children.map { |child| [child.to_s, child.alive?] }.inspect }

          # As timeouts may have been updated, and caused fibers to complete, we should check this.
          # If there is nothing to do, then finish:
          Async.logger.debug { "[#{self}] @children.empty? = #{@children.empty?} && interval #{interval.inspect}" }
          return if @children.empty? && interval.nil?

          Async.logger.debug { "Selecting with #{@children.count} fibers interval = #{interval}..." }
          ready = @selector.select(interval)

          if ready
            ready.each do |monitor|
              fiber = monitor.value

              # Async.logger.debug "Resuming task #{task} due to IO..."
              fiber.resume if fiber
            end
          end
        end
      end

      self
    ensure
      Async.logger.debug { "[#{self} Ensure] Exiting run-loop (stopped: #{@stopped} exception: #{$!})..." }
      Async.logger.debug { @children.map { |child| [child.to_s, child.alive?] }.inspect }

      # However the loop was left, the reactor is marked as stopped afterwards.
      @stopped = true
    end

    # Stop every child, close the selector and drop the reference to it.
    # After this, `#closed?` is true and `#run` raises.
    def close
      @children.each { |child| child.stop }

      @selector.close
      @selector = nil
    end

    # @return [Boolean] true once `#close` has discarded the selector.
    def closed?
      @selector.nil?
    end

    # Schedule a timer for `duration` that resumes the calling fiber (if it is
    # still alive), then call `Task.yield`. The timer is cancelled on the way out.
    #
    # NOTE(review): `Task.yield` is defined elsewhere; presumably it suspends
    # the current fiber until it is resumed - confirm.
    def sleep(duration)
      fiber = Fiber.current

      timer = after(duration) { fiber.resume if fiber.alive? }

      Task.yield
    ensure
      timer.cancel if timer
    end

    # Yield to the given block with a timer armed for `duration`. If the timer
    # fires while the calling fiber is alive, the fiber is resumed with a
    # TimeoutError carrying the backtrace captured on entry to this method.
    # The timer is cancelled on the way out.
    #
    # NOTE(review): turning the resumed error value into a raised exception is
    # presumably done where the fiber was suspended - not visible here.
    def timeout(duration)
      backtrace = caller
      fiber = Fiber.current

      timer = after(duration) do
        if fiber.alive?
          error = TimeoutError.new("execution expired")
          error.set_backtrace(backtrace)

          fiber.resume(error)
        end
      end

      yield
    ensure
      timer.cancel if timer
    end
  end
end