# typed: true
# frozen_string_literal: true
require "open3"
require "dependabot/dependency"
require "dependabot/uv/requirement_parser"
require "dependabot/uv/file_fetcher"
require "dependabot/uv/file_parser/python_requirement_parser"
require "dependabot/uv/file_updater"
require "dependabot/shared_helpers"
require "dependabot/uv/language_version_manager"
require "dependabot/uv/native_helpers"
require "dependabot/uv/name_normaliser"
require "dependabot/uv/authed_url_builder"
module Dependabot
module Uv
class FileUpdater
# rubocop:disable Metrics/ClassLength
class CompileFileUpdater
require_relative "requirement_replacer"
require_relative "requirement_file_updater"
# Package names that #includes_unsafe_packages? looks for as `name==` pins.
UNSAFE_PACKAGES = %w(setuptools distribute pip).freeze
# Matches command output from the "incompatible versions" notice through to
# the end of the string; #run_command turns a match into DependencyFileNotResolvable.
INCOMPATIBLE_VERSIONS_REGEX = /There are incompatible versions in the resolved dependencies:.*\z/m
# Matches a "# WARNING:" comment and everything after it, to the end of the content.
WARNINGS = /\s*# WARNING:.*\Z/m
# Matches the "considered to be unsafe" note and everything after it, to the end of the content.
UNSAFE_NOTE = /\s*# The following packages are considered to be unsafe.*\Z/m
# Captures the word following a `--resolver=` option.
# NOTE(review): not referenced anywhere else in the visible part of this file.
RESOLVER_REGEX = /(?<=--resolver=)(\w+)/
# Error text that #compilation_error? searches for; a hit makes #compile_file
# retry once with build isolation switched off.
NATIVE_COMPILATION_ERROR =
"pip._internal.exceptions.InstallationSubprocessError: Getting requirements to build wheel exited with 1"
# Values passed to the constructor, exposed read-only.
attr_reader :dependencies
attr_reader :dependency_files
attr_reader :credentials
# @param dependencies [Array] dependencies to update (only the first one is used)
# @param dependency_files [Array] the project's dependency files
# @param credentials [Array] credentials; "python_index" entries become index options
# @param index_urls [Array<String>, nil] index URLs forwarded to requirement
#   replacement and package hash lookups
def initialize(dependencies:, dependency_files:, credentials:, index_urls: nil)
  # Build isolation starts enabled; #compile_file disables it after a native
  # compilation failure.
  @build_isolation = true
  @index_urls = index_urls
  @credentials = credentials
  @dependency_files = dependency_files
  @dependencies = dependencies
end
# Returns the updated dependency files, computing them on first use and
# reusing the cached result on later calls.
def updated_dependency_files
  return @updated_dependency_files if @updated_dependency_files

  @updated_dependency_files = fetch_updated_dependency_files
end
private
# Returns the dependency being updated: the first entry of `dependencies`.
def dependency
# For now, we'll only ever be updating a single dependency
dependencies.first
end
# Runs the three update passes and returns their results as one list:
# manifests first, then recompiled files, then the remaining requirement
# files that neither of the first two passes touched.
def fetch_updated_dependency_files
  recompiled = compile_new_requirement_files
  manifests = update_manifest_files
  # The last pass needs to know which files were already handled.
  leftovers = update_uncompiled_files(recompiled + manifests)

  [*manifests, *recompiled, *leftovers]
end
# Recompiles the relevant manifests inside a temporary directory and returns
# copies of the `.txt` dependency files whose post-processed content differs
# from the content they started with.
def compile_new_requirement_files
  SharedHelpers.in_a_temporary_directory do
    write_updated_dependency_files
    language_version_manager.install_required_python

    filenames_to_compile.each { |manifest_name| compile_file(manifest_name) }

    # Remove any .python-version file before parsing the reqs
    FileUtils.remove_entry(".python-version", true)

    dependency_files.filter_map do |original|
      next unless original.name.end_with?(".txt")

      recompiled = post_process_compiled_file(File.read(original.name), original)
      next if recompiled == original.content

      original.dup.tap { |copy| copy.content = recompiled }
    end
  end
