class Aws::SharedConfig

@api private

def self.config_reader(*attrs)

Add an accessor method (similar to attr_reader) to return a configuration value.
Uses the get_config_value below to control where
values are loaded from.
# Defines one reader method per name given in +attrs+. Each generated method
# takes an optional options hash and forwards the stringified name together
# with those options to #get_config_value.
#
# @param attrs [Array<Symbol, String>] names of the readers to define
def self.config_reader(*attrs)
  attrs.each do |reader_name|
    config_key = reader_name.to_s
    define_method(reader_name) do |opts = {}|
      get_config_value(config_key, opts)
    end
  end
end

def assume_role_credentials_from_config(opts = {})

Attempts to assume a role from shared config or shared credentials file.
Will always attempt first to assume a role from the shared credentials
file, if present.
# Tries to build assume-role credentials for a profile, consulting the parsed
# credentials data first and the parsed config data second.
#
# The :profile and :chain_config entries are removed from +opts+ (the hash is
# mutated); the remaining entries are passed on to #assume_role_from_profile.
#
# @param opts [Hash] options; :profile falls back to @profile_name
# @return whatever #assume_role_from_profile produced for the first source
#   that yielded a truthy value
def assume_role_credentials_from_config(opts = {})
  profile_name = opts.delete(:profile) || @profile_name
  chain_config = opts.delete(:chain_config)

  from_credentials = assume_role_from_profile(
    @parsed_credentials, profile_name, opts, chain_config
  )
  # The config data is only a fallback, and only when it was parsed at all.
  return from_credentials if from_credentials || !@parsed_config

  assume_role_from_profile(@parsed_config, profile_name, opts, chain_config)
end

def assume_role_from_profile(cfg, profile, opts, chain_config)

# Builds AssumeRoleCredentials for +profile+ from the parsed data in +cfg+.
#
# The base credentials come either from another profile (source_profile,
# resolved through #resolve_source_profile) or from a named provider
# (credential_source, resolved through #credentials_from_source). Values
# already present in +opts+ take precedence over the profile's values.
# +opts+ is mutated while the options are assembled.
#
# @param cfg [Hash, nil] parsed file contents keyed by profile name
# @param profile [String] name of the profile to inspect
# @param opts [Hash] caller supplied options, passed to AssumeRoleCredentials
# @param chain_config [Object, nil] forwarded to #credentials_from_source
# @return [AssumeRoleCredentials, nil] nil when +cfg+ is nil, the profile is
#   absent, or the profile has no role configuration
# @raise [Errors::CredentialSourceConflictError] both source_profile and
#   credential_source are set
# @raise [Errors::NoSourceProfileError] the source profile yields no
#   credentials, or a role_arn is set without any source
# @raise [Errors::NoSourceCredentials] the credential_source provider yields
#   no credentials
def assume_role_from_profile(cfg, profile, opts, chain_config)
  prof_cfg = cfg && cfg[profile]
  return unless prof_cfg

  opts[:source_profile] ||= prof_cfg['source_profile']
  credential_source = opts.delete(:credential_source) || prof_cfg['credential_source']

  if opts[:source_profile] && credential_source
    raise Errors::CredentialSourceConflictError,
      "Profile #{profile} has a source_profile, and "\
      'a credential_source. For assume role credentials, must '\
      'provide only source_profile or credential_source, not both.'
  elsif opts[:source_profile]
    # The visited set lets resolve_source_profile detect circular chains.
    opts[:visited_profiles] ||= Set.new
    opts[:credentials] = resolve_source_profile(opts[:source_profile], opts)
    unless opts[:credentials]
      raise Errors::NoSourceProfileError,
        "Profile #{profile} has a role_arn, and source_profile, but the"\
        ' source_profile does not have credentials.'
    end
    apply_assume_role_profile_defaults(opts, prof_cfg)
    opts[:profile] = opts.delete(:source_profile)
    opts.delete(:visited_profiles)
    AssumeRoleCredentials.new(opts)
  elsif credential_source
    opts[:credentials] = credentials_from_source(credential_source, chain_config)
    unless opts[:credentials]
      raise Errors::NoSourceCredentials,
        "Profile #{profile} could not get source credentials from"\
        " provider #{credential_source}"
    end
    apply_assume_role_profile_defaults(opts, prof_cfg)
    opts.delete(:source_profile) # Cleanup
    AssumeRoleCredentials.new(opts)
  elsif prof_cfg['role_arn']
    raise Errors::NoSourceProfileError, "Profile #{profile} has a role_arn, but no source_profile."
  end
