# typed: strict
# frozen_string_literal: true
require "sorbet-runtime"
require "dependabot/version"
module Dependabot
class Dependency
extend T::Sig
@production_checks = T.let(
{},
T::Hash[String, T.proc.params(arg0: T::Array[T.untyped]).returns(T::Boolean)]
)
@display_name_builders = T.let({}, T::Hash[String, T.proc.params(arg0: String).returns(String)])
@name_normalisers = T.let({}, T::Hash[String, T.proc.params(arg0: String).returns(String)])
sig do
params(package_manager: String).returns(T.proc.params(arg0: T::Array[T.untyped]).returns(T::Boolean))
end
def self.production_check_for_package_manager(package_manager)
production_check = @production_checks[package_manager]
return production_check if production_check
raise "Unsupported package_manager #{package_manager}"
end
sig do
params(
package_manager: String,
production_check: T.proc.params(arg0: T::Array[T.untyped]).returns(T::Boolean)
)
.returns(T.proc.params(arg0: T::Array[T.untyped]).returns(T::Boolean))
end
def self.register_production_check(package_manager, production_check)
@production_checks[package_manager] = production_check
end
sig { params(package_manager: String).returns(T.nilable(T.proc.params(arg0: String).returns(String))) }
def self.display_name_builder_for_package_manager(package_manager)
@display_name_builders[package_manager]
end
sig { params(package_manager: String, name_builder: T.proc.params(arg0: String).returns(String)).void }
def self.register_display_name_builder(package_manager, name_builder)
@display_name_builders[package_manager] = name_builder
end
sig { params(package_manager: String).returns(T.nilable(T.proc.params(arg0: String).returns(String))) }
def self.name_normaliser_for_package_manager(package_manager)
@name_normalisers[package_manager] || ->(name) { name }
end
sig do
params(
package_manager: String,
name_builder: T.proc.params(arg0: String).returns(String)
).void
end
def self.register_name_normaliser(package_manager, name_builder)
@name_normalisers[package_manager] = name_builder
end
sig { returns(String) }
attr_reader :name
sig { returns(T.nilable(String)) }
attr_reader :version
sig { returns(T::Array[T::Hash[Symbol, T.untyped]]) }
attr_reader :requirements
sig { returns(String) }
attr_reader :package_manager
sig { returns(T.nilable(String)) }
attr_reader :previous_version
sig { returns(T.nilable(T::Array[T::Hash[Symbol, T.untyped]])) }
attr_reader :previous_requirements
sig { returns(T.nilable(String)) }
attr_accessor :directory
sig { returns(T.nilable(T::Array[T::Hash[Symbol, T.untyped]])) }
attr_reader :subdependency_metadata
sig { returns(T::Hash[Symbol, T.untyped]) }
attr_reader :metadata
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/PerceivedComplexity
sig do
params(
name: String,
requirements: T::Array[T::Hash[T.any(Symbol, String), T.untyped]],
package_manager: String,
# TODO: Make version a Dependabot::Version everywhere
version: T.nilable(T.any(String, Dependabot::Version)),
previous_version: T.nilable(String),
previous_requirements: T.nilable(T::Array[T::Hash[T.any(Symbol, String), T.untyped]]),
directory: T.nilable(String),
subdependency_metadata: T.nilable(T::Array[T::Hash[T.any(Symbol, String), String]]),
removed: T::Boolean,
metadata: T.nilable(T::Hash[T.any(Symbol, String), String])
).void
end
def initialize(name:, requirements:, package_manager:, version: nil,
previous_version: nil, previous_requirements: nil, directory: nil,
subdependency_metadata: [], removed: false, metadata: {})
@name = name
@version = T.let(
case version
when Dependabot::Version then version.to_s
when String then version
end,
T.nilable(String)
)
@version = nil if @version == ""
@requirements = T.let(requirements.map { |req| symbolize_keys(req) }, T::Array[T::Hash[Symbol, T.untyped]])
@previous_version = previous_version
@previous_version = nil if @previous_version == ""
@previous_requirements = T.let(
previous_requirements&.map { |req| symbolize_keys(req) },
T.nilable(T::Array[T::Hash[Symbol, T.untyped]])
)
@package_manager = package_manager
@directory = directory
unless top_level? || subdependency_metadata == []
@subdependency_metadata = T.let(
subdependency_metadata&.map { |h| symbolize_keys(h) },
T.nilable(T::Array[T::Hash[Symbol, T.untyped]])
)
end
@removed = removed
@metadata = T.let(symbolize_keys(metadata || {}), T::Hash[Symbol, T.untyped])
check_values
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/PerceivedComplexity
sig { returns(T::Boolean) }
def top_level?
requirements.any?
end
sig { returns(T::Boolean) }
def removed?
@removed
end
sig { returns(T.nilable(Dependabot::Version)) }
def numeric_version
return unless version && version_class.correct?(version)
@numeric_version ||= T.let(version_class.new(T.must(version)), T.nilable(Dependabot::Version))
end
sig { returns(T::Hash[String, T.untyped]) }
def to_h
{
"name" => name,
"version" => version,
"requirements" => requirements,
"previous_version" => previous_version,
"previous_requirements" => previous_requirements,
"directory" => directory,
"package_manager" => package_manager,
"subdependency_metadata" => subdependency_metadata,
"removed" => removed? ? true : nil
}.compact
end
sig { returns(T::Boolean) }
def appears_in_lockfile?
!!(previous_version || (version && previous_requirements.nil?))
end
sig { returns(T::Boolean) }
def production?
return subdependency_production_check unless top_level?
groups = requirements.flat_map { |r| r.fetch(:groups).map(&:to_s) }
self.class
.production_check_for_package_manager(package_manager)
.call(groups)
end
sig { returns(T::Boolean) }
def subdependency_production_check
!subdependency_metadata&.all? { |h| h[:production] == false }
end
sig { returns(String) }
def display_name
display_name_builder =
self.class.display_name_builder_for_package_manager(package_manager)
return name unless display_name_builder
display_name_builder.call(name)
end
sig { returns(T.nilable(String)) }
def humanized_previous_version
# If we don't have a previous version, we *may* still be able to figure
# one out if a ref was provided and has been changed (in which case the
# previous ref was essentially the version).
if previous_version.nil?
return ref_changed? ? previous_ref : nil
end
if T.must(previous_version).match?(/^[0-9a-f]{40}/)
return previous_ref if ref_changed? && previous_ref
"`#{T.must(previous_version)[0..6]}`"
elsif version == previous_version &&
package_manager == "docker"
digest = docker_digest_from_reqs(T.must(previous_requirements))
"`#{T.must(T.must(digest).split(':').last)[0..6]}`"
else
previous_version
end
end
sig { returns(T.nilable(String)) }
def humanized_version
return "removed" if removed?
if T.must(version).match?(/^[0-9a-f]{40}/)
return new_ref if ref_changed? && new_ref
"`#{T.must(version)[0..6]}`"
elsif version == previous_version &&
package_manager == "docker"
digest = docker_digest_from_reqs(requirements)
"`#{T.must(T.must(digest).split(':').last)[0..6]}`"
else
version
end
end
sig { params(requirements: T::Array[T::Hash[Symbol, T.untyped]]).returns(T.nilable(String)) }
def docker_digest_from_reqs(requirements)
requirements
.filter_map { |r| r.dig(:source, "digest") || r.dig(:source, :digest) }
.first
end
sig { returns(T.nilable(String)) }
def previous_ref
return nil if previous_requirements.nil?
previous_refs = T.must(previous_requirements).filter_map do |r|
r.dig(:source, "ref") || r.dig(:source, :ref)
end.uniq
previous_refs.first if previous_refs.count == 1
end
sig { returns(T.nilable(String)) }
def new_ref
new_refs = requirements.filter_map do |r|
r.dig(:source, "ref") || r.dig(:source, :ref)
end.uniq
new_refs.first if new_refs.count == 1
end
sig { returns(T::Boolean) }
def ref_changed?
previous_ref != new_ref
end
# Returns all detected versions of the dependency. Only ecosystems that
# support this feature will return more than the current version.
sig { returns(T::Array[T.nilable(String)]) }
def all_versions
all_versions = metadata[:all_versions]
return [version].compact unless all_versions
all_versions.filter_map(&:version)
end
# This dependency is being indirectly updated by an update to another
# dependency. We don't need to try and update it ourselves but want to
# surface it to the user in the PR.
sig { returns(T.nilable(T::Boolean)) }
def informational_only?
metadata[:information_only]
end
sig { params(other: T.anything).returns(T::Boolean) }
def ==(other)
case other
when Dependency
to_h == other.to_h
else
false
end
end
sig { returns(Integer) }
def hash
to_h.hash
end
sig { params(other: T.anything).returns(T::Boolean) }
def eql?(other)
self == other
end
sig { returns(T::Array[T::Hash[Symbol, T.untyped]]) }
def specific_requirements
requirements.select { |r| requirement_class.new(r[:requirement]).specific? }
end
sig { returns(T.class_of(Dependabot::Requirement)) }
def requirement_class
Utils.requirement_class_for_package_manager(package_manager)
end
sig { returns(T.class_of(Dependabot::Version)) }
def version_class
Utils.version_class_for_package_manager(package_manager)
end
sig do
params(
allowed_types: T.nilable(T::Array[String])
)
.returns(T.nilable(T::Hash[T.any(String, Symbol), T.untyped]))
end
def source_details(allowed_types: nil)
sources = all_sources.uniq.compact
sources.select! { |source| allowed_types.include?(source[:type].to_s) } if allowed_types
git = allowed_types == ["git"]
if (git && sources.map { |s| s[:url] }.uniq.count > 1) || (!git && sources.count > 1)
raise "Multiple sources! #{sources.join(', ')}"
end
sources.first
end
sig { returns(T.nilable(String)) }
def source_type
details = source_details
return "default" if details.nil?
details[:type] || details.fetch("type")
end
sig { returns(T::Array[T::Hash[Symbol, T.untyped]]) }
def all_sources
if top_level?
requirements.map { |requirement| requirement.fetch(:source) }
elsif subdependency_metadata
T.must(subdependency_metadata).filter_map { |data| data[:source] }
else
[]
end
end
sig { returns(T::Boolean) }
def requirements_changed?
(requirements - T.must(previous_requirements)).any?
end
private
sig { void }
def check_values
check_requirement_fields
check_subdependency_metadata
end
sig { void }
def check_requirement_fields
requirement_fields = [requirements, previous_requirements].compact
unless requirement_fields.all?(Array) &&
requirement_fields.flatten.all?(Hash)
raise ArgumentError, "requirements must be an array of hashes"
end
required_keys = %i(requirement file groups source)
optional_keys = %i(metadata)
unless requirement_fields.flatten
.all? { |r| required_keys.sort == (r.keys - optional_keys).sort }
raise ArgumentError, "each requirement must have the following " \
"required keys: #{required_keys.join(', ')}." \
"Optionally, it may have the following keys: " \
"#{optional_keys.join(', ')}."
end
return if requirement_fields.flatten.none? { |r| r[:requirement] == "" }
raise ArgumentError, "blank strings must not be provided as requirements"
end
sig { void }
def check_subdependency_metadata
return unless subdependency_metadata
unless subdependency_metadata.is_a?(Array) &&
T.must(subdependency_metadata).all?(Hash)
raise ArgumentError, "subdependency_metadata must be an array of hashes"
end
end
sig { params(hash: T::Hash[T.any(Symbol, String), T.untyped]).returns(T::Hash[Symbol, T.untyped]) }
def symbolize_keys(hash)
hash.keys.to_h { |k| [k.to_sym, hash[k]] }
end
end
end