class Dependabot::Uv::FileUpdater::CompileFileUpdater # rubocop:disable Metrics/ClassLength
def compilation_error?(error)
  error.message.include?(NATIVE_COMPILATION_ERROR)
end
def compile_file(filename)
  # Shell out to pip-compile, generate a new set of requirements.
  # This is slow, as pip-compile needs to do installs.
  options = compile_options(filename)
  options_fingerprint = compile_options_fingerprint(options)

  name_part = "pyenv exec uv pip compile " \
              "#{options} -P " \
              "#{dependency.name}"
  fingerprint_name_part = "pyenv exec uv pip compile " \
                          "#{options_fingerprint} -P " \
                          "<dependency_name>"

  version_part = "#{dependency.version} #{filename}"
  fingerprint_version_part = "<dependency_version> <filename>"

  # Don't escape pyenv `dep-name==version` syntax
  run_uv_compile_command(
    "#{SharedHelpers.escape_command(name_part)}==" \
    "#{SharedHelpers.escape_command(version_part)}",
    allow_unsafe_shell_command: true,
    fingerprint: "#{fingerprint_name_part}==#{fingerprint_version_part}"
  )
rescue SharedHelpers::HelperSubprocessFailed => e
  retry_count ||= 0
  retry_count += 1
  if compilation_error?(e) && retry_count <= 1
    @build_isolation = false
    retry
  end

  raise
end
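# As a rough sketch, for a hypothetical update of `requests` to 2.32.3 in
# requirements/base.in, the escaped command passed to run_uv_compile_command
# would look something like this (the exact option set depends on
# compile_options):
#
#   pyenv exec uv pip compile --build-isolation \
#     --output-file=requirements/base.txt -P requests==2.32.3 requirements/base.in
#
# The fingerprint variant swaps the dependency name, version and filename for
# placeholders so identical command shapes can be grouped without specifics.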
def compile_files
  dependency_files.select { |f| f.name.end_with?(".in") }
end
def compile_index_options
  credentials
    .select { |cred| cred["type"] == "python_index" }
    .map do |cred|
      authed_url = AuthedUrlBuilder.authed_url(credential: cred)

      if cred.replaces_base?
        "--index-url=#{authed_url}"
      else
        "--extra-index-url=#{authed_url}"
      end
    end
end
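# A minimal sketch of the flags this produces, assuming one "python_index"
# credential that replaces the base index and one that does not (URLs
# hypothetical):
#
#   ["--index-url=https://user:token@pypi.example.com/simple",
#    "--extra-index-url=https://mirror.example.org/simple"]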
def compile_new_requirement_files
  SharedHelpers.in_a_temporary_directory do
    write_updated_dependency_files
    language_version_manager.install_required_python

    filenames_to_compile.each do |filename|
      compile_file(filename)
    end

    # Remove any .python-version file before parsing the reqs
    FileUtils.remove_entry(".python-version", true)

    dependency_files.filter_map do |file|
      next unless file.name.end_with?(".txt")

      updated_content = File.read(file.name)
      updated_content = post_process_compiled_file(updated_content, file)
      next if updated_content == file.content

      file.dup.tap { |f| f.content = updated_content }
    end
  end
end
def compile_options(filename)
  options = @build_isolation ? ["--build-isolation"] : ["--no-build-isolation"]
  options += compile_index_options

  if (requirements_file = compiled_file_for_filename(filename))
    options += uv_compile_options_from_compiled_file(requirements_file)
  end

  options.join(" ")
end
def compile_options_fingerprint(options)
  options.sub(
    /--output-file=\S+/, "--output-file=<output_file>"
  ).sub(
    /--index-url=\S+/, "--index-url=<index_url>"
  ).sub(
    /--extra-index-url=\S+/, "--extra-index-url=<extra_index_url>"
  )
end
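# For example (options string hypothetical):
#
#   compile_options_fingerprint("--build-isolation --output-file=reqs/base.txt")
#   # => "--build-isolation --output-file=<output_file>"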
def compiled_file_for_filename(filename)
  compiled_file =
    compiled_files
    .find { |f| f.content.match?(output_file_regex(filename)) }
  compiled_file ||=
    compiled_files
    .find { |f| f.name == filename.gsub(/\.in$/, ".txt") }
  compiled_file
end
def compiled_file_includes_dependency?(compiled_file)
  return false unless compiled_file

  regex = RequirementParser::INSTALL_REQ_WITH_REQUIREMENT
  matches = []
  compiled_file.content.scan(regex) { matches << Regexp.last_match }
  matches.any? { |m| normalise(m[:name]) == dependency.name }
end
def compiled_files
  dependency_files.select { |f| f.name.end_with?(".txt") }
end
def dependency
  # For now, we'll only ever be updating a single dependency
  dependencies.first
end
def deps_to_augment_hashes_for(updated_content, original_content)
  regex = /^#{RequirementParser::INSTALL_REQ_WITH_REQUIREMENT}/o

  new_matches = []
  updated_content.scan(regex) { new_matches << Regexp.last_match }

  old_matches = []
  original_content.scan(regex) { old_matches << Regexp.last_match }

  new_deps = []
  changed_hashes_deps = []

  new_matches.each do |mtch|
    nm = mtch.named_captures["name"]
    old_match = old_matches.find { |m| m.named_captures["name"] == nm }

    next new_deps << mtch unless old_match
    next unless old_match.named_captures["hashes"]

    old_count = old_match.named_captures["hashes"].split("--hash").count
    new_count = mtch.named_captures["hashes"].split("--hash").count
    changed_hashes_deps << mtch if new_count < old_count
  end

  return [] if changed_hashes_deps.none?

  [*new_deps, *changed_hashes_deps]
end
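# A sketch of the two cases collected here (requirement lines hypothetical):
# dependencies that are new in the compiled output, and dependencies whose
# `--hash` count shrank in the recompile:
#
#   old: idna==3.6 --hash=sha256:aaa --hash=sha256:bbb
#   new: idna==3.7 --hash=sha256:ccc    # fewer hashes => re-augment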
def fetch_updated_dependency_files
  updated_compiled_files = compile_new_requirement_files
  updated_manifest_files = update_manifest_files

  updated_files = updated_compiled_files + updated_manifest_files
  updated_uncompiled_files = update_uncompiled_files(updated_files)

  [
    *updated_manifest_files,
    *updated_compiled_files,
    *updated_uncompiled_files
  ]
end
def filenames_to_compile
  files_from_reqs =
    dependency.requirements
              .map { |r| r[:file] }
              .select { |fn| fn.end_with?(".in") }

  files_from_compiled_files =
    compile_files.map(&:name).select do |fn|
      compiled_file = compiled_file_for_filename(fn)
      compiled_file_includes_dependency?(compiled_file)
    end

  filenames = [*files_from_reqs, *files_from_compiled_files].uniq

  order_filenames_for_compilation(filenames)
end
def freeze_dependency_requirement(file)
  return file.content unless file.name.end_with?(".in")

  old_req = dependency.previous_requirements
                      .find { |r| r[:file] == file.name }

  return file.content unless old_req
  return file.content if old_req == "==#{dependency.version}"

  RequirementReplacer.new(
    content: file.content,
    dependency_name: dependency.name,
    old_requirement: old_req[:requirement],
    new_requirement: "==#{dependency.version}",
    index_urls: @index_urls
  ).updated_content
end
def hash_separator(requirement_string)
  hash_regex = RequirementParser::HASH
  return unless requirement_string.match?(hash_regex)

  current_separator =
    requirement_string
    .match(/#{hash_regex}((?<separator>\s*\\?\s*?)#{hash_regex})*/)
    .named_captures.fetch("separator")

  default_separator =
    requirement_string
    .match(RequirementParser::HASH)
    .pre_match.match(/(?<separator>\s*\\?\s*?)\z/)
    .named_captures.fetch("separator")

  current_separator || default_separator
end
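# For instance, for a requirement whose hashes are laid out as (hypothetical):
#
#   cryptography==42.0.5 \
#       --hash=sha256:aaa... \
#       --hash=sha256:bbb...
#
# the recovered separator is the backslash continuation plus indentation,
# so regenerated hashes can be joined back in the file's existing layout.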
def includes_unsafe_packages?(content)
  UNSAFE_PACKAGES.any? { |n| content.match?(/^#{Regexp.quote(n)}==/) }
end
def initialize(dependencies:, dependency_files:, credentials:, index_urls: nil)
  @dependencies = dependencies
  @dependency_files = dependency_files
  @credentials = credentials
  @index_urls = index_urls
  @build_isolation = true
end
def language_version_manager
  @language_version_manager ||=
    LanguageVersionManager.new(
      python_requirement_parser: python_requirement_parser
    )
end
def normalise(name)
  NameNormaliser.normalise(name)
end
# If the files we need to update require one another then we need to
# update them in the right order
def order_filenames_for_compilation(filenames)
  ordered_filenames = T.let([], T::Array[String])

  while (remaining_filenames = filenames - ordered_filenames).any?
    ordered_filenames +=
      remaining_filenames
      .reject do |fn|
        unupdated_reqs = requirement_map[fn] - ordered_filenames
        unupdated_reqs.intersect?(filenames)
      end
  end

  ordered_filenames
end
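# A worked example, assuming a requirement_map of (filenames hypothetical):
#
#   { "dev.in" => ["base.in"], "base.in" => [] }
#
# Pass 1 keeps only "base.in" (it has no unupdated requirements in the set);
# pass 2 then admits "dev.in", yielding ["base.in", "dev.in"].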
def output_file_regex(filename)
  "--output-file[=\s]+.*\s#{Regexp.escape(filename)}\s*$"
end
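# This matches the header line uv/pip-compile writes into compiled files,
# e.g. (paths hypothetical):
#
#   # uv pip compile --output-file=requirements/base.txt requirements/base.in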
def package_hashes_for(name:, version:, algorithm:)
  index_urls = @index_urls || [nil]
  hashes = []

  index_urls.each do |index_url|
    args = [name, version, algorithm]
    args << index_url if index_url

    begin
      native_helper_hashes = T.cast(
        SharedHelpers.run_helper_subprocess(
          command: "pyenv exec python3 #{NativeHelpers.python_helper_path}",
          function: "get_dependency_hash",
          args: args
        ),
        T::Array[T::Hash[String, String]]
      ).map { |h| "--hash=#{algorithm}:#{h['hash']}" }

      hashes.concat(native_helper_hashes)
    rescue SharedHelpers::HelperSubprocessFailed => e
      raise unless e.error_class.include?("PackageNotFoundError")

      next
    end
  end

  hashes
end
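# The native helper returns entries shaped like { "hash" => "abc123..." },
# which this maps to flags such as (values hypothetical):
#
#   ["--hash=sha256:abc123...", "--hash=sha256:def456..."]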
def post_process_compiled_file(updated_content, file)
  content = replace_header_with_original(updated_content, file.content)
  content = remove_new_warnings(content, file.content)
  content = update_hashes_if_required(content, file.content)
  replace_absolute_file_paths(content, file.content)
end
def python_env
  env = {}

  # Handle Apache Airflow 1.10.x installs
  if dependency_files.any? { |f| f.content.include?("apache-airflow") }
    if dependency_files.any? { |f| f.content.include?("unidecode") }
      env["AIRFLOW_GPL_UNIDECODE"] = "yes"
    else
      env["SLUGIFY_USES_TEXT_UNIDECODE"] = "yes"
    end
  end

  env
end
def python_requirement_parser
  @python_requirement_parser ||=
    FileParser::PythonRequirementParser.new(
      dependency_files: dependency_files
    )
end
def remove_new_warnings(updated_content, original_content)
  content = updated_content
  content = content.sub(WARNINGS, "\n") if content.match?(WARNINGS) && !original_content.match?(WARNINGS)

  if content.match?(UNSAFE_NOTE) && !original_content.match?(UNSAFE_NOTE)
    content = content.sub(UNSAFE_NOTE, "\n")
  end

  content
end
def replace_absolute_file_paths(updated_content, original_content)
  content = updated_content

  update_count = 0
  original_content.lines.each do |original_line|
    next unless original_line.start_with?("-e")
    next update_count += 1 if updated_content.include?(original_line)

    line_to_update =
      updated_content.lines
                     .select { |l| l.start_with?("-e") }
                     .at(update_count)
    raise "Mismatch in editable requirements!" unless line_to_update

    content = content.gsub(line_to_update, original_line)
    update_count += 1
  end

  content
end
def replace_header_with_original(updated_content, original_content)
  original_header_lines =
    original_content.lines.take_while { |l| l.start_with?("#") }

  updated_content_lines =
    updated_content.lines.drop_while { |l| l.start_with?("#") }

  [*original_header_lines, *updated_content_lines].join
end
def requirement_map
  child_req_regex = Uv::FileFetcher::CHILD_REQUIREMENT_REGEX
  @requirement_map ||=
    compile_files.each_with_object({}) do |file, req_map|
      paths = file.content.scan(child_req_regex).flatten
      current_dir = File.dirname(file.name)

      req_map[file.name] =
        paths.map do |path|
          path = File.join(current_dir, path) if current_dir != "."
          path = Pathname.new(path).cleanpath.to_path
          path = path.gsub(/\.txt$/, ".in")
          next if path == file.name

          path
        end.uniq.compact
    end
end
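# Sketch of the resulting map for a hypothetical layout where dev.in starts
# with "-r base.txt":
#
#   { "base.in" => [], "dev.in" => ["base.in"] }
#
# References to compiled .txt files are rewritten back to the .in files they
# were compiled from, so ordering operates on the source manifests.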
def run_command(cmd, env: python_env, allow_unsafe_shell_command: false, fingerprint:)
  SharedHelpers.run_shell_command(
    cmd,
    env: env,
    allow_unsafe_shell_command: allow_unsafe_shell_command,
    fingerprint: fingerprint,
    stderr_to_stdout: true
  )
rescue SharedHelpers::HelperSubprocessFailed => e
  stdout = e.message

  if stdout.match?(INCOMPATIBLE_VERSIONS_REGEX)
    raise DependencyFileNotResolvable, stdout.match(INCOMPATIBLE_VERSIONS_REGEX)
  end

  raise
end
def run_uv_compile_command(command, allow_unsafe_shell_command: false, fingerprint:)
  run_command(
    "pyenv local #{language_version_manager.python_major_minor}",
    fingerprint: "pyenv local <python_major_minor>"
  )

  run_command(
    command,
    allow_unsafe_shell_command: allow_unsafe_shell_command,
    fingerprint: fingerprint
  )
end
def update_dependency_requirement(file)
  return file.content unless file.name.end_with?(".in")

  old_req = dependency.previous_requirements
                      .find { |r| r[:file] == file.name }
  new_req = dependency.requirements
                      .find { |r| r[:file] == file.name }
  return file.content unless old_req&.fetch(:requirement)
  return file.content if old_req == new_req

  RequirementReplacer.new(
    content: file.content,
    dependency_name: dependency.name,
    old_requirement: old_req[:requirement],
    new_requirement: new_req[:requirement],
    index_urls: @index_urls
  ).updated_content
end
def update_hashes_if_required(updated_content, original_content)
  deps_to_update =
    deps_to_augment_hashes_for(updated_content, original_content)

  updated_content_with_hashes = updated_content
  deps_to_update.each do |mtch|
    updated_string = mtch.to_s.sub(
      RequirementParser::HASHES,
      package_hashes_for(
        name: mtch.named_captures.fetch("name"),
        version: mtch.named_captures.fetch("version"),
        algorithm: mtch.named_captures.fetch("algorithm")
      ).sort.join(hash_separator(mtch.to_s))
    )

    updated_content_with_hashes = updated_content_with_hashes
                                  .gsub(mtch.to_s, updated_string)
  end
  updated_content_with_hashes
end
def update_manifest_files
  dependency_files.filter_map do |file|
    next unless file.name.end_with?(".in")

    file = file.dup
    updated_content = update_dependency_requirement(file)
    next if updated_content == file.content

    file.content = updated_content
    file
  end
end
def update_uncompiled_files(updated_files)
  updated_filenames = updated_files.map(&:name)
  old_reqs = dependency.previous_requirements
                       .reject { |r| updated_filenames.include?(r[:file]) }
  new_reqs = dependency.requirements
                       .reject { |r| updated_filenames.include?(r[:file]) }

  return [] if new_reqs.none?

  files = dependency_files
          .reject { |file| updated_filenames.include?(file.name) }

  args = dependency.to_h
  args = args.keys.to_h { |k| [k.to_sym, args[k]] }
  args[:requirements] = new_reqs
  args[:previous_requirements] = old_reqs

  RequirementFileUpdater.new(
    dependencies: [Dependency.new(**args)],
    dependency_files: files,
    credentials: credentials
  ).updated_dependency_files
end
def updated_dependency_files
  @updated_dependency_files ||= fetch_updated_dependency_files
end
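# A minimal usage sketch (collaborator objects assumed to come from the
# surrounding update job; names hypothetical):
#
#   updater = Dependabot::Uv::FileUpdater::CompileFileUpdater.new(
#     dependencies: [dependency],
#     dependency_files: files,       # .in manifests plus compiled .txt files
#     credentials: credentials
#   )
#   updater.updated_dependency_files # => only the files whose content changed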
def uv_compile_options_from_compiled_file(requirements_file)
  options = ["--output-file=#{requirements_file.name}"]

  options << "--emit-index-url" if requirements_file.content.include?("index-url http")
  options << "--generate-hashes" if requirements_file.content.include?("--hash=sha")
  options << "--no-annotate" unless requirements_file.content.include?("# via ")
  options << "--pre" if requirements_file.content.include?("--pre")
  options << "--no-strip-extras" if requirements_file.content.include?("--no-strip-extras")

  if requirements_file.content.include?("--no-binary") ||
     requirements_file.content.include?("--only-binary")
    options << "--emit-build-options"
  end

  options << "--universal" if requirements_file.content.include?("--universal")

  options
end
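# For a compiled file that contains "--hash=sha..." lines and "# via "
# annotations but none of the other markers, this would yield something like
# (path hypothetical):
#
#   ["--output-file=requirements/base.txt", "--generate-hashes"]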
def write_updated_dependency_files
  dependency_files.each do |file|
    path = file.name
    FileUtils.mkdir_p(Pathname.new(path).dirname)
    File.write(path, freeze_dependency_requirement(file))
  end

  # Overwrite the .python-version with updated content
  File.write(".python-version", language_version_manager.python_major_minor)
end
end