end

# Fills the role related entries of +opts+ from +prof_cfg+ without
# overriding values the caller already supplied. The session name falls back
# to 'default_session' when neither side provides one.
def apply_assume_role_profile_defaults(opts, prof_cfg)
  opts[:role_session_name] ||= prof_cfg['role_session_name'] || 'default_session'
  opts[:role_arn] ||= prof_cfg['role_arn']
  opts[:duration_seconds] ||= prof_cfg['duration_seconds']
  opts[:external_id] ||= prof_cfg['external_id']
  opts[:serial_number] ||= prof_cfg['mfa_serial']
  opts
end

def assume_role_process_credentials_from_config(profile)

# Builds ProcessCredentials from the profile's credential_process entry.
# The parsed credentials data is consulted first; the parsed config data is
# only used as a fallback when it has been loaded.
#
# @param profile [String] profile name, checked with #validate_profile_exists
# @return [ProcessCredentials, nil] nil when no credential_process is set
def assume_role_process_credentials_from_config(profile)
  validate_profile_exists(profile)

  command = @parsed_credentials.fetch(profile, {})['credential_process']
  command ||= @parsed_config.fetch(profile, {})['credential_process'] if @parsed_config
  return unless command

  ProcessCredentials.new(command)
end

def assume_role_web_identity_credentials_from_config(opts = {})

# Builds AssumeRoleWebIdentityCredentials from the parsed config data when
# the profile defines both web_identity_token_file and role_arn.
#
# @param opts [Hash] :profile selects the profile (defaults to
#   @profile_name); :region, when given, is passed on to the provider
# @return [AssumeRoleWebIdentityCredentials, nil] nil when config use is
#   disabled, no config was parsed, or either required entry is missing
def assume_role_web_identity_credentials_from_config(opts = {})
  return unless @config_enabled && @parsed_config

  entry = @parsed_config.fetch(opts[:profile] || @profile_name, {})
  token_file = entry['web_identity_token_file']
  role_arn = entry['role_arn']
  return unless token_file && role_arn

  provider_options = {
    role_arn: role_arn,
    web_identity_token_file: token_file,
    role_session_name: entry['role_session_name']
  }
  provider_options[:region] = opts[:region] if opts[:region]
  AssumeRoleWebIdentityCredentials.new(provider_options)
end

def config_enabled?

Returns:
  • (Boolean) - returns `true` if use of the shared config file is enabled, `false` otherwise.
# @return [Boolean] true when @config_enabled is truthy, false otherwise
def config_enabled?
  !!@config_enabled
end

def configured_endpoint(opts = {})

Options Hash: (**opts)
  • :service_id (String) --
  • :profile (String) --

Parameters:
  • opts (Hash) --
# Looks up an endpoint URL in the parsed config data (the services section
# is only allowed in the shared config file, not in credentials).
#
# A service specific endpoint_url from the "services <name>" section named by
# the profile's 'services' entry wins; otherwise the profile's own
# endpoint_url is returned.
#
# @param opts [Hash] :profile (defaults to @profile_name) and :service_id;
#   spaces in the service id become underscores and the id is lower-cased
#   before the lookup
# @return [String, nil] nil when no config was parsed or the profile is absent
def configured_endpoint(opts = {})
  profile_name = opts[:profile] || @profile_name
  service_key = opts[:service_id]&.tr(' ', '_')&.downcase

  profile_config = @parsed_config && @parsed_config[profile_name]
  return nil unless profile_config

  services_config = @parsed_config["services #{profile_config['services']}"]
  service_config = services_config && services_config[service_key]
  service_endpoint = service_config && service_config['endpoint_url']
  service_endpoint || profile_config['endpoint_url']
