class Toys::Utils::Exec::Executor
@private
An object that manages the execution of a subcommand
#
def capture_stream_thread(key)
# Starts a background thread that reads everything written to the pipe
# created for the given output stream and stores it in the captures hash
# under the same key.
#
# @param key [Symbol] The output stream being captured; passed to
#     `make_out_pipe` and used as the key in `@captures`.
# @return [Array<Thread>] The list of threads to join, now including the
#     newly started reader thread.
#
def capture_stream_thread(key)
  reader = make_out_pipe(key)
  capture_thread = ::Thread.new do
    begin
      @captures[key] = reader.read
    ensure
      # Always release the parent's end of the pipe, even if the read fails.
      reader.close
    end
  end
  @join_threads << capture_thread
end
def copy_from_out_thread(key, io)
# Starts a background thread that copies everything written to the pipe
# created for the given output stream into the provided destination.
#
# Both the pipe and the destination are closed when copying ends, whether
# it completed or raised.
#
# @param key [Symbol] The output stream to copy from; passed to
#     `make_out_pipe`.
# @param io [Object] Destination handed to `IO.copy_stream`. It is closed
#     by the copying thread.
# @return [Array<Thread>] The list of threads to join.
#
def copy_from_out_thread(key, io)
  source = make_out_pipe(key)
  copier = ::Thread.new do
    begin
      ::IO.copy_stream(source, io)
    ensure
      source.close
      io.close
    end
  end
  @join_threads << copier
end
def copy_to_in_thread(io)
# Starts a background thread that copies the contents of the provided
# source into the pipe created for the child's input stream.
#
# Both the pipe and the source are closed when copying ends, whether it
# completed or raised.
#
# @param io [Object] Source handed to `IO.copy_stream`. It is closed by the
#     copying thread.
# @return [Array<Thread>] The list of threads to join.
#
def copy_to_in_thread(io)
  sink = make_in_pipe
  copier = ::Thread.new do
    begin
      ::IO.copy_stream(io, sink)
    ensure
      sink.close
      io.close
    end
  end
  @join_threads << copier
end
def execute
# Sets up the three standard streams, logs the command, starts the
# subcommand, and hands its controller to the configured block.
#
# When the `:background` config option is set, the controller is returned
# immediately and the block is not called. Otherwise the block (if any) is
# called with the controller, the controller's streams are closed even if
# the block raises, and the controller's result is returned.
#
# @return [Object] The controller in background mode, otherwise the value
#     of `controller.result`.
#
def execute
  setup_in_stream
  %i[out err].each { |key| setup_out_stream(key) }
  log_command
  controller = start_with_controller
  unless @config_opts[:background]
    begin
      @block&.call(controller)
    ensure
      controller.close_streams
    end
    return controller.result
  end
  controller
end
def initialize(exec_opts, spawn_cmd, block)
# Creates an executor for one subcommand.
#
# @param exec_opts [Object] Options object; its `config_opts` and
#     `spawn_opts` values are stored as-is.
# @param spawn_cmd [Object] Either something callable (responds to `call`),
#     which is stored as the function to run in a fork, or a command that is
#     stored for spawning.
# @param block [Object,nil] Optional callable that `execute` invokes with
#     the controller.
#
def initialize(exec_opts, spawn_cmd, block)
  if spawn_cmd.respond_to?(:call)
    @fork_func = spawn_cmd
    @spawn_cmd = nil
  else
    @fork_func = nil
    @spawn_cmd = spawn_cmd
  end
  @config_opts = exec_opts.config_opts
  @spawn_opts = exec_opts.spawn_opts
  @block = block
  @captures = {}
  @controller_streams = {}
  @join_threads = []
  @child_streams = []
  @parent_streams = []
  # Background processes default to the null device; others inherit.
  @default_stream = @config_opts[:background] ? :null : :inherit
end
def interpret_in_array(setting)
# Interprets an array-valued input setting by looking at its first element.
#
# A leading symbol names the stream type and the remaining elements are its
# arguments. A leading string means the whole array describes a file.
#
# @param setting [Array] The array given for the input stream.
# @return [Object] Whatever `setup_in_stream_of_type` returns.
# @raise [RuntimeError] if the first element is neither a symbol nor a
#     string.
#
def interpret_in_array(setting)
  head = setting.first
  if head.is_a?(::Symbol)
    setup_in_stream_of_type(head, setting[1..-1])
  elsif head.is_a?(::String)
    setup_in_stream_of_type(:file, setting)
  else
    raise "Unknown value for in: #{setting.inspect}"
  end
