lib/autoload/kuroko2/command/monitor.rb
require 'open3'

module Kuroko2
  module Command
    # Periodic health check for the executions attached to the workers of one
    # host. For every started, not-yet-mailed execution it either
    #
    # * verifies that the recorded PID still exists (sending a notification
    #   once the PID has been missing NUM_FAILURES times) and, while the
    #   process is alive, records its memory consumption at intervals, or
    # * when no PID has been recorded yet, sends a notification if the
    #   execution started more than one minute ago.
    class Monitor
      # Number of failed PID lookups tolerated for one execution before the
      # "process absence" notification is delivered. The tally of an
      # execution is cleared whenever its process is seen again.
      NUM_FAILURES = 15

      # @param hostname  host name handed to Worker.on and to the notification mailers
      # @param worker_id kept on the instance; not read anywhere in this class
      def initialize(hostname:, worker_id:)
        @hostname = hostname
        @worker_id = worker_id
        @counter = Hash.new(0) # execution id => failed PID lookups so far
        @intervals = {}        # execution id => MemoryConsumptionLog::Interval
      end

      # Runs one monitoring pass over the executions currently attached to
      # the workers of @hostname, then drops all per-execution state
      # (failure tallies and memory-log intervals) of executions that are no
      # longer part of that set, so neither map grows without bound.
      def execute
        execution_ids = Worker.on(@hostname).pluck(:execution_id).compact
        executions = Execution.where(id: execution_ids, mailed_at: nil).started

        executions.each do |execution|
          if execution.pid
            record_memory_consumption(execution) if check_process_absence(execution)
          else
            check_assignment_delay(execution)
          end
        end

        prune_finished(executions.map(&:id))
      end

      # @return [Integer] number of executions that currently have a failure tally
      def counter_size
        @counter.size
      end

      private

      # Probes the PID recorded on the execution.
      #
      # Despite the method name, the return value answers "is the process
      # there?": true when the process exists (or exists but may not be
      # signalled by this user), false when the PID was not found.
      #
      # @return [Boolean] true if the process exists
      def check_process_absence(execution)
        # Signal 0 only performs the existence/permission check.
        process_num = Process.kill(0, execution.pid)
        @counter.delete(execution.id)
        !!process_num
      rescue Errno::EPERM
        # The process exists; we are just not allowed to signal it.
        true
      rescue Errno::ESRCH
        if Execution.exists?(execution.id)
          @counter[execution.id] += 1

          message = "(execution.id #{execution.id}) : PID #{execution.pid} not found, increment monitor counter to #{@counter[execution.id]}."
          Kuroko2.logger.info { message }
          Kuroko2.logger.info(@counter) # TODO: will remove this logging (for debug only).

          if @counter[execution.id] >= NUM_FAILURES
            notify_process_absence(execution)
            forget(execution.id)
          end
        else
          # The execution record itself is gone; nothing left to watch.
          forget(execution.id)
        end

        false
      end

      # Samples and stores the memory consumption of a live execution when
      # its logging interval has been reached. Nothing is stored when the
      # sampler yields no value.
      def record_memory_consumption(execution)
        return unless log_memory_consumption?(execution)

        value = get_memory_consumption(execution)
        execution.log_memory_consumption(value) if value
      end

      # @return [Boolean] true on the first check of an execution (an interval
      #   is created as a side effect); afterwards whatever the stored
      #   interval's #reached? reports for the current time
      def log_memory_consumption?(execution)
        interval = @intervals[execution.id]
        return interval.reached?(Time.current) if interval

        # first time
        @intervals[execution.id] = MemoryConsumptionLog::Interval.new(Time.current)
        true
      end

      # Asks MemorySampler for the consumption of the execution's PID and, on
      # success, advances the stored interval to its successor.
      #
      # @return the sampled value, or nil when the sampler returned nothing
      def get_memory_consumption(execution)
        result = MemorySampler.get_by_pgid(execution.pid)
        return nil unless result

        @intervals[execution.id] = @intervals[execution.id].next
        result
      end

      def notify_process_absence(execution)
        message = "(execution #{execution.uuid}) Deliver notification mail: PID #{execution.pid} is not running."
        notify(execution, message) do
          Kuroko2::Notifications.process_absence(execution, @hostname)
        end
      end

      # Notifies when an execution without a PID started more than one
      # minute ago.
      def check_assignment_delay(execution)
        return unless execution.started_at < 1.minutes.ago

        message = "(execution #{execution.uuid}) Deliver notification mail: process is not assigned to any job-executor."
        notify(execution, message) do
          Kuroko2::Notifications.executor_not_assigned(execution, @hostname)
        end
      end

      # Logs the message, writes it as a warning to the job instance logs,
      # delivers the mail built by the block, and finally stamps mailed_at.
      # The mail is built only after both log writes, as before.
      def notify(execution, message)
        Kuroko2.logger.info { message }
        execution.job_instance.logs.warn(message)
        yield.deliver_now
        execution.touch(:mailed_at)
      end

      # Drops every piece of per-execution state held by the monitor.
      def forget(execution_id)
        @counter.delete(execution_id)
        @intervals.delete(execution_id)
      end

      # Forgets all executions that are tracked but not in active_ids.
      def prune_finished(active_ids)
        stale_ids = (@counter.keys | @intervals.keys) - active_ids
        stale_ids.each { |stale_id| forget(stale_id) }
      end
    end
  end
end