end
# Shells out to `uv pip compile` for `filename`, upgrading the dependency to
# its target version (`-P name==version`). This is slow, as the compile step
# may need to do installs.
#
# When the failure looks like a native compilation error, build isolation is
# switched off and the command is retried exactly once; any other failure
# (or a second failure) is re-raised.
def compile_file(filename)
  options = compile_options(filename)
  base_command = "pyenv exec uv pip compile"

  package_part = "#{base_command} #{options} -P #{dependency.name}"
  pinned_part = "#{dependency.version} #{filename}"
  fingerprint =
    "#{base_command} #{compile_options_fingerprint(options)} -P <dependency_name>" \
    "==<dependency_version> <filename>"

  # The two halves are escaped separately so the `name==version` joiner is
  # left untouched.
  escaped_command = [
    SharedHelpers.escape_command(package_part),
    SharedHelpers.escape_command(pinned_part)
  ].join("==")

  run_uv_compile_command(
    escaped_command,
    allow_unsafe_shell_command: true,
    fingerprint: fingerprint
  )
rescue SharedHelpers::HelperSubprocessFailed => e
  # `attempts` is a method-level local, so it survives across `retry`.
  attempts ||= 0
  attempts += 1
  raise unless compilation_error?(e) && attempts <= 1

  @build_isolation = false
  retry
end
# True when the error's message contains NATIVE_COMPILATION_ERROR.
def compilation_error?(error)
error.message.include?(NATIVE_COMPILATION_ERROR)
end
# Returns copies of the `.in` manifests whose content changes once the
# dependency's requirement is updated. Unchanged manifests are omitted and
# the original file objects are left untouched.
def update_manifest_files
  manifests = dependency_files.select { |candidate| candidate.name.end_with?(".in") }

  manifests.filter_map do |manifest|
    copy = manifest.dup
    rewritten = update_dependency_requirement(copy)
    next if rewritten == copy.content

    copy.content = rewritten
    copy
  end
end
# Updates requirement files that were neither recompiled nor rewritten as
# manifests.
#
# @param updated_files [Array] files already handled by the earlier passes
# @return [Array] updated files from RequirementFileUpdater, or an empty
#   array when no requirement points at an unhandled file
def update_uncompiled_files(updated_files)
  handled_names = updated_files.map(&:name)
  unhandled = ->(req) { !handled_names.include?(req[:file]) }

  previous_reqs = dependency.previous_requirements.select(&unhandled)
  current_reqs = dependency.requirements.select(&unhandled)
  return [] if current_reqs.none?

  remaining_files = dependency_files.reject { |file| handled_names.include?(file.name) }

  # Rebuild the dependency so it only carries the requirements that still
  # need updating.
  attributes = dependency.to_h.transform_keys(&:to_sym).merge(
    requirements: current_reqs,
    previous_requirements: previous_reqs
  )

  RequirementFileUpdater.new(
    dependencies: [Dependency.new(**attributes)],
    dependency_files: remaining_files,
    credentials: credentials
  ).updated_dependency_files
end
# Runs a shell command through SharedHelpers with stderr merged into stdout.
#
# @param cmd [String] the command line to run
# @param env [Hash] environment variables (defaults to #python_env)
# @param allow_unsafe_shell_command [Boolean] forwarded to SharedHelpers
# @param fingerprint [String] sanitised description of the command
# @return whatever SharedHelpers.run_shell_command returns
# @raise [DependencyFileNotResolvable] when the failure output contains the
#   "incompatible versions" notice; the message is the matched text
# @raise [SharedHelpers::HelperSubprocessFailed] for any other failure
def run_command(cmd, env: python_env, allow_unsafe_shell_command: false, fingerprint:)
  SharedHelpers.run_shell_command(
    cmd,
    env: env,
    allow_unsafe_shell_command: allow_unsafe_shell_command,
    fingerprint: fingerprint,
    stderr_to_stdout: true
  )
rescue SharedHelpers::HelperSubprocessFailed => e
  # String#[] with a Regexp returns the matched text or nil, so the pattern
  # is applied once and the new error carries a String message rather than
  # a MatchData object.
  resolution_failure = e.message[INCOMPATIBLE_VERSIONS_REGEX]
  raise DependencyFileNotResolvable, resolution_failure if resolution_failure

  raise
