lib/dependabot/python/file_updater/poetry_file_updater.rb



# typed: strict
# frozen_string_literal: true

require "sorbet-runtime"
require "toml-rb"
require "open3"
require "dependabot/dependency"
require "dependabot/shared_helpers"
require "dependabot/python/language_version_manager"
require "dependabot/python/version"
require "dependabot/python/requirement"
require "dependabot/python/file_parser/python_requirement_parser"
require "dependabot/python/file_updater"
require "dependabot/python/native_helpers"
require "dependabot/python/name_normaliser"

module Dependabot
  module Python
    class FileUpdater
      class PoetryFileUpdater
        require_relative "pyproject_preparer"
        extend T::Sig

        # Dependency files handed to the updater at construction time.
        sig { returns(T::Array[Dependabot::DependencyFile]) }
        attr_reader :dependency_files

        # Credentials handed to the updater at construction time; they are
        # forwarded to the git and auth-env-var helpers further down.
        sig { returns(T::Array[Dependabot::Credential]) }
        attr_reader :credentials

        # Dependencies handed to the updater at construction time.
        sig { returns(T::Array[Dependabot::Dependency]) }
        attr_reader :dependencies

        sig do
          params(
            dependencies: T::Array[Dependabot::Dependency],
            dependency_files: T::Array[Dependabot::DependencyFile],
            credentials: T::Array[Dependabot::Credential]
          ).void
        end
        def initialize(dependencies:, dependency_files:, credentials:)
          # Constructor inputs, exposed through the public attr_readers.
          @dependencies = dependencies
          @dependency_files = dependency_files
          @credentials = credentials

          # Every slot below starts out as nil; Sorbet (typed: strict) requires
          # instance variables to be declared here with their nilable type.

          # Dependency file slots.
          @pyproject = T.let(nil, T.nilable(Dependabot::DependencyFile))
          @lockfile = T.let(nil, T.nilable(Dependabot::DependencyFile))
          @poetry_lock = T.let(nil, T.nilable(Dependabot::DependencyFile))

          # File content slots.
          @prepared_pyproject = T.let(nil, T.nilable(String))
          @updated_pyproject_content = T.let(nil, T.nilable(String))
          @updated_lockfile_content = T.let(nil, T.nilable(String))
          @updated_dependency_files = T.let(nil, T.nilable(T::Array[Dependabot::DependencyFile]))

          # Helper object / path slots.
          @python_requirement_parser = T.let(nil, T.nilable(FileParser::PythonRequirementParser))
          @language_version_manager = T.let(nil, T.nilable(LanguageVersionManager))
          @python_helper_path = T.let(nil, T.nilable(String))
        end

        sig { returns(T::Array[Dependabot::DependencyFile]) }
        def updated_dependency_files
          # Memoised entry point: the update runs on the first call only and the
          # same array is handed back on later calls.
          cached = @updated_dependency_files
          return cached if cached

          @updated_dependency_files = fetch_updated_dependency_files
        end

        private

        sig { returns(Dependabot::Dependency) }
        def dependency
          # For now, we'll only ever be updating a single dependency, so the
          # first entry of the list is the one to act on.
          primary = dependencies.first
          T.must(primary)
        end

        sig { returns(T::Array[Dependabot::DependencyFile]) }
        def fetch_updated_dependency_files
          # Builds the list of changed files: the pyproject (only when one of its
          # requirements changed) followed by the regenerated lockfile, if a
          # lockfile exists. Raises when the regenerated lockfile is identical
          # to the current one.
          pyproject_file = T.must(pyproject)
          results = []

          if file_changed?(pyproject_file)
            new_pyproject_content = T.must(updated_pyproject_content)
            results << updated_file(file: pyproject_file, content: new_pyproject_content)
          end

          current_lockfile = lockfile
          return results unless current_lockfile

          new_lockfile_content = updated_lockfile_content
          raise "Expected lockfile to change!" if current_lockfile.content == new_lockfile_content

          results << updated_file(file: current_lockfile, content: new_lockfile_content)
          results
        end

        sig { returns(T.nilable(String)) }
        def updated_pyproject_content
          # Returns the pyproject content with the dependency's requirement
          # strings rewritten. When no requirement for the pyproject changed, the
          # original content is returned untouched (it may be nil). Raises
          # DependencyFileContentNotChanged when a change was expected but the
          # rewrite produced identical content.
          #
          # The rewritten content is memoised: this method is called from several
          # places and the regex rewriting would otherwise be redone every time.
          # Callers share the returned String and must not mutate it.
          return @updated_pyproject_content if @updated_pyproject_content

          content = T.must(pyproject).content
          return content unless requirement_changed?(T.must(pyproject), dependency)

          updated_content = content.dup

          # Pair each new requirement with its previous counterpart by position.
          dependency.requirements.zip(T.must(dependency.previous_requirements)).each do |new_r, old_r|
            next unless new_r[:file] == pyproject&.name && T.must(old_r)[:file] == pyproject&.name

            updated_content = replace_dep(dependency, T.must(updated_content), new_r, T.must(old_r))
          end

          raise DependencyFileContentNotChanged, "Content did not change!" if content == updated_content

          @updated_pyproject_content = updated_content
        end

        sig do
          params(
            dep: Dependabot::Dependency,
            content: String,
            new_r: T::Hash[Symbol, T.untyped],
            old_r: T::Hash[Symbol, T.untyped]
          ).returns(String)
        end
        def replace_dep(dep, content, new_r, old_r)
          # Rewrites one requirement of `dep` inside `content` and returns the new
          # content. `old_r` / `new_r` are requirement hashes; their :requirement
          # strings are swapped and their :groups select the section to edit.
          #
          # Two layouts are handled:
          # * inline declaration (`name = "..."` / `name = { version = "..." }`)
          #   found via declaration_regex
          # * table declaration (`[tool.poetry.<group>.<name>]` with its own
          #   `version = "..."` line) found via table_declaration_regex
          new_req = new_r[:requirement]
          old_req = old_r[:requirement]

          declaration_match = content.match(declaration_regex(dep, old_r))
          if declaration_match
            declaration = T.must(declaration_match[:declaration])
            new_declaration = declaration.sub(old_req, new_req)

            # Splice the new declaration in at the matched location. Replacing
            # the first textual occurrence of the declaration in the whole file
            # would edit the wrong section when an identical line also appears
            # earlier (e.g. same dependency in main and dev groups).
            # The declaration capture ends the overall match, so the last
            # occurrence inside the matched text is the captured one.
            matched_text = T.must(declaration_match[0])
            head, _declaration, tail = matched_text.rpartition(declaration)

            [declaration_match.pre_match, head, new_declaration, tail, declaration_match.post_match].join
          else
            content.gsub(table_declaration_regex(dep, new_r)) do |match|
              match.gsub(/(\s*version\s*=\s*["'])#{Regexp.escape(old_req)}/,
                         '\1' + new_req)
            end
          end
        end

        sig { returns(String) }
        def updated_lockfile_content
          # Memoised lockfile text produced from the prepared pyproject, with two
          # fix-ups applied: the python-versions entry of the original lockfile
          # is put back, and the content-hash found in the fresh lockfile is
          # swapped for the hash computed from updated_pyproject_content.
          @updated_lockfile_content ||=
            begin
              regenerated = updated_lockfile_content_for(prepared_pyproject)
              locked_python = TomlRB.parse(T.must(lockfile).content)["metadata"]["python-versions"]

              # Only the final quoted value of the [metadata] ... python-versions
              # chunk is rewritten; '\1' reuses whichever quote character opened it.
              python_restored =
                regenerated.gsub(/\[metadata\]\n.*python-versions[^\n]+\n/m) do |metadata_chunk|
                  metadata_chunk.gsub(/(["']).*(['"])\n\Z/, '\1' + locked_python + '\1' + "\n")
                end

              placeholder_hash = TomlRB.parse(python_restored)["metadata"]["content-hash"]
              real_hash = T.must(pyproject_hash_for(updated_pyproject_content.to_s)).to_s

              python_restored.gsub(placeholder_hash, real_hash)
            end
        end

        sig { returns(String) }
        def prepared_pyproject
          # Memoised pyproject content used for the lockfile run. The updated
          # content goes through four steps, in this order: sanitize, freeze the
          # other dependencies, freeze the dependencies being updated, then
          # update the python requirement.
          @prepared_pyproject ||=
            begin
              sanitized = sanitize(T.must(updated_pyproject_content))
              others_frozen = freeze_other_dependencies(sanitized)
              targets_frozen = freeze_dependencies_being_updated(others_frozen)
              update_python_requirement(targets_frozen)
            end
        end

        sig { params(pyproject_content: String).returns(String) }
        def freeze_other_dependencies(pyproject_content)
          # Delegates to PyprojectPreparer#freeze_top_level_dependencies_except,
          # passing the lockfile and the dependencies that are being updated.
          preparer = PyprojectPreparer.new(pyproject_content: pyproject_content, lockfile: lockfile)
          preparer.freeze_top_level_dependencies_except(dependencies)
        end

        sig { params(pyproject_content: String).returns(String) }
        def freeze_dependencies_being_updated(pyproject_content)
          # Pins every dependency under update to its new version inside the
          # [tool.poetry] table and returns the re-serialised TOML. A dependency
          # with a requirement in the pyproject has its declaration rewritten;
          # any other dependency gets a fresh declaration added.
          parsed = TomlRB.parse(pyproject_content)
          poetry_section = parsed.fetch("tool").fetch("poetry")
          pyproject_name = pyproject&.name

          dependencies.each do |dep|
            declared_in_pyproject = dep.requirements.any? { |req| req[:file] == pyproject_name }

            if declared_in_pyproject
              lock_declaration_to_new_version!(poetry_section, dep)
            else
              create_declaration_at_new_version!(poetry_section, dep)
            end
          end

          TomlRB.dump(parsed)
        end

        sig { params(pyproject_content: String).returns(String) }
        def update_python_requirement(pyproject_content)
          # Delegates to PyprojectPreparer#update_python_requirement with the
          # python version reported by the language version manager.
          python_version = language_version_manager.python_version
          preparer = PyprojectPreparer.new(pyproject_content: pyproject_content)
          preparer.update_python_requirement(python_version)
        end

        sig { params(poetry_object: T::Hash[String, T.untyped], dep: Dependabot::Dependency).returns(T::Array[String]) }
        def lock_declaration_to_new_version!(poetry_object, dep)
          # Mutates poetry_object: in every dependency section where a key
          # normalises to dep.name, the declared version becomes dep.version.
          # Table-style entries (hashes) keep their other keys; plain entries are
          # replaced outright. Returns the list of section names iterated over.
          dependency_types = Dependabot::Python::FileParser::PyprojectFilesParser::POETRY_DEPENDENCY_TYPES

          dependency_types.each do |section|
            table = poetry_object[section]
            declared_name = (table&.keys || []).find { |candidate| normalise(candidate) == dep.name }
            next unless declared_name

            entry = table[declared_name]
            if entry.is_a?(Hash)
              entry["version"] = dep.version
            else
              table[declared_name] = dep.version
            end
          end
        end

        sig { params(poetry_object: T::Hash[String, T.untyped], dep: Dependabot::Dependency).void }
        def create_declaration_at_new_version!(poetry_object, dep)
          # Mutates poetry_object: adds `dep.name = dep.version` to the
          # "dependencies" section for production dependencies and to
          # "dev-dependencies" otherwise, creating the section when missing.
          section =
            if dep.production?
              "dependencies"
            else
              "dev-dependencies"
            end

          declarations = (poetry_object[section] ||= {})
          declarations[dep.name] = dep.version
        end

        sig { params(pyproject_content: String).returns(String) }
        def sanitize(pyproject_content)
          # Delegates to PyprojectPreparer#sanitize for the given content.
          preparer = PyprojectPreparer.new(pyproject_content: pyproject_content)
          preparer.sanitize
        end

        sig { params(pyproject_content: String).returns(String) }
        def updated_lockfile_content_for(pyproject_content)
          # Regenerates poetry.lock for the given pyproject content and returns
          # the new lockfile text. The work happens inside the blocks provided by
          # SharedHelpers (presumably a scratch directory with git credentials
          # configured -- see those helpers for the exact guarantees).
          #
          # Order matters: files are written first, then auth env vars are set,
          # the required Python is installed, and only then is Poetry invoked.
          SharedHelpers.in_a_temporary_directory do
            SharedHelpers.with_git_configured(credentials: credentials) do
              write_temporary_dependency_files(pyproject_content)
              add_auth_env_vars

              language_version_manager.install_required_python

              # use system git instead of the pure Python dulwich
              run_poetry_command("pyenv exec poetry config system-git-client true")

              run_poetry_update_command

              # The block's value (and so the method's) is the lockfile that
              # the update command left in the working directory.
              File.read("poetry.lock")
            end
          end
        end

        # Using `--lock` avoids doing an install.
        # Using `--no-interaction` avoids asking for passwords.
        sig { returns(String) }
        def run_poetry_update_command
          # Runs `poetry update` for the dependency being updated. The
          # fingerprint is the same command with the name replaced by a
          # placeholder.
          dependency_name = dependency.name
          command = "pyenv exec poetry update #{dependency_name} --lock --no-interaction"
          fingerprint = "pyenv exec poetry update <dependency_name> --lock --no-interaction"

          run_poetry_command(command, fingerprint: fingerprint)
        end

        sig { params(command: String, fingerprint: T.nilable(String)).returns(String) }
        def run_poetry_command(command, fingerprint: nil)
          # Thin wrapper: hands the command string and optional fingerprint to
          # SharedHelpers.run_shell_command and returns whatever it returns.
          SharedHelpers.run_shell_command(command, fingerprint: fingerprint)
        end

        sig { params(pyproject_content: Object).returns(Integer) }
        def write_temporary_dependency_files(pyproject_content)
          # Writes every dependency file into the current working directory
          # (creating parent directories as needed), then overwrites
          # .python-version and pyproject.toml. Returns the result of the final
          # File.write, i.e. the number of bytes written to pyproject.toml.
          dependency_files.each do |dependency_file|
            target = dependency_file.name
            FileUtils.mkdir_p(File.dirname(target))
            File.write(target, dependency_file.content)
          end

          # Overwrite the .python-version with updated content
          python_version_file = ".python-version"
          File.write(python_version_file, language_version_manager.python_major_minor)

          # Overwrite the pyproject with updated content
          File.write("pyproject.toml", pyproject_content)
        end

        sig { void }
        def add_auth_env_vars
          # Delegates to PyprojectPreparer#add_auth_env_vars using the original
          # (not the updated) pyproject content and the updater's credentials.
          original_content = T.must(pyproject&.content)
          preparer = Python::FileUpdater::PyprojectPreparer.new(pyproject_content: original_content)
          preparer.add_auth_env_vars(credentials)
        end

        sig do
          params(
            pyproject_content: String
          ).returns(T.nilable(T.any(
                                T::Hash[String, T.untyped],
                                String,
                                T::Array[T::Hash[String, T.untyped]]
                              )))
        end
        def pyproject_hash_for(pyproject_content)
          # Writes the dependency files (with the given pyproject content) into
          # the directory yielded by SharedHelpers.in_a_temporary_directory and
          # asks the Python helper's "get_pyproject_hash" function for the hash
          # of that directory. Returns the helper's result as-is.
          SharedHelpers.in_a_temporary_directory do |dir|
            SharedHelpers.with_git_configured(credentials: credentials) do
              write_temporary_dependency_files(pyproject_content)

              project_dir = T.cast(dir, Pathname).to_s
              helper_command = "pyenv exec python3 #{python_helper_path}"

              SharedHelpers.run_helper_subprocess(
                command: helper_command,
                function: "get_pyproject_hash",
                args: [project_dir]
              )
            end
          end
        end

        sig { params(dep: Dependabot::Dependency, old_req: T::Hash[Symbol, T.untyped]).returns(Regexp) }
        def declaration_regex(dep, old_req)
          # Builds a regex locating the inline declaration of `dep` that follows
          # the section header of the requirement's first group. The header may
          # end in `<group>]` or `<group>.dependencies]`. The declaration line
          # is exposed as the named capture :declaration.
          #
          # The group name is regex-escaped so that characters such as "." in a
          # group name are matched literally.
          group = Regexp.escape(old_req[:groups].first.to_s)

          # NOTE: the backslashes are doubled on purpose. Inside a double-quoted
          # Ruby string "\s" is a literal space, which would make the header
          # tail accept spaces only and reject tabs or a CR before the newline.
          header_regex = "#{group}(?:\\.dependencies)?\\]\\s*(?:\\s*#.*?)*?"
          /#{header_regex}\n.*?(?<declaration>(?:^\s*|["'])#{escape(dep)}["']?\s*=[^\n]*)$/mi
        end

        sig { params(dep: Dependabot::Dependency, old_req: T::Hash[Symbol, T.untyped]).returns(Regexp) }
        def table_declaration_regex(dep, old_req)
          # Builds a regex matching a table-style declaration
          # (`[tool.poetry.<group>.<name>]`) up to and including the line that
          # carries its `version` key.
          #
          # The group name is regex-escaped, and any amount of whitespace
          # (including none) is accepted before `=`, so `version="1.0"` matches
          # just like `version = "1.0"`.
          group = Regexp.escape(old_req[:groups].first.to_s)
          /tool\.poetry\.#{group}\.#{escape(dep)}\]\n.*?\s*version\s*=.*?\n/m
        end

        sig { params(dep: Dependency).returns(String) }
        def escape(dep)
          # Regex source for the dependency name: every "-" in the name matches
          # any of "-", "_" or "."; all other characters are regex-escaped.
          segments = dep.name.split("-", -1)
          segments.map { |segment| Regexp.escape(segment) }.join("[-_.]")
        end

        sig { params(file: Dependabot::DependencyFile).returns(T::Boolean) }
        def file_changed?(file)
          # True as soon as one dependency has a changed requirement for `file`.
          dependencies.each do |dep|
            return true if requirement_changed?(file, dep)
          end

          false
        end

        sig { params(file: Dependabot::DependencyFile, dependency: Dependabot::Dependency).returns(T::Boolean) }
        def requirement_changed?(file, dependency)
          # True when the dependency has at least one requirement for `file`
          # that is absent from its previous requirements.
          previous = T.must(dependency.previous_requirements)
          for_this_file = dependency.requirements.select { |req| req[:file] == file.name }

          !(for_this_file - previous).empty?
        end

        sig { params(file: Dependabot::DependencyFile, content: String).returns(Dependabot::DependencyFile) }
        def updated_file(file:, content:)
          # Returns a copy of `file` carrying the new content; the file passed
          # in is left untouched.
          file.dup.tap { |copy| copy.content = content }
        end

        sig { params(name: String).returns(String) }
        def normalise(name)
          # Thin wrapper around NameNormaliser.normalise, used when comparing
          # declared package names with dependency names.
          NameNormaliser.normalise(name)
        end

        sig { returns(FileParser::PythonRequirementParser) }
        def python_requirement_parser
          # Memoised PythonRequirementParser built over the dependency files.
          parser = @python_requirement_parser
          return parser if parser

          @python_requirement_parser =
            FileParser::PythonRequirementParser.new(dependency_files: dependency_files)
        end

        sig { returns(Dependabot::Python::LanguageVersionManager) }
        def language_version_manager
          # Memoised LanguageVersionManager built around the requirement parser.
          manager = @language_version_manager
          return manager if manager

          @language_version_manager =
            LanguageVersionManager.new(python_requirement_parser: python_requirement_parser)
        end

        sig { returns(T.nilable(Dependabot::DependencyFile)) }
        def pyproject
          # First dependency file named "pyproject.toml" (nil when absent);
          # a found file is remembered for later calls.
          found = @pyproject
          return found if found

          @pyproject = dependency_files.find { |file| file.name == "pyproject.toml" }
        end

        sig { returns(T.nilable(Dependabot::DependencyFile)) }
        def lockfile
          # The poetry.lock dependency file (nil when absent); a found file is
          # remembered for later calls.
          known = @lockfile
          return known if known

          @lockfile = poetry_lock
        end

        sig { returns(String) }
        def python_helper_path
          # Path of the Python helper as reported by NativeHelpers. Memoised in
          # the @python_helper_path slot declared in the constructor, in line
          # with the other lazily evaluated accessors of this class.
          @python_helper_path ||= NativeHelpers.python_helper_path
        end

        sig { returns(T.nilable(Dependabot::DependencyFile)) }
        def poetry_lock
          # First dependency file named "poetry.lock", or nil when there is none.
          dependency_files.each do |file|
            return file if file.name == "poetry.lock"
          end

          nil
        end
      end
    end
  end
end