lib/dependabot/command_helpers.rb
# typed: strict # frozen_string_literal: true require "open3" require "timeout" require "sorbet-runtime" require "shellwords" module Dependabot module CommandHelpers extend T::Sig module TIMEOUTS NO_TIME_OUT = -1 # No timeout LOCAL = 30 # 30 seconds NETWORK = 120 # 2 minutes LONG_RUNNING = 300 # 5 minutes DEFAULT = 900 # 15 minutes end class ProcessStatus extend T::Sig sig { params(process_status: Process::Status, custom_exitstatus: T.nilable(Integer)).void } def initialize(process_status, custom_exitstatus = nil) @process_status = process_status @custom_exitstatus = custom_exitstatus end # Return the exit status, either from the process status or the custom one sig { returns(Integer) } def exitstatus @custom_exitstatus || @process_status.exitstatus || 0 end # Determine if the process was successful sig { returns(T::Boolean) } def success? @custom_exitstatus.nil? ? @process_status.success? || false : @custom_exitstatus.zero? end # Return the PID of the process (if available) sig { returns(T.nilable(Integer)) } def pid @process_status.pid end sig { returns(T.nilable(Integer)) } def termsig @process_status.termsig end # String representation of the status sig { returns(String) } def to_s if @custom_exitstatus "pid #{pid || 'unknown'}: exit #{@custom_exitstatus} (custom status)" else @process_status.to_s end end end # rubocop:disable Metrics/AbcSize # rubocop:disable Metrics/MethodLength # rubocop:disable Metrics/PerceivedComplexity # rubocop:disable Metrics/CyclomaticComplexity sig do params( env_cmd: T::Array[T.any(T::Hash[String, String], String)], stdin_data: T.nilable(String), stderr_to_stdout: T::Boolean, timeout: Integer ).returns([T.nilable(String), T.nilable(String), T.nilable(ProcessStatus), Float]) end def self.capture3_with_timeout( env_cmd, stdin_data: nil, stderr_to_stdout: false, timeout: TIMEOUTS::DEFAULT ) stdout = T.let("", String) stderr = T.let("", String) status = T.let(nil, T.nilable(ProcessStatus)) pid = T.let(nil, T.untyped) start_time = Time.now begin T.unsafe(Open3).popen3(*env_cmd) do |stdin, stdout_io, stderr_io, wait_thr| # rubocop:disable Metrics/BlockLength pid = wait_thr.pid Dependabot.logger.info("Started process PID: #{pid} with command: #{env_cmd.join(' ')}") # Write to stdin if input data is provided stdin&.write(stdin_data) if stdin_data stdin&.close stdout_io.sync = true stderr_io.sync = true # Array to monitor both stdout and stderr ios = [stdout_io, stderr_io] last_output_time = Time.now # Track the last time output was received until ios.empty? if timeout.positive? # Calculate remaining timeout dynamically remaining_timeout = timeout - (Time.now - last_output_time) # Raise an error if timeout is exceeded if remaining_timeout <= 0 Dependabot.logger.warn("Process PID: #{pid} timed out after #{timeout}s. Terminating...") terminate_process(pid) status = ProcessStatus.new(wait_thr.value, 124) raise Timeout::Error, "Timed out due to inactivity after #{timeout} seconds" end end # Use IO.select with a dynamically calculated short timeout ready_ios = IO.select(ios, nil, nil, 0) # Process ready IO streams ready_ios&.first&.each do |io| # 1. Read data from the stream io.set_encoding("BINARY") data = io.read_nonblock(1024) # 2. Force encoding to UTF-8 (for proper conversion) data.force_encoding("UTF-8") # 3. Convert to UTF-8 safely, handling invalid/undefined bytes data = data.encode("UTF-8", invalid: :replace, undef: :replace, replace: "?") # Reset the timeout if data is received last_output_time = Time.now unless data.empty? # 4. Append data to the appropriate stream if io == stdout_io stdout += data else stderr += data unless stderr_to_stdout stdout += data if stderr_to_stdout end rescue EOFError # Remove the stream when EOF is reached ios.delete(io) rescue IO::WaitReadable # Continue when IO is not ready yet next end end status = ProcessStatus.new(wait_thr.value) Dependabot.logger.info("Process PID: #{pid} completed with status: #{status}") end rescue Timeout::Error => e Dependabot.logger.error("Process PID: #{pid} failed due to timeout: #{e.message}") terminate_process(pid) # Append timeout message only to stderr without interfering with stdout stderr += "\n#{e.message}" unless stderr_to_stdout stdout += "\n#{e.message}" if stderr_to_stdout rescue Errno::ENOENT => e Dependabot.logger.error("Command failed: #{e.message}") stderr += e.message unless stderr_to_stdout stdout += e.message if stderr_to_stdout end elapsed_time = Time.now - start_time Dependabot.logger.info("Total execution time: #{elapsed_time.round(2)} seconds") [stdout, stderr, status, elapsed_time] end # rubocop:enable Metrics/AbcSize # rubocop:enable Metrics/MethodLength # rubocop:enable Metrics/PerceivedComplexity # rubocop:enable Metrics/CyclomaticComplexity # Terminate a process by PID sig { params(pid: T.nilable(Integer)).void } def self.terminate_process(pid) return unless pid begin if process_alive?(pid) Process.kill("TERM", pid) # Attempt graceful termination sleep(0.5) # Allow process to terminate end if process_alive?(pid) Process.kill("KILL", pid) # Forcefully kill if still running end rescue Errno::EPERM Dependabot.logger.error("Insufficient permissions to terminate process: #{pid}") ensure begin Process.waitpid(pid) rescue Errno::ESRCH, Errno::ECHILD # Process has already exited end end end # Check if the process is still alive sig { params(pid: T.nilable(Integer)).returns(T::Boolean) } def self.process_alive?(pid) return false if pid.nil? begin Process.kill(0, pid) # Check if the process exists true rescue Errno::ESRCH false rescue Errno::EPERM Dependabot.logger.error("Insufficient permissions to check process: #{pid}") false end end # Escape shell commands to ensure safe execution sig { params(command: String).returns(String) } def self.escape_command(command) command_parts = command.split.map(&:strip).reject(&:empty?) Shellwords.join(command_parts) end end end