require 'thread'

require 'concurrent/runnable'

module Concurrent

  # Runs a collection of worker objects, each on its own thread, and restarts
  # them according to a configurable restart strategy when their threads die.
  #
  # A worker is any object that responds to +run+, +stop+ and +running?+, each
  # taking no arguments (see WORKER_API). Workers are identified to callers by
  # the id returned from #add_worker.
  class Supervisor

    DEFAULT_MONITOR_INTERVAL = 1
    RESTART_STRATEGIES = [:one_for_one, :one_for_all, :rest_for_one].freeze
    DEFAULT_MAX_RESTART = 5
    DEFAULT_MAX_TIME = 60

    # Method name => required arity for an object to be accepted as a worker.
    WORKER_API = { run: 0, stop: 0, running?: 0 }.freeze

    CHILD_TYPES = [:worker, :supervisor].freeze
    CHILD_RESTART_OPTIONS = [:permanent, :transient, :temporary].freeze

    # Raised internally when too many restarts happen within the time window.
    MaxRestartFrequencyError = Class.new(StandardError)

    # Book-keeping record for one supervised worker: the worker object, its
    # child type, its restart option, the thread currently running it (if any)
    # and whether it was explicitly stopped through #stop_worker.
    WorkerContext = Struct.new(:worker, :type, :restart) do
      attr_accessor :thread
      attr_accessor :terminated

      # @return [Boolean, nil] truthy when a thread is assigned and still alive
      def alive?
        thread && thread.alive?
      end

      # @return [Boolean] whether the monitor should start this worker again
      def needs_restart?
        return false if alive?
        return false if terminated

        case restart
        when :permanent
          true
        when :transient
          # Thread#status is nil only when the thread died with an exception,
          # so a normal exit (status false) is not restarted.
          thread.nil? || thread.status.nil?
        else # :temporary
          false
        end
      end
    end

    # Snapshot of how many workers were added and what their threads are doing.
    # +status+ holds one entry per worker: a Thread#status value, or false when
    # the worker has no thread.
    WorkerCounts = Struct.new(:specs, :supervisors, :workers) do
      attr_accessor :status

      # Increments the totals for a newly added worker context.
      def add(context)
        self.specs += 1
        self.supervisors += 1 if context.type == :supervisor
        self.workers += 1 if context.type == :worker
      end

      def active
        sleeping + running + aborting
      end

      def sleeping
        @status.count('sleep')
      end

      def running
        @status.count('run')
      end

      def aborting
        @status.count('aborting')
      end

      # Workers with no thread, or whose thread finished normally.
      def stopped
        @status.count(false)
      end

      # Workers whose thread terminated with an exception.
      def abend
        @status.count(nil)
      end
    end

    attr_reader :monitor_interval
    attr_reader :restart_strategy
    attr_reader :max_restart
    attr_reader :max_time

    alias_method :strategy, :restart_strategy
    alias_method :max_r, :max_restart
    alias_method :max_t, :max_time

    # @param opts [Hash] configuration options
    # @option opts [Symbol] :restart_strategy (or :strategy) one of
    #   RESTART_STRATEGIES; defaults to :one_for_one
    # @option opts [Numeric] :monitor_interval seconds between monitor passes
    # @option opts [Integer] :max_restart (or :max_r) restart limit
    # @option opts [Integer] :max_time (or :max_t) window, in seconds, for the limit
    # @option opts [Object] :worker a single worker to add
    # @option opts [Array] :workers several workers to add
    # @raise [ArgumentError] when the strategy is unknown or a numeric option
    #   is not greater than zero
    def initialize(opts = {})
      @restart_strategy = opts[:restart_strategy] || opts[:strategy] || :one_for_one
      @monitor_interval = (opts[:monitor_interval] || DEFAULT_MONITOR_INTERVAL).to_f
      @max_restart = (opts[:max_restart] || opts[:max_r] || DEFAULT_MAX_RESTART).to_i
      @max_time = (opts[:max_time] || opts[:max_t] || DEFAULT_MAX_TIME).to_i

      unless RESTART_STRATEGIES.include?(@restart_strategy)
        raise ArgumentError, ":#{@restart_strategy} is not a valid restart strategy"
      end
      raise ArgumentError, ':monitor_interval must be greater than zero' unless @monitor_interval > 0.0
      raise ArgumentError, ':max_restart must be greater than zero' unless @max_restart > 0
      raise ArgumentError, ':max_time must be greater than zero' unless @max_time > 0

      @running = false
      @mutex = Mutex.new
      @workers = []
      @monitor = nil

      @count = WorkerCounts.new(0, 0, 0)
      @restart_times = []

      add_worker(opts[:worker]) unless opts[:worker].nil?
      add_workers(opts[:workers]) unless opts[:workers].nil?
    end

    # Starts the supervisor on a background monitor thread and returns.
    #
    # @raise [StandardError] when the supervisor is already running
    def run!
      @mutex.synchronize do
        raise StandardError, 'already running' if @running

        @running = true
        @monitor = Thread.new do
          Thread.current.abort_on_exception = false
          monitor
        end
      end
      Thread.pass
    end

    # Starts the supervisor and runs the monitor loop on the calling thread,
    # returning only once the loop exits.
    #
    # @return [true]
    # @raise [StandardError] when the supervisor is already running
    def run
      @mutex.synchronize do
        raise StandardError, 'already running' if @running

        @running = true
      end
      monitor
      true
    end

    # Stops the monitor and asks every worker to stop, last added first.
    #
    # @return [true]
    def stop
      @mutex.synchronize do
        return true unless @running

        @running = false
        unless @monitor.nil?
          # The monitor thread calls #stop itself when the restart limit is
          # exceeded. Thread#join raises ThreadError when the target is the
          # current thread, so only wait on (or kill) the monitor from outside.
          unless @monitor == Thread.current
            @monitor.run if @monitor.status == 'sleep'
            @monitor.kill if @monitor.join(0.1).nil?
          end
          @monitor = nil
        end
        @restart_times.clear

        @workers.reverse_each { |context| terminate_worker(context) }
        prune_workers
      end

      true
    end

    # @return [Boolean] whether the supervisor has been started and not stopped
    def running?
      @mutex.synchronize { @running }
    end

    # @return [Integer] number of workers currently registered
    def length
      @mutex.synchronize { @workers.length }
    end
    alias_method :size, :length

    # @return [Integer] number of restart timestamps currently recorded
    def current_restart_count
      @restart_times.length
    end

    # @return [WorkerCounts] frozen snapshot of totals and per-worker status
    #
    # NOTE(review): the totals are only ever incremented (WorkerCounts#add);
    # removing or pruning a worker does not decrement them -- confirm intended.
    def count
      @mutex.synchronize do
        @count.status = @workers.map { |context| context.thread ? context.thread.status : false }
        @count.dup.freeze
      end
    end

    # Registers a worker; starts it on its own thread when the supervisor is
    # already running.
    #
    # @param worker [Object] object satisfying WORKER_API
    # @param opts [Hash] :restart (one of CHILD_RESTART_OPTIONS, default
    #   :permanent) and :type (one of CHILD_TYPES, inferred when omitted)
    # @return [Integer, nil] id to use with the other *_worker methods, or nil
    #   when the object is nil or does not satisfy WORKER_API
    # @raise [ArgumentError] on an invalid :restart or :type
    def add_worker(worker, opts = {})
      return nil if worker.nil? || !behaves_as_worker?(worker)

      @mutex.synchronize do
        restart = opts[:restart] || :permanent
        type = opts[:type] || (worker.is_a?(Supervisor) ? :supervisor : nil) || :worker
        unless CHILD_RESTART_OPTIONS.include?(restart)
          raise ArgumentError, ":#{restart} is not a valid restart option"
        end
        raise ArgumentError, ":#{type} is not a valid child type" unless CHILD_TYPES.include?(type)

        context = WorkerContext.new(worker, type, restart)
        @workers << context
        @count.add(context)
        # Start through run_worker so the worker gets its own tracked thread;
        # calling worker.run here would execute it on the caller's thread
        # while the supervisor mutex is held.
        run_worker(context) if @running
        context.object_id
      end
    end
    alias_method :add_child, :add_worker

    # Adds each worker with the same options.
    #
    # @return [Array] the result of #add_worker for each element
    def add_workers(workers, opts = {})
      workers.map { |worker| add_worker(worker, opts) }
    end
    alias_method :add_children, :add_workers

    # Unregisters a worker that is not currently alive.
    #
    # @return [Object, false, nil] the worker object; false when it is still
    #   alive; nil when the id is unknown
    def remove_worker(worker_id)
      @mutex.synchronize do
        index, context = find_worker(worker_id)
        break(nil) if context.nil?
        break(false) if context.alive?

        @workers.delete_at(index)
        context.worker
      end
    end
    alias_method :remove_child, :remove_worker

    # Stops a worker and marks it so the monitor will not restart it.
    # A :temporary worker is also unregistered.
    #
    # @return [true, nil] nil when the id is unknown; true otherwise
    #   (including when the supervisor is not running)
    def stop_worker(worker_id)
      @mutex.synchronize do
        return true unless @running

        index, context = find_worker(worker_id)
        break(nil) if index.nil?

        context.terminated = true
        terminate_worker(context)
        @workers.delete_at(index) if context.restart == :temporary
        true
      end
    end
    alias_method :stop_child, :stop_worker

    # Clears the terminated mark and starts the worker unless it is alive.
    #
    # @return [Boolean, nil] false when the supervisor is not running; nil
    #   when the id is unknown; true otherwise
    def start_worker(worker_id)
      @mutex.synchronize do
        return false unless @running

        _index, context = find_worker(worker_id)
        break(nil) if context.nil?

        context.terminated = false
        run_worker(context) unless context.alive?
        true
      end
    end
    alias_method :start_child, :start_worker

    # Stops and then starts a worker. :temporary workers are not restarted.
    #
    # @return [Boolean, nil] false when the supervisor is not running or the
    #   worker is :temporary; nil when the id is unknown; true otherwise
    def restart_worker(worker_id)
      @mutex.synchronize do
        return false unless @running

        _index, context = find_worker(worker_id)
        break(nil) if context.nil?
        break(false) if context.restart == :temporary

        context.terminated = false
        terminate_worker(context)
        run_worker(context)
        true
      end
    end
    alias_method :restart_child, :restart_worker

    private

    # @return [Boolean] whether obj responds to every WORKER_API method with
    #   exactly the required arity
    def behaves_as_worker?(obj)
      WORKER_API.all? do |method, arity|
        obj.respond_to?(method) && obj.method(method).arity == arity
      end
    end

    # Starts every registered worker, then applies the restart strategy once
    # per monitor interval until the supervisor stops.
    def monitor
      @workers.each { |context| run_worker(context) }
      loop do
        sleep(@monitor_interval)
        still_running = @mutex.synchronize do
          # Re-check under the lock: #stop may have completed while this
          # thread was sleeping or waiting for the mutex, and the strategy
          # must not restart workers that #stop just terminated.
          next false unless @running

          prune_workers
          send(@restart_strategy)
          true
        end
        break unless still_running && running?
      end
    rescue MaxRestartFrequencyError
      stop
    end

    # Runs the worker on a new thread and records that thread in the context.
    def run_worker(context)
      context.thread = Thread.new do
        Thread.current.abort_on_exception = false
        context.worker.run
      end
      context
    end

    # Asks a live worker to stop; if that raises, kills its thread instead.
    # The context's thread reference is always cleared, without waiting for
    # the thread to exit.
    def terminate_worker(context)
      if context.alive?
        context.worker.stop
        Thread.pass
      end
    rescue StandardError
      begin
        Thread.kill(context.thread)
      rescue StandardError
        # suppress
      end
    ensure
      context.thread = nil
    end

    # Drops :temporary workers whose threads are no longer alive.
    def prune_workers
      @workers.delete_if { |context| context.restart == :temporary && !context.alive? }
    end

    # @return [Array] [index, context] for the id, or [nil, nil] when unknown
    def find_worker(worker_id)
      index = @workers.find_index { |context| context.object_id == worker_id }
      index.nil? ? [nil, nil] : [index, @workers[index]]
    end

    # Records a restart at the current time (whole seconds) and reports
    # whether at least max_restart restarts are recorded within max_time
    # seconds. When the recorded span has reached max_time, the oldest
    # timestamp is dropped instead.
    def exceeded_max_restart_frequency?
      @restart_times.unshift(Time.now.to_i)
      diff = (@restart_times.first - @restart_times.last).abs
      if @restart_times.length >= @max_restart && diff <= @max_time
        return true
      elsif diff >= @max_time
        @restart_times.pop
      end
      false
    end

    #----------------------------------------------------------------
    # restart strategies

    # Restarts each worker that needs it, independently of the others.
    def one_for_one
      @workers.each do |context|
        next unless context.needs_restart?
        raise MaxRestartFrequencyError if exceeded_max_restart_frequency?

        run_worker(context)
      end
    end

    # When any worker needs a restart, terminates and restarts all of them.
    def one_for_all
      # Array#each returns the (always truthy) array when it finishes without
      # a break, so the failure check must be an explicit predicate.
      return unless @workers.any?(&:needs_restart?)
      raise MaxRestartFrequencyError if exceeded_max_restart_frequency?

      @workers.each { |context| terminate_worker(context) }
      @workers.each { |context| run_worker(context) }
    end

    # When a worker needs a restart, terminates every worker added after it,
    # then restarts whatever now needs restarting via one_for_one.
    def rest_for_one
      restart = false

      @workers.each do |context|
        if restart
          terminate_worker(context)
        elsif context.needs_restart?
          raise MaxRestartFrequencyError if exceeded_max_restart_frequency?

          restart = true
        end
      end

      one_for_one if restart
    end
  end
end