end
# Selects the Python version with `pyenv local`, then runs the given compile
# command. Returns the result of the compile command.
def run_uv_compile_command(command, allow_unsafe_shell_command: false, fingerprint:)
  python_version = language_version_manager.python_major_minor
  run_command("pyenv local #{python_version}", fingerprint: "pyenv local <python_major_minor>")

  run_command(command, allow_unsafe_shell_command: allow_unsafe_shell_command, fingerprint: fingerprint)
end
# Builds the environment for shell commands. Only Apache Airflow 1.10.x
# installs need anything: when some file mentions "apache-airflow", one of
# two variables is set depending on whether any file mentions "unidecode".
def python_env
  mentioned = ->(needle) { dependency_files.any? { |f| f.content.include?(needle) } }
  return {} unless mentioned.call("apache-airflow")

  variable = mentioned.call("unidecode") ? "AIRFLOW_GPL_UNIDECODE" : "SLUGIFY_USES_TEXT_UNIDECODE"
  { variable => "yes" }
end
# Writes every dependency file into the current directory (creating parent
# directories as needed), with manifests frozen to the target version, and
# then writes the `.python-version` file.
def write_updated_dependency_files
  dependency_files.each do |file|
    target = file.name
    FileUtils.mkdir_p(File.dirname(target))
    File.write(target, freeze_dependency_requirement(file))
  end

  # Overwrite the .python-version with updated content
  File.write(".python-version", language_version_manager.python_major_minor)
end
# Returns the content to write for `file` before compiling.
#
# Non-`.in` files, and manifests with no previous requirement for the
# dependency, are returned unchanged. Otherwise the dependency's previous
# requirement in the manifest is replaced by an exact `==version` pin.
#
# @param file [#name, #content] a dependency file
# @return [String] the (possibly rewritten) file content
def freeze_dependency_requirement(file)
  return file.content unless file.name.end_with?(".in")

  old_req = dependency.previous_requirements.find { |r| r[:file] == file.name }
  return file.content unless old_req

  pinned_requirement = "==#{dependency.version}"
  # `old_req` is a requirement Hash, so compare its :requirement string with
  # the pin; comparing the Hash itself to a String can never be equal.
  return file.content if old_req[:requirement] == pinned_requirement

  RequirementReplacer.new(
    content: file.content,
    dependency_name: dependency.name,
    old_requirement: old_req[:requirement],
    new_requirement: pinned_requirement,
    index_urls: @index_urls
  ).updated_content
end
# Returns the content of `file` with the dependency's previous requirement
# replaced by its new one. The content is returned unchanged for non-`.in`
# files, when there is no previous requirement string for this file, or when
# the previous and new requirement entries are equal.
def update_dependency_requirement(file)
  untouched = file.content
  return untouched unless file.name.end_with?(".in")

  for_this_file = ->(req) { req[:file] == file.name }
  previous = dependency.previous_requirements.find(&for_this_file)
  current = dependency.requirements.find(&for_this_file)

  return untouched if previous.nil? || !previous.fetch(:requirement)
  return untouched if previous == current

  replacer = RequirementReplacer.new(
    content: untouched,
    dependency_name: dependency.name,
    old_requirement: previous[:requirement],
    new_requirement: current[:requirement],
    index_urls: @index_urls
  )
  replacer.updated_content
end
# Cleans up freshly compiled content against the file's original content:
# restores the original header, drops newly introduced warnings, updates
# hashes where required, and finally restores the original editable (`-e`) lines.
def post_process_compiled_file(updated_content, file)
  original = file.content

  with_header = replace_header_with_original(updated_content, original)
  without_warnings = remove_new_warnings(with_header, original)
  with_hashes = update_hashes_if_required(without_warnings, original)
  replace_absolute_file_paths(with_hashes, original)
end
# Swaps the leading run of `#` lines in the updated content for the leading
# run of `#` lines from the original content.
def replace_header_with_original(updated_content, original_content)
  comment_line = ->(line) { line.start_with?("#") }

  original_header = original_content.lines.take_while(&comment_line)
  regenerated_body = updated_content.lines.drop_while(&comment_line)

  (original_header + regenerated_body).join