end

def credentials(opts = {})

Returns:
  • (Aws::Credentials) - credentials sourced from configuration values,

Options Hash: (**options)
  • :profile (String) -- the name of the configuration file from

Parameters:
  • opts (Hash) --
# Returns static credentials for a profile, preferring the value from
# #credentials_from_shared over the one from #credentials_from_config.
#
# @param opts [Hash] :profile selects the profile (defaults to @profile_name)
# @return the first truthy result of the two lookups, otherwise nil
# @raise whatever #validate_profile_exists raises for an unknown profile
def credentials(opts = {})
  profile_name = opts[:profile] || @profile_name
  validate_profile_exists(profile_name)
  credentials_from_shared(profile_name, opts) ||
    credentials_from_config(profile_name, opts)
end

def credentials_from_config(profile, _opts)

# Looks +profile+ up in the parsed config data and hands its section to
# #credentials_from_profile.
#
# @param profile [String] profile name
# @param _opts [Hash] unused
# @return the result of #credentials_from_profile, or nil when no config was
#   parsed or the profile has no section
def credentials_from_config(profile, _opts)
  section = @parsed_config && @parsed_config[profile]
  credentials_from_profile(section) if section
end

def credentials_from_profile(prof_config)

# Builds Credentials from the aws_access_key_id, aws_secret_access_key and
# aws_session_token entries of a profile section.
#
# @param prof_config [Hash] one profile's parsed key/value pairs
# @return [Credentials, nil] nil unless the built object reports set?
def credentials_from_profile(prof_config)
  key_id, secret, token = prof_config.values_at(
    'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token'
  )
  static = Credentials.new(key_id, secret, token)
  static.set? ? static : nil
end

def credentials_from_shared(profile, _opts)

# Looks +profile+ up in the parsed credentials data and hands its section to
# #credentials_from_profile.
#
# @param profile [String] profile name
# @param _opts [Hash] unused
# @return the result of #credentials_from_profile, or nil when there is no
#   parsed credentials data or the profile has no section
def credentials_from_shared(profile, _opts)
  section = @parsed_credentials && @parsed_credentials[profile]
  credentials_from_profile(section) if section
end

def credentials_from_source(credential_source, config)

# Instantiates the credential provider named by +credential_source+.
#
# @param credential_source [String] 'Ec2InstanceMetadata' or 'EcsContainer'
# @param config [Object, nil] when present, supplies
#   instance_profile_credentials_retries and
#   instance_profile_credentials_timeout; otherwise 0 retries and a timeout
#   of 1 are used
# @return [InstanceProfileCredentials, ECSCredentials]
# @raise [Errors::InvalidCredentialSourceError] for any other source name
def credentials_from_source(credential_source, config)
  case credential_source
  when 'EcsContainer'
    ECSCredentials.new
  when 'Ec2InstanceMetadata'
    retries = config ? config.instance_profile_credentials_retries : 0
    # The same timeout applies to opening the connection and to reading.
    timeout = config ? config.instance_profile_credentials_timeout : 1
    InstanceProfileCredentials.new(
      retries: retries,
      http_open_timeout: timeout,
      http_read_timeout: timeout
    )
  else
    raise Errors::InvalidCredentialSourceError, "Unsupported credential_source: #{credential_source}"
  end
end

def default_shared_config_path(file)

# Builds the path of +file+ inside the ".aws" directory under Dir.home.
#
# @param file [String] file name, e.g. 'config' or 'credentials'
# @return [String, nil] nil when building the path raises ArgumentError
def default_shared_config_path(file)
  home_directory = Dir.home
  File.join(home_directory, '.aws', file)
rescue ArgumentError
  # Dir.home raises ArgumentError when it cannot determine a home directory.
  nil
end

def determine_config_path

# @return [String, nil] the AWS_CONFIG_FILE environment variable when set,
#   otherwise the result of #default_shared_config_path for 'config'
def determine_config_path
  ENV.fetch('AWS_CONFIG_FILE') { default_shared_config_path('config') }
end

def determine_credentials_path

