lib/selective/ruby/core/controller.rb
require "logger" require "uri" require "json" require "fileutils" require "open3" module Selective module Ruby module Core class Controller @@selective_suppress_reporting = false def initialize(runner, debug: false, log: false) @debug = debug @runner = runner @retries = 0 @runner_id = ENV.fetch("SELECTIVE_RUNNER_ID", generate_runner_id) @logger = init_logger(log) end def start(reconnect: false) @pipe = NamedPipe.new("/tmp/#{runner_id}_2", "/tmp/#{runner_id}_1") @transport_pid = spawn_transport_process(reconnect ? transport_url + "&reconnect=true" : transport_url) handle_termination_signals(transport_pid) run_main_loop rescue NamedPipe::PipeClosedError retry! rescue => e with_error_handling { raise e } end def exec runner.exec rescue => e with_error_handling(include_header: false) { raise e } end def self.suppress_reporting! @@selective_suppress_reporting = true end def self.restore_reporting! @@selective_suppress_reporting = false end def self.suppress_reporting? @@selective_suppress_reporting end private attr_reader :runner, :pipe, :transport_pid, :retries, :logger, :runner_id BUILD_ENV_SCRIPT_PATH = "../../../bin/build_env.sh".freeze def init_logger(enabled) if enabled FileUtils.mkdir_p("log") Logger.new("log/#{runner_id}.log") else Logger.new("/dev/null") end end def run_main_loop loop do message = pipe.read next sleep(0.1) if message.nil? || message.empty? response = JSON.parse(message, symbolize_names: true) @logger.info("Received Command: #{response}") next if handle_command(response) break end end def retry! @retries += 1 with_error_handling { raise "Too many retries" } if retries > 4 puts("Retrying in #{retries} seconds...") sleep(retries) kill_transport pipe.reset! start(reconnect: true) end def write(data) pipe.write JSON.dump(data) end def generate_runner_id "selgen-#{SecureRandom.hex(4)}" end def transport_url @transport_url ||= begin api_key = ENV.fetch("SELECTIVE_API_KEY") host = ENV.fetch("SELECTIVE_HOST", "wss://app.selective.ci") # Validate that host is a valid websocket url(starts with ws:// or wss://) raise "Invalid host: #{host}" unless host.match?(/^wss?:\/\//) run_id = build_env.delete("run_id") run_attempt = build_env.delete("run_attempt") run_attempt = SecureRandom.uuid if run_attempt.nil? || run_attempt.empty? params = { "run_id" => run_id, "run_attempt" => run_attempt, "api_key" => api_key, "runner_id" => runner_id }.merge(metadata: build_env.to_json) query_string = URI.encode_www_form(params) "#{host}/transport/websocket?#{query_string}" end end def build_env @build_env ||= begin result = `#{Pathname.new(__dir__) + BUILD_ENV_SCRIPT_PATH}` JSON.parse(result) end end def spawn_transport_process(url) root_path = Gem.loaded_specs["selective-ruby-core"].full_gem_path transport_path = File.join(root_path, "lib", "bin", "transport") get_transport_path = File.join(root_path, "bin", "get_transport") # The get_transport script is not released with the gem, so this # code is intended for development/CI purposes. if !File.exist?(transport_path) && File.exist?(get_transport_path) output, status = Open3.capture2e(get_transport_path) if !status.success? puts <<~TEXT Failed to download transport binary. #{output} TEXT end end Process.spawn(transport_path, url, runner_id).tap do |pid| Process.detach(pid) end end def handle_termination_signals(pid) ["INT", "TERM"].each do |signal| Signal.trap(signal) do # :nocov: kill_transport(signal: signal) exit # :nocov: end end end def kill_transport(signal: "TERM") begin pipe.write "exit" # Give up to 5 seconds for graceful exit # before killing it below 1..5.times do Process.getpgid(transport_pid) sleep(1) end rescue NamedPipe::PipeClosedError # If the pipe is close, move straight to killing # it forcefully. end # :nocov: Process.kill(signal, transport_pid) # :nocov: rescue Errno::ESRCH # Process already gone noop end def handle_command(response) case response[:command] when "print_notice" print_notice(response[:message]) when "test_manifest" handle_test_manifest when "run_test_cases" handle_run_test_cases(response[:test_case_ids]) when "remove_failed_test_case_result" handle_remove_failed_test_case_result(response[:test_case_id]) when "reconnect" handle_reconnect when "print_message" handle_print_message(response[:message]) when "close" handle_close(response[:exit_status]) # This return is here for the sake of test where # we cannot exit but we need to break the loop return false else raise "Unknown command received: #{response[:command]}" if debug? end true end def handle_reconnect kill_transport pipe.reset! start(reconnect: true) end def handle_test_manifest self.class.restore_reporting! @logger.info("Sending Response: test_manifest") data = {test_cases: runner.manifest["examples"]} data[:modified_test_files] = modified_test_files unless modified_test_files.nil? write({type: "test_manifest", data: data}) end def handle_run_test_cases(test_cases) runner.run_test_cases(test_cases, method(:test_case_callback)) end def test_case_callback(test_case) @logger.info("Sending Response: test_case_result: #{test_case[:id]}") write({type: "test_case_result", data: test_case}) end def handle_remove_failed_test_case_result(test_case_id) runner.remove_failed_test_case_result(test_case_id) end def modified_test_files @modified_test_files ||= begin target_branch = build_env["target_branch"] return [] if target_branch.nil? || target_branch.empty? output, status = Open3.capture2e("git diff #{target_branch} --name-only") if status.success? output.split("\n").filter do |f| f.match?(/^#{runner.base_test_path}/) end end end end def handle_print_message(message) print_warning(message) end def handle_close(exit_status = nil) self.class.restore_reporting! runner.finish unless exit_status.is_a?(Integer) kill_transport pipe.delete_pipes exit(exit_status || runner.exit_status) end def debug? @debug end def with_error_handling(include_header: true) yield rescue => e raise e if debug? header = <<~TEXT An error occurred. Please rerun with --debug and contact support at https://selective.ci/support TEXT unless @banner_displayed header = <<~TEXT #{banner} #{header} TEXT end puts_indented <<~TEXT \e[31m #{header if include_header} #{e.message} \e[0m TEXT exit 1 end def print_warning(message) puts_indented <<~TEXT \e[33m #{message} \e[0m TEXT end def print_notice(message) puts_indented <<~TEXT #{banner} #{message} TEXT end def puts_indented(text) puts text.gsub(/^/, " ") end def banner @banner_displayed = true <<~BANNER ____ _ _ _ / ___| ___| | ___ ___| |_(_)_ _____ \\___ \\ / _ \\ |/ _ \\/ __| __| \\ \\ / / _ \\ ___) | __/ | __/ (__| |_| |\\ V / __/ |____/ \\___|_|\\___|\\___|\\__|_| \\_/ \\___| ________________________________________ BANNER end end end end end