end
# Restores the original editable (`-e`) requirement lines in the updated
# content. Lines are paired by position: the n-th `-e` line of the original
# replaces the n-th `-e` line of the updated content, unless the original
# line already appears in the updated content.
#
# @param updated_content [String] freshly compiled content
# @param original_content [String] content of the file before compilation
# @return [String] updated content with the original `-e` lines restored
# @raise [RuntimeError] when the updated content has no `-e` line at the
#   position of an original `-e` line that needs restoring
def replace_absolute_file_paths(updated_content, original_content)
  editable = ->(line) { line.start_with?("-e") }
  # Collected once up front instead of on every iteration.
  regenerated_editables = updated_content.lines.select(&editable)

  content = updated_content
  original_content.lines.select(&editable).each_with_index do |original_line, position|
    next if updated_content.include?(original_line)

    line_to_update = regenerated_editables.at(position)
    raise "Mismatch in editable requirements!" unless line_to_update

    # Block form inserts the original line verbatim; a String replacement
    # would have backslash sequences such as `\0` or `\1` expanded by gsub.
    content = content.gsub(line_to_update) { original_line }
  end
  content
end
# Strips the trailing WARNING block and the "unsafe packages" note from the
# updated content, but only when the original content did not contain them.
# Each removed section is replaced by a single newline.
def remove_new_warnings(updated_content, original_content)
  [WARNINGS, UNSAFE_NOTE].reduce(updated_content) do |content, pattern|
    newly_added = content.match?(pattern) && !original_content.match?(pattern)
    newly_added ? content.sub(pattern, "\n") : content
  end
end
# Rewrites the hashes of every requirement selected by
# #deps_to_augment_hashes_for: the hash section of the requirement is
# replaced with the sorted hashes from #package_hashes_for, joined with the
# separator the requirement already uses.
def update_hashes_if_required(updated_content, original_content)
  matches = deps_to_augment_hashes_for(updated_content, original_content)

  matches.reduce(updated_content) do |content, match|
    requirement_line = match.to_s
    captures = match.named_captures

    joined_hashes = package_hashes_for(
      name: captures.fetch("name"),
      version: captures.fetch("version"),
      algorithm: captures.fetch("algorithm")
    ).sort.join(hash_separator(requirement_line))

    augmented_line = requirement_line.sub(RequirementParser::HASHES, joined_hashes)
    content.gsub(requirement_line, augmented_line)
  end
end
# Finds requirement matches in the updated content whose hashes need
# augmenting. Returns an empty array unless at least one requirement now has
# fewer hashes than it had in the original content; in that case returns the
# requirements that are new to the file followed by those that lost hashes.
def deps_to_augment_hashes_for(updated_content, original_content)
  pattern = /^#{RequirementParser::INSTALL_REQ_WITH_REQUIREMENT}/o

  fresh_matches = []
  updated_content.scan(pattern) { fresh_matches << Regexp.last_match }
  previous_matches = []
  original_content.scan(pattern) { previous_matches << Regexp.last_match }

  # First original match per name, mirroring a first-hit lookup.
  previous_by_name = previous_matches.each_with_object({}) do |match, index|
    index[match.named_captures["name"]] ||= match
  end

  added, carried_over = fresh_matches.partition do |match|
    !previous_by_name.key?(match.named_captures["name"])
  end

  lost_hashes = carried_over.select do |match|
    old_hashes = previous_by_name[match.named_captures["name"]].named_captures["hashes"]
    next false unless old_hashes

    match.named_captures["hashes"].split("--hash").count < old_hashes.split("--hash").count
  end

  return [] if lost_hashes.none?

  [*added, *lost_hashes]
end
# Asks the native Python helper for the hashes of `name` at `version`, once
# per configured index URL (or once without an index URL when none are
# configured), and returns them as `--hash=<algorithm>:<hash>` strings.
# Indexes whose helper call fails with a PackageNotFoundError are skipped;
# any other helper failure is re-raised.
def package_hashes_for(name:, version:, algorithm:)
  (@index_urls || [nil]).flat_map do |index_url|
    helper_args = [name, version, algorithm]
    helper_args << index_url if index_url

    begin
      helper_result = SharedHelpers.run_helper_subprocess(
        command: "pyenv exec python3 #{NativeHelpers.python_helper_path}",
        function: "get_dependency_hash",
        args: helper_args
      )
      T.cast(helper_result, T::Array[T::Hash[String, String]])
       .map { |entry| "--hash=#{algorithm}:#{entry['hash']}" }
    rescue SharedHelpers::HelperSubprocessFailed => e
      raise unless e.error_class.include?("PackageNotFoundError")

      []
    end
  end