end
def interpret_in_file(args)
# Configures the child's input stream to read from a file.
#
# The spawn option for `:in` is set to the file name followed by the
# read-only open flag.
#
# @param args [Array] Must contain exactly one element, the file name as a
#     String.
# @return [Array] The value stored in the spawn options.
# @raise [RuntimeError] if `args` is not a single String.
#
def interpret_in_file(args)
  single_path = args.size == 1 && args.first.is_a?(::String)
  raise "Expected only file name" unless single_path
  @spawn_opts[:in] = [*args, ::File::RDONLY]
end
def interpret_in_io(setting)
# Interprets an IO-like input setting.
#
# If the object reports an integer file descriptor, that descriptor is used
# directly as a `:parent` stream. Otherwise the object's contents are
# copied into the child's input via a `:copy_io` stream.
#
# @param setting [Object] The object given for the input stream; must
#     respond to `fileno`.
# @return [Object] Whatever `setup_in_stream_of_type` returns.
#
def interpret_in_io(setting)
  descriptor = setting.fileno
  if descriptor.is_a?(::Integer)
    setup_in_stream_of_type(:parent, [descriptor])
  else
    setup_in_stream_of_type(:copy_io, [setting])
  end
end
def interpret_out_array(key, setting)
# Interprets an array-valued output setting by looking at its first
# element.
#
# A leading symbol names the stream type and the remaining elements are its
# arguments. A leading string means the whole array describes a file.
#
# @param key [Symbol] The output stream being configured.
# @param setting [Array] The array given for that stream.
# @return [Object] Whatever `setup_out_stream_of_type` returns.
# @raise [RuntimeError] if the first element is neither a symbol nor a
#     string.
#
def interpret_out_array(key, setting)
  head = setting.first
  if head.is_a?(::Symbol)
    setup_out_stream_of_type(key, head, setting[1..-1])
  elsif head.is_a?(::String)
    setup_out_stream_of_type(key, :file, setting)
  else
    raise "Unknown value for #{key}: #{setting.inspect}"
  end
end
def interpret_out_array_within_fork(stream)
# Resolves an array-valued output redirection to a concrete stream.
#
# An array starting with `:child` refers to one of the process's own
# standard streams: `:err` yields `$stderr` and `:out` yields `$stdout`;
# any other second element yields nil. Any other array is passed as
# arguments to `File.open`.
#
# @param stream [Array] The redirection specification.
# @return [IO,nil] The resolved stream, or nil for an unrecognized `:child`
#     target.
#
def interpret_out_array_within_fork(stream)
  return ::File.open(*stream) unless stream.first == :child
  target = stream[1]
  return $stderr if target == :err
  $stdout if target == :out
end
def interpret_out_file(key, args)
# Configures an output stream to write to a file.
#
# A lone file name is stored as a bare string; a file name with up to two
# additional open arguments is stored as the array itself.
#
# @param key [Symbol] The output stream being configured.
# @param args [Array] The file name (a String) optionally followed by up to
#     two more arguments.
# @return [String,Array] The value stored in the spawn options.
# @raise [RuntimeError] if the file name is missing or not a String, or if
#     more than three arguments are given.
#
def interpret_out_file(key, args)
  has_path = !args.empty? && args.first.is_a?(::String)
  raise "Expected file name" unless has_path
  raise "Too many file arguments" if args.size > 3
  @spawn_opts[key] =
    if args.size == 1
      args.first
    else
      args
    end
end
def interpret_out_io(key, setting)
# Interprets an IO-like output setting.
#
# If the object reports an integer file descriptor, that descriptor is used
# directly as a `:parent` stream. Otherwise the child's output is copied
# into the object via a `:copy_io` stream.
#
# @param key [Symbol] The output stream being configured.
# @param setting [Object] The object given for that stream; must respond to
#     `fileno`.
# @return [Object] Whatever `setup_out_stream_of_type` returns.
#
def interpret_out_io(key, setting)
  descriptor = setting.fileno
  if descriptor.is_a?(::Integer)
    setup_out_stream_of_type(key, :parent, [descriptor])
  else
    setup_out_stream_of_type(key, :copy_io, [setting])
  end
end
def log_command
# Writes the command to the configured logger, if logging is enabled.
#
# Nothing is logged when there is no `:logger` config option or when
# `:log_level` is explicitly `false`. The message is the `:log_cmd` config
# option if set; otherwise, when a spawn command is present, it is the
# command's only element or the `inspect` form of the whole command. The
# level defaults to `Logger::INFO`.
#
# @return [Object,nil] The result of the logger's `add` call, or nil when
#     nothing was logged.
#
def log_command
  logger = @config_opts[:logger]
  level = @config_opts[:log_level]
  return if !logger || level == false
  message = @config_opts[:log_cmd]
  if @spawn_cmd && !message
    message = @spawn_cmd.size == 1 ? @spawn_cmd.first : @spawn_cmd.inspect
  end
  logger.add(level || ::Logger::INFO, message) if message
