# lib/autoload/kuroko2/command/shell.rb
require 'open3'
module Kuroko2
  module Command
    # Picks one pending execution from a queue and runs its shell command on
    # behalf of a single worker slot, recording the outcome on the execution.
    class Shell
      # Maximum number of characters of output kept by #truncate_and_escape.
      MAX_OUTPUT_LENGTH = 60_000
      # Not referenced anywhere inside this class body.
      MAX_READ_LENGTH = 1024

      # @param hostname [String] host name used in log prefixes and stored on the execution
      # @param worker_id [Integer] worker slot number used in log prefixes and stored on the execution
      # @param worker [Object] worker record; must respond to +reload+, +suspended?+,
      #   +execution_id?+ and +update_column+
      # @param queue [Object] queue identifier handed to +Execution.poll+
      def initialize(hostname:, worker_id: 0, worker:, queue: Execution::DEFAULT_QUEUE)
        @hostname = hostname
        @worker_id = worker_id
        @worker = worker
        @queue = queue
      end

      # Polls the queue once and, if an execution is available, runs it.
      #
      # @return [Object, nil] the execution that was run; +nil+ when the worker is
      #   suspended, already has an execution assigned, nothing was polled, or a
      #   RuntimeError was raised (which is logged, not re-raised)
      def execute
        @worker.reload
        return nil if @worker.suspended?
        return nil if @worker.execution_id?

        execution = Execution.poll(@queue)
        return nil unless execution

        do_execute(execution)
        execution
      rescue RuntimeError => e
        prefix = log_prefix
        Kuroko2.logger.error("#{prefix} #{e.message}\n" +
          e.backtrace.map { |trace| "#{prefix} #{trace}" }.join("\n"))
        nil
      end

      private

      # Prefix identifying this worker slot in every log line.
      def log_prefix
        "[#{@hostname}-#{@worker_id}]"
      end

      # Marks the worker as busy with +execution+, runs it, and always clears the
      # marker afterwards. A SystemCallError raised while launching is logged and
      # recorded on the execution with the error's errno as its exit status.
      def do_execute(execution)
        @worker.update_column(:execution_id, execution.id)
        execution.update(hostname: @hostname, worker_id: @worker_id)
        invoke(execution)
      rescue SystemCallError => e
        message = "#{log_prefix} (uuid #{execution.uuid}) `#{execution.shell}` failed because #{e.class}: #{e.message}"
        execution.token.job_instance.logs.warn(message)
        Kuroko2.logger.warn(message)

        output = truncate_and_escape(e.message)
        execution.finish(output: output, exit_status: e.errno)
      ensure
        @worker.update_column(:execution_id, nil)
      end

      # Runs the execution's shell command and records how it ended: by signal
      # (+finish_by_signal+) or by a normal exit (+finish+).
      def invoke(execution)
        command = execution.shell
        env = execution.context.fetch('ENV', {})

        message = "#{log_prefix} (uuid #{execution.uuid}) `#{command}` run with env (#{env})"
        execution.token.job_instance.logs.info(message)
        Kuroko2.logger.info(message)

        output, status = execute_shell(command, env, execution)
        output = truncate_and_escape(output)

        if status.signaled?
          message = "#{log_prefix} (uuid #{execution.uuid}) `#{command}` stopped by #{Signal.signame(status.termsig)} signal (pid #{status.pid})"
          execution.token.job_instance.logs.warn(message)
          Kuroko2.logger.warn(message)
          execution.finish_by_signal(output: output, term_signal: status.termsig)
        else
          message = "#{log_prefix} (uuid #{execution.uuid}) `#{command}` finished with #{status.exitstatus} (pid #{status.pid})"
          execution.token.job_instance.logs.info(message)
          Kuroko2.logger.info(message)
          execution.finish(output: output, exit_status: status.exitstatus)
        end
      end

      # Spawns +command+ with +env+, streams its combined stdout/stderr to the
      # execution logger line by line, and waits for it to finish.
      #
      # NOTE: +env+ is completed in place with defaults (reverse_merge!), so the
      # caller's hash gains any keys it did not already define.
      #
      # @return [Array(String, Process::Status)] collected output and exit status
      def execute_shell(command, env, execution)
        # unsetenv_others: the child only sees the variables in +env+;
        # pgroup: the child becomes the leader of a new process group.
        opts = { unsetenv_others: true, pgroup: true }
        opts[:chdir] = real_path(execution.context['CHDIR']) if execution.context['CHDIR']

        meta = execution.context['meta']
        job_definition_id = meta.try(:[], 'job_definition_id').to_s

        env.reverse_merge!(
          'HOME' => ENV['HOME'],
          'PATH' => ENV['PATH'],
          'LANG' => ENV['LANG'],
          'KUROKO2_LAUNCHED_TIME' => meta.try(:[], 'launched_time').to_s,
          'KUROKO2_JOB_DEFINITION_ID' => job_definition_id,
          'KUROKO2_JOB_DEFINITION_NAME' => meta.try(:[], 'job_definition_name').to_s,
          'KUROKO2_JOB_INSTANCE_ID' => meta.try(:[], 'job_instance_id').to_s,
        )

        execution_logger = ExecutionLogger.get_logger(
          stream_name: "JOB#{format('%010d', job_definition_id.to_i)}/#{execution.token.job_instance.id}",
        )

        temporally_path_with(env['PATH']) do
          Open3.popen2e(env, command, opts) do |stdin, stdout_and_stderr, thread|
            stdin.close

            pid = thread.pid
            execution.update(pid: pid)

            reader = Thread.new do
              begin
                output = ''
                stdout_and_stderr.each do |data|
                  output << data
                  forward_output_line(execution_logger, execution, pid, data)
                end
              rescue EOFError
                # do nothing
              ensure
                # `next` inside `ensure` makes the collected output the thread's
                # value even when reading was cut short by an exception, which
                # is thereby discarded.
                next output
              end
            end

            status = thread.value # wait until the child process has exited
            output = reader.value
            [output, status]
          end
        end
      end

      # Sends one line of command output to the execution logger. Failures are
      # logged and swallowed so that a logging problem never aborts the command.
      def forward_output_line(execution_logger, execution, pid, data)
        execution_logger.send_log(
          {
            uuid: execution.uuid,
            pid: pid,
            level: 'NOTICE',
            message: truncate_and_escape(data.chomp),
          }
        )
      rescue => e
        Kuroko2.logger.error(
          "#{log_prefix} #{e.message}\n    " + e.backtrace.join("\n    "))
      end

      # Resolves +path+ to its real (symlink-free, absolute) location, retrying
      # a few times while the path does not exist yet.
      #
      # @param path [String] directory path, optionally with a trailing slash
      # @return [Pathname]
      # @raise [Errno::ENOENT] when the path still does not exist after the retries
      def real_path(path)
        trimmed = path.sub(%r{/\Z}, '')
        # Stripping the slash from "/" would leave an empty path, which can
        # never be resolved; keep the original in that case.
        trimmed = path if trimmed.empty?
        pathname = Pathname.new(trimmed)

        Retryable.retryable(tries: 3, sleep: 0.5, on: [Errno::ENOENT]) do
          pathname.realpath
        end
      end

      # Runs the block with this process's PATH temporarily replaced by +path+,
      # restoring the previous value afterwards even if the block raises.
      # This changes ENV for the whole process while the block runs.
      def temporally_path_with(path)
        original_path = ENV['PATH']
        ENV['PATH'] = path
        yield
      ensure
        ENV['PATH'] = original_path
      end

      # Returns a UTF-8 copy of +str+ cut to MAX_OUTPUT_LENGTH characters, with
      # invalid byte sequences replaced and every character encoded in four or
      # more bytes removed (presumably because the storage cannot hold 4-byte
      # UTF-8 -- confirm against the schema).
      #
      # The argument itself is never modified, so frozen strings are accepted;
      # +nil+ is treated as an empty string.
      #
      # @param str [String, nil]
      # @return [String]
      def truncate_and_escape(str)
        text = str.to_s.dup.force_encoding('utf-8')
        text = text[0...MAX_OUTPUT_LENGTH] if text.length > MAX_OUTPUT_LENGTH
        text.scrub.each_char.select { |c| c.bytesize < 4 }.join('')
      end
    end
  end
end