# @return [String, nil] the AWS_SHARED_CREDENTIALS_FILE environment variable
#   when set, otherwise the result of #default_shared_config_path for
#   'credentials'
def determine_credentials_path
  ENV.fetch('AWS_SHARED_CREDENTIALS_FILE') do
    default_shared_config_path('credentials')
  end
end

def determine_profile(options)

# Picks the profile name: the :profile_name option first, then the
# AWS_PROFILE environment variable, then 'default'.
#
# @param options [Hash] may contain :profile_name
# @return [String]
def determine_profile(options)
  options[:profile_name] || ENV['AWS_PROFILE'] || 'default'
end

def fresh(options = {})

Other tags:
    Api: - private
# Resets the loaded state and reloads it from +options+, following the same
# steps as #initialize, including expanding both paths with
# File.expand_path so that relative or "~" paths resolve identically in both
# entry points.
#
# @api private
# @param options [Hash] :config_enabled, :profile_name, :credentials_path
#   and :config_path
def fresh(options = {})
  # Drop previously parsed data first so a file that is no longer loadable
  # does not leave stale values behind.
  @config_path = nil
  @parsed_credentials = {}
  @parsed_config = nil
  @config_enabled = options[:config_enabled] ? true : false
  @profile_name = determine_profile(options)

  @credentials_path = options[:credentials_path] || determine_credentials_path
  @credentials_path = File.expand_path(@credentials_path) if @credentials_path
  load_credentials_file if loadable?(@credentials_path)

  if @config_enabled
    @config_path = options[:config_path] || determine_config_path
    @config_path = File.expand_path(@config_path) if @config_path
    load_config_file if loadable?(@config_path)
  end
end

def get_config_value(key, opts)

Get a config value from shared credential/config files.
Only loads a value from the shared config file when config_enabled is true.
Returns a value from credentials preferentially over config.
# Reads +key+ for a profile, preferring the parsed credentials data. The
# parsed config data is consulted only when the credentials lookup produced
# nothing, config use is enabled, and config data has been parsed.
#
# @param key [String] name of the entry to read
# @param opts [Hash] :profile selects the profile (defaults to @profile_name)
# @return the stored value, or nil when neither source has one
def get_config_value(key, opts)
  profile_name = opts[:profile] || @profile_name

  value = nil
  value = @parsed_credentials.fetch(profile_name, {})[key] if @parsed_credentials
  if !value && @config_enabled && @parsed_config
    value = @parsed_config.fetch(profile_name, {})[key]
  end
  value
end

def initialize(options = {})

Options Hash: (**options)
  • :config_enabled (Boolean) -- If true, loads the shared config
  • :profile_name (String) -- The credential/config profile name
  • :config_path (String) -- Path to the shared config file.
  • :credentials_path (String) -- Path to the shared credentials

Parameters:
  • options (Hash) --
# Resolves the profile name and file locations from +options+ and loads the
# files that are loadable. The config file is only considered when
# :config_enabled is truthy.
#
# @param options [Hash] :config_enabled, :profile_name, :credentials_path
#   and :config_path
def initialize(options = {})
  @parsed_config = nil
  @profile_name = determine_profile(options)
  @config_enabled = options[:config_enabled]

  credentials_path = options[:credentials_path] || determine_credentials_path
  @credentials_path = credentials_path && File.expand_path(credentials_path)
  @parsed_credentials = {}
  load_credentials_file if loadable?(@credentials_path)
  return unless @config_enabled

  config_path = options[:config_path] || determine_config_path
  @config_path = config_path && File.expand_path(config_path)
  load_config_file if loadable?(@config_path)
end

def load_config_file

# Reads the file at @config_path and stores the result of
# IniParser.ini_parse in @parsed_config.
def load_config_file
  raw_config = File.read(@config_path)
  @parsed_config = IniParser.ini_parse(raw_config)
end

def load_credentials_file

# Reads the file at @credentials_path and stores the result of
# IniParser.ini_parse in @parsed_credentials.
def load_credentials_file
  raw_credentials = File.read(@credentials_path)
  @parsed_credentials = IniParser.ini_parse(raw_credentials)