end
def make_in_pipe
# Creates a pipe for feeding the child's input stream.
#
# The read end is stored as the `:in` spawn option and tracked as a child
# stream; the write end is tracked as a parent stream and set to
# synchronous mode.
#
# @return [IO] The write end of the pipe.
#
def make_in_pipe
  child_end, parent_end = ::IO.pipe
  parent_end.sync = true
  @spawn_opts[:in] = child_end
  @child_streams.push(child_end)
  @parent_streams.push(parent_end)
  parent_end
end
def make_null_stream(key, mode)
# Opens the null device and uses it for the given stream.
#
# The opened file is stored as the spawn option for `key` and tracked as a
# child stream.
#
# @param key [Symbol] The stream being configured.
# @param mode [String] The mode with which to open the null device.
# @return [Array] The list of child streams, including the new file.
#
def make_null_stream(key, mode)
  null_file = ::File.new(::File::NULL, mode)
  @spawn_opts[key] = null_file
  @child_streams.push(null_file)
end
def make_out_pipe(key)
# Creates a pipe for receiving one of the child's output streams.
#
# The write end is stored as the spawn option for `key` and tracked as a
# child stream; the read end is tracked as a parent stream.
#
# @param key [Symbol] The output stream being configured.
# @return [IO] The read end of the pipe.
#
def make_out_pipe(key)
  parent_end, child_end = ::IO.pipe
  @spawn_opts[key] = child_end
  @child_streams.push(child_end)
  @parent_streams.push(parent_end)
  parent_end
end
def run_fork_func
# Calls the fork function with the config options and determines the exit
# code.
#
# If the `:chdir` spawn option is set, the function runs with that
# directory as the working directory. The function may `throw :result`
# with a value to use as the return value; otherwise the result is 0.
#
# @return [Object] The value thrown with `:result`, or 0 if the function
#     returned normally.
#
def run_fork_func
  catch(:result) do
    workdir = @spawn_opts[:chdir]
    invoke = -> { @fork_func.call(@config_opts) }
    if workdir
      ::Dir.chdir(workdir) { invoke.call }
    else
      invoke.call
    end
    # Normal completion: the function's own return value is ignored.
    0
  end
end
def setup_env_within_fork
# Applies the configured environment to the current (forked) process.
#
# If the `:unsetenv_others` config option is set, the existing environment
# is cleared first, so that only the variables listed in the `:env` config
# option remain. Each `:env` entry is then applied: names and values are
# converted to strings, and a nil value removes the variable instead of
# setting it to an empty string. This is intended to match how
# `Process.spawn` treats the same hash on the non-fork path.
#
# @return [Hash] The environment hash that was applied (empty if no `:env`
#     option was given).
#
def setup_env_within_fork
  # Variables that must survive are re-set by the loop below, so clearing
  # everything here yields the "only listed variables" environment.
  ::ENV.clear if @config_opts[:unsetenv_others]
  (@config_opts[:env] || {}).each do |name, value|
    # Assigning nil to an ENV entry deletes it.
    ::ENV[name.to_s] = value.nil? ? nil : value.to_s
  end
end
def setup_in_stream
# Configures the child's input stream from the `:in` config option,
# falling back to the default stream setting.
#
# The setting is dispatched on its class: a symbol is a stream type, an
# integer is a parent file descriptor, a string is a file name, an IO or
# StringIO is handled by `interpret_in_io`, and an array by
# `interpret_in_array`.
#
# @return [Object,nil] The result of the handler, or nil if there is no
#     setting at all.
# @raise [RuntimeError] if the setting is of an unsupported class.
#
def setup_in_stream
  source = @config_opts[:in] || @default_stream
  return unless source
  case source
  when ::Symbol then setup_in_stream_of_type(source, [])
  when ::Integer then setup_in_stream_of_type(:parent, [source])
  when ::String then setup_in_stream_of_type(:file, [source])
  when ::IO, ::StringIO then interpret_in_io(source)
  when ::Array then interpret_in_array(source)
  else raise "Unknown value for in: #{source.inspect}"
  end
