lib/aws-sdk-core/shared_config.rb



# frozen_string_literal: true

module Aws
  # @api private
  class SharedConfig
    SSO_CREDENTIAL_PROFILE_KEYS = %w[sso_account_id sso_role_name].freeze
    SSO_PROFILE_KEYS = %w[sso_session sso_start_url sso_region sso_account_id sso_role_name].freeze
    SSO_TOKEN_PROFILE_KEYS = %w[sso_session].freeze
    SSO_SESSION_KEYS = %w[sso_region sso_start_url].freeze


    # @return [String]
    attr_reader :credentials_path

    # @return [String]
    attr_reader :config_path

    # @return [String]
    attr_reader :profile_name

    # Constructs a new SharedConfig provider object. This will load the shared
    # credentials file, and optionally the shared configuration file, as ini
    # files which support profiles.
    #
    # By default, the shared credential file (the default path for which is
    # `~/.aws/credentials`) and the shared config file (the default path for
    # which is `~/.aws/config`) are loaded. However, if you set the
    # `ENV['AWS_SDK_CONFIG_OPT_OUT']` environment variable, only the shared
    # credential file will be loaded. You can specify the shared credential
    # file path with the `ENV['AWS_SHARED_CREDENTIALS_FILE']` environment
    # variable or with the `:credentials_path` option. Similarly, you can
    # specify the shared config file path with the `ENV['AWS_CONFIG_FILE']`
    # environment variable or with the `:config_path` option.
    #
    # The default profile name is 'default'. You can specify the profile name
    # with the `ENV['AWS_PROFILE']` environment variable or with the
    # `:profile_name` option.
    #
    # @param [Hash] options
    # @option options [String] :credentials_path Path to the shared credentials
    #   file. If not specified, will check `ENV['AWS_SHARED_CREDENTIALS_FILE']`
    #   before using the default value of "#{Dir.home}/.aws/credentials".
    # @option options [String] :config_path Path to the shared config file.
    #   If not specified, will check `ENV['AWS_CONFIG_FILE']` before using the
    #   default value of "#{Dir.home}/.aws/config".
    # @option options [String] :profile_name The credential/config profile name
    #   to use. If not specified, will check `ENV['AWS_PROFILE']` before using
    #   the fixed default value of 'default'.
    # @option options [Boolean] :config_enabled If true, loads the shared config
    #   file and enables new config values outside of the old shared credential
    #   spec.
    def initialize(options = {})
      @parsed_config = nil
      @profile_name = determine_profile(options)
      @config_enabled = options[:config_enabled]
      @credentials_path = options[:credentials_path] ||
                          determine_credentials_path
      @credentials_path = File.expand_path(@credentials_path) if @credentials_path
      @parsed_credentials = {}
      load_credentials_file if loadable?(@credentials_path)
      if @config_enabled
        @config_path = options[:config_path] || determine_config_path
        @config_path = File.expand_path(@config_path) if @config_path
        load_config_file if loadable?(@config_path)
      end
    end

    # @api private
    def fresh(options = {})
      @profile_name = nil
      @credentials_path = nil
      @config_path = nil
      @parsed_credentials = {}
      @parsed_config = nil
      @config_enabled = options[:config_enabled] ? true : false
      @profile_name = determine_profile(options)
      @credentials_path = options[:credentials_path] ||
                          determine_credentials_path
      load_credentials_file if loadable?(@credentials_path)
      if @config_enabled
        @config_path = options[:config_path] || determine_config_path
        load_config_file if loadable?(@config_path)
      end
    end

    # @return [Boolean] Returns `true` if a credential file
    #   exists and has appropriate read permissions at {#path}.
    # @note This method does not indicate if the file found at {#path}
    #   will be parsable, only if it can be read.
    def loadable?(path)
      !path.nil? && File.exist?(path) && File.readable?(path)
    end

    # @return [Boolean] returns `true` if use of the shared config file is
    #   enabled.
    def config_enabled?
      @config_enabled ? true : false
    end

    # Sources static credentials from shared credential/config files.
    #
    # @param [Hash] opts
    # @option options [String] :profile the name of the configuration file from
    #   which credentials are being sourced.
    # @return [Aws::Credentials] credentials sourced from configuration values,
    #   or `nil` if no valid credentials were found.
    def credentials(opts = {})
      p = opts[:profile] || @profile_name
      validate_profile_exists(p)
      if (credentials = credentials_from_shared(p, opts))
        credentials
      elsif (credentials = credentials_from_config(p, opts))
        credentials
      end
    end

    # Attempts to assume a role from shared config or shared credentials file.
    # Will always attempt first to assume a role from the shared credentials
    # file, if present.
    def assume_role_credentials_from_config(opts = {})
      p = opts.delete(:profile) || @profile_name
      chain_config = opts.delete(:chain_config)
      credentials = assume_role_from_profile(@parsed_credentials, p, opts, chain_config)
      if @parsed_config
        credentials ||= assume_role_from_profile(@parsed_config, p, opts, chain_config)
      end
      credentials
    end

    def assume_role_web_identity_credentials_from_config(opts = {})
      p = opts[:profile] || @profile_name
      if @config_enabled && @parsed_config
        entry = @parsed_config.fetch(p, {})
        if entry['web_identity_token_file'] && entry['role_arn']
          cfg = {
            role_arn: entry['role_arn'],
            web_identity_token_file: entry['web_identity_token_file'],
            role_session_name: entry['role_session_name']
          }
          cfg[:region] = opts[:region] if opts[:region]
          AssumeRoleWebIdentityCredentials.new(cfg)
        end
      end
    end

    # Attempts to load from shared config or shared credentials file.
    # Will always attempt first to load from the shared credentials
    # file, if present.
    def sso_credentials_from_config(opts = {})
      p = opts[:profile] || @profile_name
      credentials = sso_credentials_from_profile(@parsed_credentials, p)
      if @parsed_config
        credentials ||= sso_credentials_from_profile(@parsed_config, p)
      end
      credentials
    end

    # Attempts to load from shared config or shared credentials file.
    # Will always attempt first to load from the shared credentials
    # file, if present.
    def sso_token_from_config(opts = {})
      p = opts[:profile] || @profile_name
      token = sso_token_from_profile(@parsed_credentials, p)
      if @parsed_config
        token ||= sso_token_from_profile(@parsed_config, p)
      end
      token
    end

    # Source a custom configured endpoint from the shared configuration file
    #
    # @param [Hash] opts
    # @option opts [String] :profile
    # @option opts [String] :service_id
    def configured_endpoint(opts = {})
      # services section is only allowed in the shared config file (not credentials)
      profile = opts[:profile] || @profile_name
      service_id = opts[:service_id]&.gsub(" ", "_")&.downcase
      if @parsed_config && (prof_config = @parsed_config[profile])
        services_section_name = prof_config['services']
        if (services_config = @parsed_config["services #{services_section_name}"]) &&
          (service_config = services_config[service_id])
          return service_config['endpoint_url'] if service_config['endpoint_url']
        end
        return prof_config['endpoint_url']
      end
      nil
    end

    # Add an accessor method (similar to attr_reader) to return a configuration value
    # Uses the get_config_value below to control where
    # values are loaded from
    def self.config_reader(*attrs)
      attrs.each do |attr|
        define_method(attr) { |opts = {}| get_config_value(attr.to_s, opts) }
      end
    end

    config_reader(
      :region,
      :ca_bundle,
      :credential_process,
      :endpoint_discovery_enabled,
      :use_dualstack_endpoint,
      :use_fips_endpoint,
      :ec2_metadata_service_endpoint,
      :ec2_metadata_service_endpoint_mode,
      :ec2_metadata_v1_disabled,
      :max_attempts,
      :retry_mode,
      :adaptive_retry_wait_to_fill,
      :correct_clock_skew,
      :csm_client_id,
      :csm_enabled,
      :csm_host,
      :csm_port,
      :sts_regional_endpoints,
      :s3_use_arn_region,
      :s3_us_east_1_regional_endpoint,
      :s3_disable_multiregion_access_points,
      :s3_disable_express_session_auth,
      :defaults_mode,
      :sdk_ua_app_id,
      :disable_request_compression,
      :request_min_compression_size_bytes,
      :ignore_configured_endpoint_urls
    )

    private

    # Get a config value from from shared credential/config files.
    # Only loads a value when config_enabled is true
    # Return a value from credentials preferentially over config
    def get_config_value(key, opts)
      p = opts[:profile] || @profile_name

      value = @parsed_credentials.fetch(p, {})[key] if @parsed_credentials
      value ||= @parsed_config.fetch(p, {})[key] if @config_enabled && @parsed_config
      value
    end

    def assume_role_from_profile(cfg, profile, opts, chain_config)
      if cfg && prof_cfg = cfg[profile]
        opts[:source_profile] ||= prof_cfg['source_profile']
        credential_source = opts.delete(:credential_source)
        credential_source ||= prof_cfg['credential_source']
        if opts[:source_profile] && credential_source
          raise Errors::CredentialSourceConflictError,
            "Profile #{profile} has a source_profile, and "\
            'a credential_source. For assume role credentials, must '\
            'provide only source_profile or credential_source, not both.'
        elsif opts[:source_profile]
          opts[:visited_profiles] ||= Set.new
          opts[:credentials] = resolve_source_profile(opts[:source_profile], opts)
          if opts[:credentials]
            opts[:role_session_name] ||= prof_cfg['role_session_name']
            opts[:role_session_name] ||= 'default_session'
            opts[:role_arn] ||= prof_cfg['role_arn']
            opts[:duration_seconds] ||= prof_cfg['duration_seconds']
            opts[:external_id] ||= prof_cfg['external_id']
            opts[:serial_number] ||= prof_cfg['mfa_serial']
            opts[:profile] = opts.delete(:source_profile)
            opts.delete(:visited_profiles)
            AssumeRoleCredentials.new(opts)
          else
            raise Errors::NoSourceProfileError,
              "Profile #{profile} has a role_arn, and source_profile, but the"\
              ' source_profile does not have credentials.'
          end
        elsif credential_source
          opts[:credentials] = credentials_from_source(
            credential_source,
            chain_config
          )
          if opts[:credentials]
            opts[:role_session_name] ||= prof_cfg['role_session_name']
            opts[:role_session_name] ||= 'default_session'
            opts[:role_arn] ||= prof_cfg['role_arn']
            opts[:duration_seconds] ||= prof_cfg['duration_seconds']
            opts[:external_id] ||= prof_cfg['external_id']
            opts[:serial_number] ||= prof_cfg['mfa_serial']
            opts.delete(:source_profile) # Cleanup
            AssumeRoleCredentials.new(opts)
          else
            raise Errors::NoSourceCredentials,
              "Profile #{profile} could not get source credentials from"\
              " provider #{credential_source}"
          end
        elsif prof_cfg['role_arn']
          raise Errors::NoSourceProfileError, "Profile #{profile} has a role_arn, but no source_profile."
        end
      end
    end

    def resolve_source_profile(profile, opts = {})
      if opts[:visited_profiles] && opts[:visited_profiles].include?(profile)
        raise Errors::SourceProfileCircularReferenceError
      end
      opts[:visited_profiles].add(profile) if opts[:visited_profiles]

      profile_config = @parsed_credentials[profile]
      if @config_enabled
        profile_config ||= @parsed_config[profile]
      end

      if (creds = credentials(profile: profile))
        creds # static credentials
      elsif profile_config && profile_config['source_profile']
        opts.delete(:source_profile)
        assume_role_credentials_from_config(opts.merge(profile: profile))
      elsif (provider = assume_role_web_identity_credentials_from_config(opts.merge(profile: profile)))
        provider.credentials if provider.credentials.set?
      elsif (provider = assume_role_process_credentials_from_config(profile))
        provider.credentials if provider.credentials.set?
      elsif (provider = sso_credentials_from_config(profile: profile))
        provider.credentials if provider.credentials.set?
      end
    end

    def credentials_from_source(credential_source, config)
      case credential_source
      when 'Ec2InstanceMetadata'
        InstanceProfileCredentials.new(
          retries: config ? config.instance_profile_credentials_retries : 0,
          http_open_timeout: config ? config.instance_profile_credentials_timeout : 1,
          http_read_timeout: config ? config.instance_profile_credentials_timeout : 1
        )
      when 'EcsContainer'
        ECSCredentials.new
      else
        raise Errors::InvalidCredentialSourceError, "Unsupported credential_source: #{credential_source}"
      end
    end

    def assume_role_process_credentials_from_config(profile)
      validate_profile_exists(profile)
      credential_process = @parsed_credentials.fetch(profile, {})['credential_process']
      if @parsed_config
        credential_process ||= @parsed_config.fetch(profile, {})['credential_process']
      end
      ProcessCredentials.new(credential_process) if credential_process
    end

    def credentials_from_shared(profile, _opts)
      if @parsed_credentials && prof_config = @parsed_credentials[profile]
        credentials_from_profile(prof_config)
      end
    end

    def credentials_from_config(profile, _opts)
      if @parsed_config && prof_config = @parsed_config[profile]
        credentials_from_profile(prof_config)
      end
    end

    # If any of the sso_ profile values are present, attempt to construct
    # SSOCredentials
    def sso_credentials_from_profile(cfg, profile)
      if @parsed_config &&
         (prof_config = cfg[profile]) &&
         !(prof_config.keys & SSO_CREDENTIAL_PROFILE_KEYS).empty?

        if sso_session_name = prof_config['sso_session']
          sso_session = sso_session(cfg, profile, sso_session_name)

          sso_region = sso_session['sso_region']
          sso_start_url = sso_session['sso_start_url']

          # validate sso_region and sso_start_url don't conflict if set on profile and session
          if prof_config['sso_region'] &&  prof_config['sso_region'] != sso_region
            raise ArgumentError,
                  "sso-session #{sso_session_name}'s sso_region (#{sso_region}) " \
                    "does not match the profile #{profile}'s sso_region (#{prof_config['sso_region']}'"
          end
          if prof_config['sso_start_url'] &&  prof_config['sso_start_url'] != sso_start_url
            raise ArgumentError,
                  "sso-session #{sso_session_name}'s sso_start_url (#{sso_start_url}) " \
                    "does not match the profile #{profile}'s sso_start_url (#{prof_config['sso_start_url']}'"
          end
        else
          sso_region = prof_config['sso_region']
          sso_start_url = prof_config['sso_start_url']
        end

        SSOCredentials.new(
          sso_account_id: prof_config['sso_account_id'],
          sso_role_name: prof_config['sso_role_name'],
          sso_session: prof_config['sso_session'],
          sso_region: sso_region,
          sso_start_url: sso_start_url
          )
      end
    end

    # If the required sso_ profile values are present, attempt to construct
    # SSOTokenProvider
    def sso_token_from_profile(cfg, profile)
      if @parsed_config &&
        (prof_config = cfg[profile]) &&
        !(prof_config.keys & SSO_TOKEN_PROFILE_KEYS).empty?

        sso_session_name = prof_config['sso_session']
        sso_session = sso_session(cfg, profile, sso_session_name)

        SSOTokenProvider.new(
          sso_session: sso_session_name,
          sso_region: sso_session['sso_region']
        )
      end
    end

    def credentials_from_profile(prof_config)
      creds = Credentials.new(
        prof_config['aws_access_key_id'],
        prof_config['aws_secret_access_key'],
        prof_config['aws_session_token']
      )
      creds if creds.set?
    end

    def load_credentials_file
      @parsed_credentials = IniParser.ini_parse(
        File.read(@credentials_path)
      )
    end

    def load_config_file
      @parsed_config = IniParser.ini_parse(File.read(@config_path))
    end

    def determine_credentials_path
      ENV['AWS_SHARED_CREDENTIALS_FILE'] || default_shared_config_path('credentials')
    end

    def determine_config_path
      ENV['AWS_CONFIG_FILE'] || default_shared_config_path('config')
    end

    def default_shared_config_path(file)
      File.join(Dir.home, '.aws', file)
    rescue ArgumentError
      # Dir.home raises ArgumentError when ENV['home'] is not set
      nil
    end

    def validate_profile_exists(profile)
      unless (@parsed_credentials && @parsed_credentials[profile]) ||
             (@parsed_config && @parsed_config[profile])
        msg = "Profile `#{profile}' not found in #{@credentials_path}"\
              "#{" or #{@config_path}" if @config_path}"
        raise Errors::NoSuchProfileError, msg
      end
    end

    def determine_profile(options)
      ret = options[:profile_name]
      ret ||= ENV['AWS_PROFILE']
      ret ||= 'default'
      ret
    end

    def sso_session(cfg, profile, sso_session_name)
      # aws sso-configure may add quotes around sso session names with whitespace
      sso_session = cfg["sso-session #{sso_session_name}"] || cfg["sso-session '#{sso_session_name}'"]

      unless sso_session
        raise ArgumentError,
          "sso-session #{sso_session_name} must be defined in the config file. " \
                    "Referenced by profile #{profile}"
      end

      unless sso_session['sso_region']
        raise ArgumentError, "sso-session #{sso_session_name} missing required parameter: sso_region"
      end

      sso_session
    end
  end
end