class Utils::ProbeServer
# Builds the command line (as an array) used to execute +job+ by re-invoking
# the currently running program ($0) with the job's arguments appended.
#
# If the BUNDLE_GEMFILE environment variable is set and `which bundle`
# produces a usable path, the command is prefixed with "<bundle> exec".
# NOTE(review): +full?+ is not defined in this view; it is presumably a
# helper returning nil for blank strings — confirm.
#
# @param job [Array] arguments that are splatted after $0
# @return [Array] the complete argument vector
def cmd(job)
  launcher = []
  if ENV.key?('BUNDLE_GEMFILE')
    bundle = `which bundle`.full?(:chomp)
    launcher.push(bundle, 'exec') if bundle
  end
  [*launcher, $0, *job]
end
# Outputs one line per documented command: the annotation name, left-justified
# to the width of the longest name plus one space, followed by its text.
# Annotations come from +self.class.doc_annotations+ and are sorted by their
# first element.
#
# @return whatever +output_message+ returns
def commands
  entries = self.class.doc_annotations.sort_by(&:first)
  widest = entries.map { |entry| entry.first.size }.max
  table = entries.map do |name, text|
    # The width is only needed when there is at least one entry, so it is
    # computed inside the block (widest is nil for an empty list).
    label = name.to_s.ljust(widest + 1)
    "#{label}#{text}"
  end
  output_message table
end
# Unlocks the queue mutex (the counterpart of +pause+).
#
# @return [Boolean] true if the mutex was unlocked, false if unlocking raised
#   ThreadError (for example because the mutex was not locked)
def continue
  begin
    mutex.unlock
  rescue ThreadError
    return false
  end
  true
end
# Exposes the environment of the process this server runs in.
#
# @return [Object] the ENV object itself (not a copy), so changes made through
#   it affect the server process
def env
  ENV
end
# Discards all recorded jobs from the history.
#
# The replacement array is frozen so that the history stays immutable at all
# times, matching how +initialize+ and +run_job+ store it (both freeze the
# array they assign). Previously an unfrozen array was left behind until the
# next job finished.
#
# @return [true]
def history_clear
  @history = [].freeze
  true
end
# Outputs the recorded job history via +output_message+ (which prints
# array-like messages one element per line).
#
# @return whatever +output_message+ returns
def history_list
  output_message(@history)
end
# Sets up a probe server for +uri+ and immediately starts the background
# worker thread that executes queued jobs.
#
# @param uri [Object] stored as-is; later handed to DRb.start_service by +start+
def initialize(uri)
  @uri            = uri
  @current_job_id = 0
  @history        = [].freeze
  @jobs_queue     = Queue.new
  # The worker must be started last: work_loop reads @jobs_queue.
  Thread.new { work_loop }
end
# Short description of the server that shows how many jobs are waiting in
# the queue.
#
# @return [String] e.g. "#<Probe #queue=3>"
def inspect
  format('#<Probe #queue=%s>', @jobs_queue.size)
end
# Reads the currently assigned job while holding the queue mutex.
#
# @return [Object, nil] the value of @job (set by +work_loop+, reset to nil
#   by +run_job+)
def job
  queue_synchronize { @job }
end
# Wraps +job_args+ in a new Job, announces it, and appends it to the jobs
# queue.
#
# @param job_args [Object] passed unchanged to Job.new as the job's arguments
# @return the result of pushing onto @jobs_queue
def job_enqueue(job_args)
  new_job = Job.new(self, job_args)
  output_message(" → #{new_job.inspect} enqueued.", type: :info)
  @jobs_queue << new_job
end
# Sends +signal+ to the process whose id is stored in @pid, if any.
#
# NOTE(review): nothing here checks that the process is still alive; @pid is
# simply whatever was stored last.
#
# @param signal [Symbol, String, Integer] signal passed to Process.kill
# @return the value of @pid when it is nil/false, otherwise the result of
#   Process.kill
def job_kill(signal = :TERM)
  @pid && Process.kill(signal, @pid)
end
# Re-enqueues a job from the history using the same arguments.
#
# @param job_id [Job, Object] a Job (its +id+ is used) or a job id; defaults
#   to the most recent history entry
# @return [Boolean] true if a history entry with that id was found and
#   enqueued again, false otherwise
def job_repeat(job_id = @history.last)
  job_id = job_id.id if Job === job_id
  previous = @history.find { |entry| entry.id == job_id }
  return false unless previous

  job_enqueue(previous.args)
  true
end
# Removes every job still waiting in the queue, holding the queue mutex
# while doing so. A warning is output only when something was removed.
#
# @return [Boolean] true if jobs were cleared, false if the queue was
#   already empty
def jobs_clear
  queue_synchronize do
    if @jobs_queue.empty?
      false
    else
      @jobs_queue.clear
      output_message "Cleared all queued jobs.", type: :warn
      true
    end
  end
end
# Outputs the jobs that are currently waiting in the queue.
#
# The list is read straight from the queue's private @que instance
# variable. NOTE(review): if the Queue implementation in use has no such
# variable, instance_variable_get returns nil and nothing is listed —
# confirm against the Ruby versions that must be supported.
#
# @return whatever +output_message+ returns
def jobs_list
  pending = @jobs_queue.instance_variable_get(:@que)
  output_message pending