end
def setup_in_stream_of_type(type, args)
# Configures the child's input stream according to a named stream type.
#
# @param type [Symbol] One of `:controller`, `:null`, `:inherit`, `:close`,
#     `:parent`, `:child`, `:string`, `:copy_io`, or `:file`.
# @param args [Array] Type-specific arguments. The first element is used by
#     `:parent`, `:child`, `:string` and `:copy_io`; the whole array is
#     passed on for `:file`.
# @return [Object] The value assigned or returned by the chosen handler.
# @raise [RuntimeError] if the type is not recognized.
#
def setup_in_stream_of_type(type, args)
  case type
  when :controller then @controller_streams[:in] = make_in_pipe
  when :null then make_null_stream(:in, "r")
  when :inherit then @spawn_opts[:in] = :in
  when :close then @spawn_opts[:in] = :close
  when :parent then @spawn_opts[:in] = args.first
  when :child then @spawn_opts[:in] = [:child, args.first]
  when :string then write_string_thread(args.first.to_s)
  when :copy_io then copy_to_in_thread(args.first)
  when :file then interpret_in_file(args)
  else raise "Unknown type for in: #{type.inspect}"
  end
end
def setup_in_stream_within_fork(stream, stdstream)
# Redirects a standard input stream inside the forked process.
#
# The redirection target is interpreted by class: an integer is opened as
# a file descriptor, an array is passed as arguments to `File.open`, a
# string is opened as a file for reading, and `:close` closes the standard
# stream. Any other object is used directly only if it is readable;
# anything else (including nil) leaves the standard stream untouched.
#
# @param stream [Integer,Array,String,Symbol,Object,nil] The redirection
#     target.
# @param stdstream [IO] The standard stream to close or reopen.
# @return [Object,nil] The result of the close/reopen call, or nil if
#     nothing was done.
#
def setup_in_stream_within_fork(stream, stdstream)
  in_stream =
    case stream
    when ::Integer
      ::IO.open(stream)
    when ::Array
      ::File.open(*stream)
    when ::String
      ::File.open(stream, "r")
    when :close
      :close
    else
      # An input source has to be readable. (Checking for :write, as the
      # output variant does, would accept write-only objects here.)
      stream if stream.respond_to?(:read)
    end
  if in_stream == :close
    stdstream.close
  elsif in_stream
    stdstream.reopen(in_stream)
  end
end
def setup_out_stream(key)
# Configures one of the child's output streams from the config option of
# the same name, falling back to the default stream setting.
#
# The setting is dispatched on its class: a symbol is a stream type, an
# integer is a parent file descriptor, a string is a file name, an IO or
# StringIO is handled by `interpret_out_io`, and an array by
# `interpret_out_array`.
#
# @param key [Symbol] The output stream to configure.
# @return [Object] The result of the chosen handler.
# @raise [RuntimeError] if the setting is of an unsupported class.
#
def setup_out_stream(key)
  target = @config_opts[key] || @default_stream
  case target
  when ::Symbol then setup_out_stream_of_type(key, target, [])
  when ::Integer then setup_out_stream_of_type(key, :parent, [target])
  when ::String then setup_out_stream_of_type(key, :file, [target])
  when ::IO, ::StringIO then interpret_out_io(key, target)
  when ::Array then interpret_out_array(key, target)
  else raise "Unknown value for #{key}: #{target.inspect}"
  end
end
def setup_out_stream_of_type(key, type, args)
# Configures one of the child's output streams according to a named stream
# type.
#
# @param key [Symbol] The output stream to configure.
# @param type [Symbol] One of `:controller`, `:null`, `:inherit`, `:close`,
#     `:out`, `:err`, `:parent`, `:child`, `:capture`, `:copy_io`, or
#     `:file`.
# @param args [Array] Type-specific arguments. The first element is used by
#     `:parent`, `:child` and `:copy_io`; the whole array is passed on for
#     `:file`.
# @return [Object] The value assigned or returned by the chosen handler.
# @raise [RuntimeError] if the type is not recognized.
#
def setup_out_stream_of_type(key, type, args)
  case type
  when :controller then @controller_streams[key] = make_out_pipe(key)
  when :null then make_null_stream(key, "w")
  when :inherit then @spawn_opts[key] = key
  when :close, :out, :err then @spawn_opts[key] = type
  when :parent then @spawn_opts[key] = args.first
  when :child then @spawn_opts[key] = [:child, args.first]
  when :capture then capture_stream_thread(key)
  when :copy_io then copy_from_out_thread(key, args.first)
  when :file then interpret_out_file(key, args)
  else raise "Unknown type for #{key}: #{type.inspect}"
  end