end
# Works out the text that separates hashes in a requirement string: the
# whitespace (with optional backslash) found between two consecutive hashes,
# or, when there is only one hash, the whitespace that precedes it.
# Returns nil when the string contains no hash.
def hash_separator(requirement_string)
  hash_pattern = RequirementParser::HASH
  return unless requirement_string.match?(hash_pattern)

  between_hashes =
    requirement_string
    .match(/#{hash_pattern}((?<separator>\s*\\?\s*?)#{hash_pattern})*/)[:separator]
  return between_hashes if between_hashes

  text_before_first_hash = requirement_string.match(RequirementParser::HASH).pre_match
  text_before_first_hash.match(/(?<separator>\s*\\?\s*?)\z/)[:separator]
end
# Produces a sanitised copy of the compile options for use in a command
# fingerprint: output file names and index URLs are replaced by
# placeholders.
#
# Every occurrence is masked. Several `--extra-index-url` options can be
# present (one per credential), and masking only the first would leave the
# remaining URLs in the fingerprint.
#
# @param options [String] space-separated compile options
# @return [String] the options with file names and URLs masked
def compile_options_fingerprint(options)
  options
    .gsub(/--output-file=\S+/, "--output-file=<output_file>")
    .gsub(/--index-url=\S+/, "--index-url=<index_url>")
    .gsub(/--extra-index-url=\S+/, "--extra-index-url=<extra_index_url>")
end
# Builds the space-separated option string for compiling `filename`: the
# build-isolation flag, the index options from credentials, and, when a
# compiled file exists for the manifest, the options derived from it.
def compile_options(filename)
  isolation_flag = @build_isolation ? "--build-isolation" : "--no-build-isolation"
  flags = [isolation_flag, *compile_index_options]

  compiled = compiled_file_for_filename(filename)
  flags.concat(uv_compile_options_from_compiled_file(compiled)) if compiled

  flags.join(" ")
end
# Derives compile options from markers found in an existing compiled file,
# so the regenerated file keeps the same shape. The output file option is
# always present; every other flag depends on a substring of the content.
def uv_compile_options_from_compiled_file(requirements_file)
  content = requirements_file.content

  # [flag, whether the compiled file calls for it] — order is the emit order.
  conditional_flags = [
    ["--emit-index-url", content.include?("index-url http")],
    ["--generate-hashes", content.include?("--hash=sha")],
    ["--no-annotate", !content.include?("# via ")],
    ["--pre", content.include?("--pre")],
    ["--no-strip-extras", content.include?("--no-strip-extras")],
    ["--emit-build-options", content.include?("--no-binary") || content.include?("--only-binary")],
    ["--universal", content.include?("--universal")]
  ]

  ["--output-file=#{requirements_file.name}"] + conditional_flags.select(&:last).map(&:first)
end
# Turns every "python_index" credential into an index option carrying the
# authenticated URL: `--index-url` when the credential replaces the base
# index, `--extra-index-url` otherwise.
def compile_index_options
  credentials.filter_map do |cred|
    next unless cred["type"] == "python_index"

    authed_url = AuthedUrlBuilder.authed_url(credential: cred)
    flag = cred.replaces_base? ? "--index-url" : "--extra-index-url"
    "#{flag}=#{authed_url}"
  end
end
# True when some line of `content` starts with `<name>==` for one of the
# UNSAFE_PACKAGES names.
def includes_unsafe_packages?(content)
  unsafe_names = Regexp.union(UNSAFE_PACKAGES)
  content.match?(/^(?:#{unsafe_names})==/)
end
# Lists the manifests that must be recompiled, in compilation order: `.in`
# files named by the dependency's requirements, plus manifests whose
# compiled file already includes the dependency.
def filenames_to_compile
  from_requirements =
    dependency.requirements.filter_map { |req| req[:file] if req[:file].end_with?(".in") }

  from_compiled_files = compile_files.filter_map do |manifest|
    compiled = compiled_file_for_filename(manifest.name)
    manifest.name if compiled_file_includes_dependency?(compiled)
  end

  order_filenames_for_compilation((from_requirements + from_compiled_files).uniq)
end
# Finds the compiled file generated from the manifest `filename`: first by an
# `--output-file` header that names the manifest, then by swapping the `.in`
# extension for `.txt`. Returns nil when neither lookup finds a file.
def compiled_file_for_filename(filename)
  header_pattern = output_file_regex(filename)
  by_header = compiled_files.find { |f| f.content.match?(header_pattern) }
  return by_header if by_header

  sibling_name = filename.gsub(/\.in$/, ".txt")
  compiled_files.find { |f| f.name == sibling_name }
end
# Builds the pattern source (a String) used to spot an `--output-file` header
# that ends with the given manifest name.
#
# NOTE(review): inside a double-quoted Ruby string `\s` is a literal space,
# so the resulting pattern matches spaces only, not tabs or other whitespace.
def output_file_regex(filename)
  escaped_name = Regexp.escape(filename)
  "--output-file[=\s]+.*\s#{escaped_name}\s*$"
end
# True when the compiled file lists a requirement whose normalised name
# equals the dependency's name. A missing compiled file gives false.
def compiled_file_includes_dependency?(compiled_file)
  return false unless compiled_file

  listed_names = []
  compiled_file.content.scan(RequirementParser::INSTALL_REQ_WITH_REQUIREMENT) do
    listed_names << Regexp.last_match[:name]
  end

  listed_names.any? { |listed| normalise(listed) == dependency.name }
end
# Delegates to NameNormaliser.normalise for the given package name.
def normalise(name)
NameNormaliser.normalise(name)
end
# If the files we need to update require one another then we need to
# update them in the right order: a manifest is only scheduled once every
# manifest it requires (among those being compiled) has been scheduled.
#
# Manifests missing from the requirement map are treated as requiring
# nothing. If the remaining manifests require each other in a cycle, no
# valid order exists, so they are appended in the order given rather than
# looping forever.
#
# @param filenames [Array<String>] manifests to compile
# @return [Array<String>] the same manifests in compilation order
def order_filenames_for_compilation(filenames)
  ordered_filenames = T.let([], T::Array[String])

  while (remaining_filenames = filenames - ordered_filenames).any?
    ready = remaining_filenames.reject do |fn|
      unupdated_reqs = (requirement_map[fn] || []) - ordered_filenames
      unupdated_reqs.intersect?(filenames)
    end

    # No manifest became ready: the remainder forms a dependency cycle.
    ready = remaining_filenames if ready.empty?

    ordered_filenames += ready
  end

  ordered_filenames
end
# Maps each manifest name to the manifests it pulls in through child
# requirement lines. Paths are resolved relative to the manifest's
# directory, cleaned, and `.txt` is rewritten to `.in`; self-references and
# duplicates are dropped. The map is built once and cached.
def requirement_map
  return @requirement_map if @requirement_map

  child_pattern = Uv::FileFetcher::CHILD_REQUIREMENT_REGEX

  @requirement_map = compile_files.to_h do |manifest|
    base_dir = File.dirname(manifest.name)

    required = manifest.content.scan(child_pattern).flatten.filter_map do |raw_path|
      located = base_dir == "." ? raw_path : File.join(base_dir, raw_path)
      source_name = Pathname.new(located).cleanpath.to_path.gsub(/\.txt$/, ".in")
      source_name unless source_name == manifest.name
    end

    [manifest.name, required.uniq]
  end
end
# Returns the PythonRequirementParser for the dependency files, built on
# first use and cached afterwards.
def python_requirement_parser
  return @python_requirement_parser if @python_requirement_parser

  @python_requirement_parser =
    FileParser::PythonRequirementParser.new(dependency_files: dependency_files)
end
# Returns the LanguageVersionManager wired to the requirement parser, built
# on first use and cached afterwards.
def language_version_manager
  return @language_version_manager if @language_version_manager

  @language_version_manager =
    LanguageVersionManager.new(python_requirement_parser: python_requirement_parser)
end
# The manifest files: dependency files whose name ends with ".in".
def compile_files
dependency_files.select { |f| f.name.end_with?(".in") }
end
# The compiled requirement files: dependency files whose name ends with ".txt".
def compiled_files
dependency_files.select { |f| f.name.end_with?(".txt") }
end
end
# rubocop:enable Metrics/ClassLength
end
end
end