end

def loadable?(path)

Other tags:
    Note: - This method does not indicate if the file found at {#path}

Returns:
  • (Boolean) - Returns `true` if a credential file exists and is readable at the given path.
# @param path [String, nil] location of the file to check
# @return [Boolean] true when +path+ is not nil and names an existing,
#   readable file
def loadable?(path)
  return false if path.nil?

  File.exist?(path) && File.readable?(path)
end

def resolve_source_profile(profile, opts = {})

# Resolves the credentials of a profile that another profile names as its
# source_profile. The strategies are tried in order: static credentials, a
# nested assume-role chain, web identity, credential_process, then SSO.
#
# @param profile [String] name of the source profile
# @param opts [Hash] options of the assume-role chain in progress; when it
#   holds a :visited_profiles set, +profile+ is added to it
# @return credentials from the first strategy that applies, or nil
# @raise [Errors::SourceProfileCircularReferenceError] when +profile+ is
#   already present in opts[:visited_profiles]
def resolve_source_profile(profile, opts = {})
  if opts[:visited_profiles] && opts[:visited_profiles].include?(profile)
    raise Errors::SourceProfileCircularReferenceError
  end
  opts[:visited_profiles].add(profile) if opts[:visited_profiles]

  profile_config = @parsed_credentials[profile]
  # @parsed_config is nil when config use is enabled but no config data was
  # loaded, so it has to be checked before it is indexed.
  profile_config ||= @parsed_config[profile] if @config_enabled && @parsed_config

  if (creds = credentials(profile: profile))
    creds # static credentials
  elsif profile_config && profile_config['source_profile']
    opts.delete(:source_profile)
    assume_role_credentials_from_config(opts.merge(profile: profile))
  elsif (provider = assume_role_web_identity_credentials_from_config(opts.merge(profile: profile)))
    provider.credentials if provider.credentials.set?
  elsif (provider = assume_role_process_credentials_from_config(profile))
    provider.credentials if provider.credentials.set?
  elsif (provider = sso_credentials_from_config(profile: profile))
    provider.credentials if provider.credentials.set?
  end
end

def sso_credentials_from_config(opts = {})

Attempts to load from shared config or shared credentials file.
Will always attempt first to load from the shared credentials
file, if present.
# Asks #sso_credentials_from_profile for a provider, using the parsed
# credentials data first and the parsed config data as a fallback.
#
# @param opts [Hash] :profile selects the profile (defaults to @profile_name)
# @return the first truthy result of #sso_credentials_from_profile, else the
#   falsy result of the last lookup performed
def sso_credentials_from_config(opts = {})
  profile_name = opts[:profile] || @profile_name
  provider = sso_credentials_from_profile(@parsed_credentials, profile_name)
  return provider if provider || !@parsed_config

  sso_credentials_from_profile(@parsed_config, profile_name)
end

def sso_credentials_from_profile(cfg, profile)

If any of the sso_ profile values are present, attempt to construct
SSOCredentials
# Builds SSOCredentials for +profile+ when its section in +cfg+ contains at
# least one key listed in SSO_CREDENTIAL_PROFILE_KEYS.
#
# When the profile names an sso_session, sso_region and sso_start_url are
# taken from that session (looked up through #sso_session); a value for
# either key set directly on the profile must then equal the session's
# value. Without an sso_session both values come from the profile itself.
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] profile name
# @return [SSOCredentials, nil] nil when no config data has been parsed, the
#   profile is absent, or it has none of the SSO keys
# @raise [ArgumentError] when the profile and its sso-session disagree on
#   sso_region or sso_start_url
def sso_credentials_from_profile(cfg, profile)
  # NOTE(review): this guard tests @parsed_config rather than +cfg+, so
  # nothing is built from the credentials data while no config data is
  # loaded - confirm that this is intended.
  return unless @parsed_config

  prof_config = cfg[profile]
  return unless prof_config
  return if (prof_config.keys & SSO_CREDENTIAL_PROFILE_KEYS).empty?

  if (sso_session_name = prof_config['sso_session'])
    session = sso_session(cfg, profile, sso_session_name)
    sso_region = session['sso_region']
    sso_start_url = session['sso_start_url']
    # validate sso_region and sso_start_url don't conflict if set on profile and session
    %w[sso_region sso_start_url].each do |key|
      profile_value = prof_config[key]
      next unless profile_value && profile_value != session[key]

      raise ArgumentError,
            "sso-session #{sso_session_name}'s #{key} (#{session[key]}) " \
            "does not match the profile #{profile}'s #{key} (#{profile_value})"
    end
  else
    sso_region = prof_config['sso_region']
    sso_start_url = prof_config['sso_start_url']
  end

  SSOCredentials.new(
    sso_account_id: prof_config['sso_account_id'],
    sso_role_name: prof_config['sso_role_name'],
    sso_session: prof_config['sso_session'],
    sso_region: sso_region,
    sso_start_url: sso_start_url
  )