end
def setup_out_stream_within_fork(stream, stdstream)
# Redirects a standard output stream inside the forked process.
#
# The redirection target is interpreted by class: an integer is opened as
# a file descriptor, an array is resolved by
# `interpret_out_array_within_fork`, a string is opened as a file for
# writing, and `:close` closes the standard stream. Any other object is
# used directly only if it responds to `write`; anything else (including
# nil) leaves the standard stream untouched. After a successful reopen the
# standard stream is put into synchronous mode.
#
# @param stream [Integer,Array,String,Symbol,Object,nil] The redirection
#     target.
# @param stdstream [IO] The standard stream to close or reopen.
# @return [Object,nil] nil if nothing was done or the stream was closed;
#     true after a reopen.
#
def setup_out_stream_within_fork(stream, stdstream)
  target =
    case stream
    when ::Integer then ::IO.open(stream)
    when ::Array then interpret_out_array_within_fork(stream)
    when ::String then ::File.open(stream, "w")
    when :close then :close
    else stream.respond_to?(:write) ? stream : nil
    end
  return unless target
  return stdstream.close if target == :close
  stdstream.reopen(target)
  stdstream.sync = true
end
def setup_streams_within_fork
# Prepares the standard streams of the forked process.
#
# Closes every stream tracked as belonging to the parent, then applies the
# `:in`, `:out` and `:err` spawn options to `$stdin`, `$stdout` and
# `$stderr` respectively, in that order.
#
# @return [Object,nil] The result of setting up the error stream.
#
def setup_streams_within_fork
  @parent_streams.each { |parent_end| parent_end.close }
  stdin_target, stdout_target, stderr_target = @spawn_opts.values_at(:in, :out, :err)
  setup_in_stream_within_fork(stdin_target, $stdin)
  setup_out_stream_within_fork(stdout_target, $stdout)
  setup_out_stream_within_fork(stderr_target, $stderr)
end
def start_fork
# Forks the current process and runs the fork function in the child.
#
# In the parent, returns the child's process ID right away. In the child,
# sets up the environment and streams, runs the fork function, and then
# terminates the process with `Kernel.exit!` and never returns. The exit
# code is the fork function's result, the status of a `SystemExit` raised
# along the way, or -1 if any other exception occurred (in which case the
# exception and its backtrace are written via `warn`).
#
# @return [Integer] The child's process ID (parent only).
#
def start_fork
  child_pid = ::Process.fork
  return child_pid if child_pid
  # Everything below runs only inside the forked child.
  status = -1
  begin
    setup_env_within_fork
    setup_streams_within_fork
    status = run_fork_func
  rescue ::SystemExit => exit_request
    status = exit_request.status
  rescue ::Exception => error # rubocop:disable Lint/RescueException
    warn(([error.inspect] + error.backtrace).join("\n"))
  ensure
    # Skip at_exit handlers and the like; the child must end here.
    ::Kernel.exit!(status)
  end
end
def start_process
# Spawns the configured command as a new process.
#
# The `:env` config option, if present, is passed to `Process.spawn` as
# the leading environment argument, followed by the command elements and
# finally the spawn options.
#
# @return [Integer] The process ID returned by `Process.spawn`.
#
def start_process
  env = @config_opts[:env]
  prefix = env ? [env] : []
  ::Process.spawn(*(prefix + @spawn_cmd), @spawn_opts)
end
def start_with_controller
# Starts the subcommand and wraps it in a controller.
#
# The subcommand is started by forking if a fork function is present,
# otherwise by spawning. If starting raises a StandardError, that exception
# object is passed to the controller in place of the process ID. In either
# case, all streams tracked as belonging to the child are then closed in
# this process.
#
# @return [Controller] A new controller for the started subcommand.
#
def start_with_controller
  pid_or_error =
    begin
      if @fork_func
        start_fork
      else
        start_process
      end
    rescue ::StandardError => e
      e
    end
  @child_streams.each { |child_end| child_end.close }
  Controller.new(
    @config_opts[:name],
    @controller_streams,
    @captures,
    pid_or_error,
    @join_threads,
    @config_opts[:result_callback]
  )
end
def write_string_thread(string)
# Starts a background thread that writes the given string into the pipe
# created for the child's input stream, then closes the pipe.
#
# @param string [String] The data to feed to the child's input.
# @return [Array<Thread>] The list of threads to join, now including the
#     newly started writer thread.
#
def write_string_thread(string)
  sink = make_in_pipe
  writer = ::Thread.new do
    begin
      sink.write(string)
    ensure
      # Closing signals end-of-input to the reader even if the write failed.
      sink.close
    end
  end
  @join_threads << writer
end