end
# Returns the mutex used to serialise access to the jobs queue (see
# +queue_synchronize+, +pause+ and +continue+).
#
# The queue's own private @mutex instance variable is preferred when it
# exists. When the Queue implementation has no such variable,
# instance_variable_get returns nil, which used to make every caller fail
# with NoMethodError; in that case a mutex owned by this server is created
# once and reused.
#
# NOTE(review): the fallback mutex is not the lock the queue uses
# internally, so holding it does not block the queue's own push/shift —
# confirm whether +pause+ is expected to stall the worker on such Rubies.
#
# @return [Mutex]
def mutex
  @jobs_queue.instance_variable_get(:@mutex) || (@mutex ||= Mutex.new)
end
# Advances the job id counter by one.
#
# @return [Integer] the new value of the counter
def next_job_id
  @current_job_id = @current_job_id + 1
end
# Prints +msg+ to STDOUT and flushes it, optionally colouring it by +type+.
#
# Anything responding to +to_a+ is converted to an array and joined with
# newlines first. Known types select a background colour via +on_color+ with
# white text (:success 22, :info 20, :warn 40, :failure 124 plus blink); any
# other type prints the message unchanged.
#
# @param msg [Object] the message, or a collection of lines
# @param type [Symbol, nil] one of :success, :info, :warn, :failure
# @return [self]
def output_message(msg, type: nil)
  msg = msg.to_a.join("\n") if msg.respond_to?(:to_a)
  background = { success: 22, info: 20, warn: 40, failure: 124 }[type]
  if background
    styled = msg.on_color(background)
    styled = styled.blink if type == :failure
    msg = styled.white
  end
  STDOUT.puts msg
  STDOUT.flush
  self
end
# Locks the queue mutex (the counterpart of +continue+).
#
# @return [Boolean] true if the lock was taken, false if locking raised
#   ThreadError (for example when the calling thread already holds it)
def pause
  begin
    mutex.lock
  rescue ThreadError
    return false
  end
  true
end
# Runs the given block while holding the queue mutex.
#
# @yield executed with the mutex locked
# @return the block's return value
def queue_synchronize(&block)
  lock = mutex
  lock.synchronize(&block)
end
# Executes +job+ in a forked child process, waits for it to finish, reports
# the outcome and appends the job to the history.
#
# Changes compared to the previous version:
# * the job passed in is what gets recorded in the history (it used to be
#   @job, which is nil or a different object when this method is not called
#   from +work_loop+);
# * @pid is reset once the child has been reaped, so +job_kill+ cannot
#   signal a stale process id afterwards;
# * a child terminated by a signal is reported with that signal instead of
#   an empty exit status.
#
# @param job [Job] must respond to +args+, +ok=+ and +inspect+
# @return [nil]
def run_job(job)
  @pid = fork { exec(*cmd(job.args)) }
  output_message " → #{job.inspect} now running with pid #{@pid}.", type: :info
  Process.wait @pid
  status = $?
  # The child no longer exists; forget its pid.
  @pid = nil

  summary = " → #{job.inspect} was just run"
  if status.success?
    job.ok = true
    output_message "#{summary} successfully.", type: :success
  else
    job.ok = false
    # exitstatus is nil when the child was killed by a signal.
    outcome =
      if status.signaled?
        "was terminated by signal #{status.termsig}"
      else
        "failed with exit status #{status.exitstatus}"
      end
    output_message "#{summary} and #{outcome}!", type: :failure
  end

  # The history is replaced rather than mutated so it can stay frozen.
  @history = (@history + [ job.freeze ]).freeze
  @job = nil
end
# Announces the shutdown and terminates the process immediately with exit
# status 23. +exit!+ is used, so at_exit handlers are skipped.
#
# The announcement used to read "shutdown down"; the duplicated word has been
# removed.
#
# @return does not return
def shutdown
  output_message "Server was shut down – HARD!", type: :warn
  exit! 23
end
# Starts the DRb service for this server on @uri and blocks on the DRb
# thread.
#
# An Interrupt while waiting switches to an interactive session
# (+examine(self)+, with ARGV replaced by "-f" and warnings silenced for its
# duration). When that session ends, waiting on the DRb thread is resumed via
# +retry+, so the service keeps listening.
#
# @return the result of joining the DRb thread once it finishes
def start
  output_message "Starting probe server listening to #{@uri.inspect}.", type: :info
  DRb.start_service(@uri, self)
  begin
    DRb.thread.join
  rescue Interrupt
    ARGV.replace(['-f'])
    output_message %{\nEntering interactive mode: Type "commands" to get help for the commands.}, type: :info
    begin
      previous_verbosity = $VERBOSE
      $VERBOSE = nil
      examine(self)
    ensure
      # Restore warnings however the interactive session ended.
      $VERBOSE = previous_verbosity
    end
    output_message "Quitting interactive mode, but still listening to #{@uri.inspect}.", type: :info
    retry
  end
end
# Worker loop: repeatedly takes the next job from the queue, remembers it in
# @job and runs it. Runs until something breaks out of +loop+.
def work_loop
  loop do
    run_job(@job = @jobs_queue.shift)
  end
end