end

def sso_session(cfg, profile, sso_session_name)

# Finds the "sso-session <name>" section in +cfg+ and checks that it
# defines sso_region.
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] referencing profile, used in the error message
# @param sso_session_name [String] name of the session to find
# @return [Hash] the session's section
# @raise [ArgumentError] when the section is missing or has no sso_region
def sso_session(cfg, profile, sso_session_name)
  # aws sso-configure may add quotes around sso session names with whitespace
  section_names = [
    "sso-session #{sso_session_name}",
    "sso-session '#{sso_session_name}'"
  ]
  session = section_names.map { |name| cfg[name] }.find { |section| section }

  if !session
    raise ArgumentError,
      "sso-session #{sso_session_name} must be defined in the config file. " \
      "Referenced by profile #{profile}"
  elsif !session['sso_region']
    raise ArgumentError, "sso-session #{sso_session_name} missing required parameter: sso_region"
  end

  session
end

def sso_token_from_config(opts = {})

Attempts to load from shared config or shared credentials file.
Will always attempt first to load from the shared credentials
file, if present.
# Asks #sso_token_from_profile for a token provider, using the parsed
# credentials data first and the parsed config data as a fallback.
#
# @param opts [Hash] :profile selects the profile (defaults to @profile_name)
# @return the first truthy result of #sso_token_from_profile, else the falsy
#   result of the last lookup performed
def sso_token_from_config(opts = {})
  profile_name = opts[:profile] || @profile_name
  provider = sso_token_from_profile(@parsed_credentials, profile_name)
  return provider if provider || !@parsed_config

  sso_token_from_profile(@parsed_config, profile_name)
end

def sso_token_from_profile(cfg, profile)

If the required sso_ profile values are present, attempt to construct
SSOTokenProvider
# Builds an SSOTokenProvider for +profile+ when its section in +cfg+
# contains at least one key listed in SSO_TOKEN_PROFILE_KEYS. The region is
# taken from the sso-session the profile names (see #sso_session).
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] profile name
# @return [SSOTokenProvider, nil] nil when no config data has been parsed,
#   the profile is absent, or it has none of the token keys
def sso_token_from_profile(cfg, profile)
  return unless @parsed_config

  prof_config = cfg[profile]
  return unless prof_config
  return if (prof_config.keys & SSO_TOKEN_PROFILE_KEYS).empty?

  session_name = prof_config['sso_session']
  session = sso_session(cfg, profile, session_name)
  SSOTokenProvider.new(
    sso_session: session_name,
    sso_region: session['sso_region']
  )
end

def validate_profile_exists(profile)

# Ensures +profile+ has a section in the parsed credentials data or in the
# parsed config data.
#
# @param profile [String] profile name
# @return [nil] when the profile is known
# @raise [Errors::NoSuchProfileError] naming @credentials_path and, when
#   set, @config_path
def validate_profile_exists(profile)
  in_credentials = @parsed_credentials && @parsed_credentials[profile]
  in_config = @parsed_config && @parsed_config[profile]
  return if in_credentials || in_config

  msg = "Profile `#{profile}' not found in #{@credentials_path}"
  msg += " or #{@config_path}" if @config_path
  raise Errors::NoSuchProfileError, msg
end