class Aws::SharedConfig

@api private

def assume_role_credentials_from_config(opts = {})

Attempts to assume a role from shared config or shared credentials file.
Will always attempt first to assume a role from the shared credentials
file, if present.
# Tries to build assume-role credentials for a profile, consulting the parsed
# shared credentials data first and the parsed shared config data second.
#
# NOTE: +opts+ is mutated. +:profile+ and +:chain_config+ are removed from it
# and the remainder is handed on to +assume_role_from_profile+.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile
#   name, +:chain_config+ is passed through untouched
# @return [Object, nil] the first truthy result of +assume_role_from_profile+;
#   the config data is only consulted when it has been loaded
def assume_role_credentials_from_config(opts = {})
  profile_name = opts.delete(:profile) || @profile_name
  chain = opts.delete(:chain_config)
  from_shared = assume_role_from_profile(@parsed_credentials, profile_name, opts, chain)
  return from_shared if from_shared || !@parsed_config

  assume_role_from_profile(@parsed_config, profile_name, opts, chain)
end

def assume_role_from_profile(cfg, profile, opts, chain_config)

# Builds assume-role credentials for +profile+ from one parsed profile table.
#
# The profile's +source_profile+ / +credential_source+ settings decide where
# the base credentials come from. Values already present in +opts+ take
# precedence over the values stored in the profile.
#
# NOTE: +opts+ is mutated in place and is the very hash handed to
# +AssumeRoleCredentials.new+.
#
# @param cfg [Hash, nil] parsed profiles keyed by profile name
# @param profile [String] name of the profile to read from +cfg+
# @param opts [Hash] options; may carry +:source_profile+,
#   +:credential_source+, +:role_session_name+, +:role_arn+, +:external_id+
#   and +:serial_number+ overrides
# @param chain_config [Object, nil] forwarded to +credentials_from_source+
# @return [AssumeRoleCredentials, nil] +nil+ when +cfg+ has no such profile,
#   or the profile has no source and no +role_arn+
# @raise [Errors::CredentialSourceConflictError] when both a source profile
#   and a credential source are configured
# @raise [Errors::NoSourceProfileError] when the source profile yields no
#   credentials, or a +role_arn+ is present without any source
# @raise [Errors::NoSourceCredentials] when the credential source yields no
#   credentials
def assume_role_from_profile(cfg, profile, opts, chain_config)
  prof_cfg = cfg && cfg[profile]
  return nil unless prof_cfg

  opts[:source_profile] ||= prof_cfg["source_profile"]
  credential_source = opts.delete(:credential_source) || prof_cfg["credential_source"]

  if opts[:source_profile] && credential_source
    raise Errors::CredentialSourceConflictError,
      "Profile #{profile} has a source_profile, and "\
        "a credential_source. For assume role credentials, must "\
        "provide only source_profile or credential_source, not both."
  elsif opts[:source_profile]
    opts[:credentials] = credentials(profile: opts[:source_profile])
    unless opts[:credentials]
      raise Errors::NoSourceProfileError,
        "Profile #{profile} has a role_arn, and source_profile, but the"\
          " source_profile does not have credentials."
    end
    apply_assume_role_profile_settings(opts, prof_cfg)
    # The source profile is passed on under the :profile key.
    opts[:profile] = opts.delete(:source_profile)
    AssumeRoleCredentials.new(opts)
  elsif credential_source
    opts[:credentials] = credentials_from_source(credential_source, chain_config)
    unless opts[:credentials]
      raise Errors::NoSourceCredentials,
        "Profile #{profile} could not get source credentials from"\
          " provider #{credential_source}"
    end
    apply_assume_role_profile_settings(opts, prof_cfg)
    # Drop the nil :source_profile placeholder written by the `||=` above.
    opts.delete(:source_profile)
    AssumeRoleCredentials.new(opts)
  elsif prof_cfg["role_arn"]
    raise Errors::NoSourceProfileError,
      "Profile #{profile} has a role_arn, but no source_profile."
  end
end

# Fills in the assume-role settings the caller did not supply in +opts+,
# reading them from the profile; the session name falls back to
# "default_session".
#
# @api private
# @param opts [Hash] options hash, mutated in place
# @param prof_cfg [Hash] parsed settings of a single profile
# @return [Hash] the same +opts+ hash
def apply_assume_role_profile_settings(opts, prof_cfg)
  opts[:role_session_name] ||= prof_cfg["role_session_name"]
  opts[:role_session_name] ||= "default_session"
  opts[:role_arn] ||= prof_cfg["role_arn"]
  opts[:external_id] ||= prof_cfg["external_id"]
  opts[:serial_number] ||= prof_cfg["mfa_serial"]
  opts
end

def config_enabled?

Returns:
  • (Boolean) - returns `true` if use of the shared config file is enabled.
# @return [Boolean] +true+ when the instance's config-enabled flag is truthy,
#   +false+ otherwise (the flag is coerced to a strict boolean)
def config_enabled?
  !!@config_enabled
end

def credentials(opts = {})

Returns:
  • (Aws::Credentials) - credentials sourced from configuration values,
    or `nil` if no credentials were found for the profile.

Options Hash: (**options)
  • :profile (String) -- the name of the profile from which credentials
    are read; defaults to the profile name chosen when the object was created.

Parameters:
  • opts (Hash) --
# Looks up static credentials for a profile, preferring the shared
# credentials data over the shared config data.
#
# When any profile data has been loaded, the profile is first checked via
# +validate_profile_exists+ (which raises for unknown profiles).
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the first truthy result of +credentials_from_shared+
#   or +credentials_from_config+, otherwise +nil+
def credentials(opts = {})
  profile_name = opts[:profile] || @profile_name
  validate_profile_exists(profile_name) if credentials_present?
  credentials_from_shared(profile_name, opts) ||
    credentials_from_config(profile_name, opts) ||
    nil
end

def credentials_complete(creds)

# Reports whether +creds+ is usable by delegating to its +set?+ method.
#
# @param creds [#set?] credentials-like object; within this file it is the
#   object built by +Credentials.new+ in +credentials_from_profile+
# @return [Object] whatever +creds.set?+ returns
def credentials_complete(creds)
  creds.set?
end

def credentials_from_config(profile, opts)

# Reads static credentials for +profile+ out of the parsed shared config data.
#
# @param profile [String] profile name to look up
# @param opts [Hash] accepted for signature parity; not used here
# @return [Object, nil] result of +credentials_from_profile+, or +nil+ when
#   no config data is loaded or it lacks the profile
def credentials_from_config(profile, opts)
  section = @parsed_config && @parsed_config[profile]
  return nil unless section

  credentials_from_profile(section)
end

def credentials_from_profile(prof_config)

# Builds a +Credentials+ object from one profile's settings.
#
# @param prof_config [Hash] settings of a single profile; the keys
#   +aws_access_key_id+, +aws_secret_access_key+ and +aws_session_token+
#   are read
# @return [Credentials, nil] the credentials when +credentials_complete+
#   accepts them, otherwise +nil+
def credentials_from_profile(prof_config)
  setting_names = %w[aws_access_key_id aws_secret_access_key aws_session_token]
  candidate = Credentials.new(*setting_names.map { |name| prof_config[name] })
  credentials_complete(candidate) ? candidate : nil
end

def credentials_from_shared(profile, opts)

# Reads static credentials for +profile+ out of the parsed shared
# credentials data.
#
# @param profile [String] profile name to look up
# @param opts [Hash] accepted for signature parity; not used here
# @return [Object, nil] result of +credentials_from_profile+, or +nil+ when
#   the credentials data is missing or lacks the profile
def credentials_from_shared(profile, opts)
  section = @parsed_credentials ? @parsed_credentials[profile] : nil
  section ? credentials_from_profile(section) : nil
end

def credentials_from_source(credential_source, config)

# Instantiates the credential provider named by a profile's
# +credential_source+ setting.
#
# @param credential_source [String] "Ec2InstanceMetadata" or "EcsContainer"
# @param config [Object, nil] when given, supplies
#   +instance_profile_credentials_retries+ and
#   +instance_profile_credentials_timeout+; otherwise 0 retries and
#   1 for both timeouts are used
# @return [InstanceProfileCredentials, ECSCredentials]
# @raise [Errors::InvalidCredentialSourceError] for any other source name
def credentials_from_source(credential_source, config)
  case credential_source
  when "Ec2InstanceMetadata"
    retries = config ? config.instance_profile_credentials_retries : 0
    open_timeout = config ? config.instance_profile_credentials_timeout : 1
    read_timeout = config ? config.instance_profile_credentials_timeout : 1
    InstanceProfileCredentials.new(
      retries: retries,
      http_open_timeout: open_timeout,
      http_read_timeout: read_timeout
    )
  when "EcsContainer"
    ECSCredentials.new
  else
    raise Errors::InvalidCredentialSourceError,
      "Unsupported credential_source: #{credential_source}"
  end
end

def credentials_present?

# Tells whether any profile data has been loaded from either file.
#
# @return [Boolean, nil] +true+ when the parsed credentials data or the
#   parsed config data is non-empty; otherwise the falsy result of the
#   config check (+false+ for empty data, +nil+ when none is loaded)
def credentials_present?
  shared_has_profiles = @parsed_credentials && !@parsed_credentials.empty?
  return shared_has_profiles if shared_has_profiles

  @parsed_config && !@parsed_config.empty?
end

def credentials_process(profile)

# Returns the +credential_process+ setting configured for +profile+ in the
# parsed shared config data.
#
# +validate_profile_exists+ also accepts a profile that exists only in the
# shared credentials data, so the config data may be unloaded or may lack
# the profile. Both cases yield +nil+ instead of a NoMethodError on +nil+.
#
# @param profile [String] profile name to look up
# @return [String, nil] the configured value, or +nil+ when none is set
# @raise [Errors::NoSuchProfileError] via +validate_profile_exists+ when the
#   profile is unknown
def credentials_process(profile)
  validate_profile_exists(profile)
  (@parsed_config || {}).fetch(profile, {})['credential_process']
end

def csm_client_id(opts = {})

# Looks up the +csm_client_id+ setting for a profile, preferring the shared
# credentials data over the shared config data.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the configured value; +nil+ when config support is
#   disabled or the setting is absent
def csm_client_id(opts = {})
  profile_name = opts[:profile] || @profile_name
  return nil unless @config_enabled

  client_id = @parsed_credentials.fetch(profile_name, {})["csm_client_id"] if @parsed_credentials
  client_id ||= @parsed_config.fetch(profile_name, {})["csm_client_id"] if @parsed_config
  client_id
end

def csm_enabled(opts = {})

# Looks up the +csm_enabled+ setting for a profile, preferring the shared
# credentials data over the shared config data.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the raw configured value (not coerced); +nil+ when
#   config support is disabled or the setting is absent
def csm_enabled(opts = {})
  profile_name = opts[:profile] || @profile_name
  return nil unless @config_enabled

  setting = nil
  setting = @parsed_credentials.fetch(profile_name, {})["csm_enabled"] if @parsed_credentials
  setting ||= @parsed_config.fetch(profile_name, {})["csm_enabled"] if @parsed_config
  setting
end

def csm_port(opts = {})

# Looks up the +csm_port+ setting for a profile, preferring the shared
# credentials data over the shared config data.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the raw configured value (not converted); +nil+ when
#   config support is disabled or the setting is absent
def csm_port(opts = {})
  profile_name = opts[:profile] || @profile_name
  return nil unless @config_enabled

  port = @parsed_credentials ? @parsed_credentials.fetch(profile_name, {})["csm_port"] : nil
  port ||= @parsed_config.fetch(profile_name, {})["csm_port"] if @parsed_config
  port
end

def default_shared_config_path(file)

# Builds the default location of an AWS shared file under the user's home
# directory, i.e. +<home>/.aws/<file>+.
#
# @param file [String] file name inside the +.aws+ directory
# @return [String, nil] the joined path, or +nil+ when an ArgumentError is
#   raised while building it (Dir.home raises one when the home directory
#   cannot be determined, e.g. ENV['HOME'] is not set)
def default_shared_config_path(file)
  begin
    home_dir = Dir.home
    File.join(home_dir, '.aws', file)
  rescue ArgumentError
    nil
  end
end

def determine_config_path

# Picks the shared config file path: the +AWS_CONFIG_FILE+ environment
# variable when set, otherwise the default path for the file named 'config'.
#
# @return [String, nil] the chosen path; +nil+ when neither source yields one
def determine_config_path
  from_env = ENV['AWS_CONFIG_FILE']
  return from_env if from_env

  default_shared_config_path('config')
end

def determine_credentials_path

# Picks the shared credentials file path: the +AWS_SHARED_CREDENTIALS_FILE+
# environment variable when set, otherwise the default path for the file
# named 'credentials'.
#
# @return [String, nil] the chosen path; +nil+ when neither source yields one
def determine_credentials_path
  from_env = ENV['AWS_SHARED_CREDENTIALS_FILE']
  return from_env if from_env

  default_shared_config_path('credentials')
end

def determine_profile(options)

# Resolves the profile name to use.
#
# Precedence: +options[:profile_name]+, then the +AWS_PROFILE+ environment
# variable, then the literal "default".
#
# @param options [Hash] constructor-style options
# @return [String] the resolved profile name
def determine_profile(options)
  options[:profile_name] || ENV["AWS_PROFILE"] || "default"
end

def endpoint_discovery(opts = {})

# Looks up the +endpoint_discovery_enabled+ setting for a profile in the
# parsed shared config data.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the raw configured value; +nil+ when config support
#   is disabled, no config data is loaded, or the setting is absent
def endpoint_discovery(opts = {})
  profile_name = opts[:profile] || @profile_name
  return nil unless @config_enabled && @parsed_config

  profile_settings = @parsed_config.fetch(profile_name, {})
  profile_settings["endpoint_discovery_enabled"]
end

def fresh(options = {})

Other tags:
    Api: - private
# Discards all previously loaded state and reloads it from +options+, the
# environment and the files on disk.
#
# @api private
# @param options [Hash] same keys as the constructor: +:config_enabled+,
#   +:profile_name+, +:credentials_path+, +:config_path+
# @return [Object, nil] result of the last load step; +nil+ when config
#   support is disabled
def fresh(options = {})
  # Wipe everything learned from an earlier load before re-reading.
  @profile_name = @credentials_path = @config_path = @parsed_config = nil
  @parsed_credentials = {}
  @config_enabled = !!options[:config_enabled]

  @profile_name = determine_profile(options)
  @credentials_path = options[:credentials_path] || determine_credentials_path
  load_credentials_file if loadable?(@credentials_path)
  return unless @config_enabled

  @config_path = options[:config_path] || determine_config_path
  load_config_file if loadable?(@config_path)
end

def initialize(options = {})

Options Hash: (**options)
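  • :config_enabled (Boolean) -- If true, loads the shared config
    file in addition to the shared credentials file.
  • :profile_name (String) -- The credential/config profile name
    to use. Falls back to ENV['AWS_PROFILE'], then to "default".
  • :config_path (String) -- Path to the shared config file.
    Falls back to ENV['AWS_CONFIG_FILE'], then to ~/.aws/config.
  • :credentials_path (String) -- Path to the shared credentials
    file. Falls back to ENV['AWS_SHARED_CREDENTIALS_FILE'], then to
    ~/.aws/credentials.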

Parameters:
  • options (Hash) --
# Resolves the profile name and file locations, then loads whichever of the
# shared credentials / shared config files are loadable.
#
# @param options [Hash]
# @option options [Boolean] :config_enabled when truthy, the shared config
#   file is located and loaded as well
# @option options [String] :profile_name profile to use (resolved through
#   +determine_profile+)
# @option options [String] :credentials_path overrides the credentials path
# @option options [String] :config_path overrides the config path
def initialize(options = {})
  @parsed_config = nil
  @profile_name = determine_profile(options)
  @config_enabled = options[:config_enabled]
  @credentials_path = options[:credentials_path] || determine_credentials_path
  @parsed_credentials = {}
  load_credentials_file if loadable?(@credentials_path)
  return unless @config_enabled

  # The config file is only located and read when explicitly enabled.
  @config_path = options[:config_path] || determine_config_path
  load_config_file if loadable?(@config_path)
end

def load_config_file

# Reads the file at the stored config path and stores the result of
# +IniParser.ini_parse+ as the parsed config data.
#
# @return [Object] the parsed data
def load_config_file
  raw_text = File.read(@config_path)
  @parsed_config = IniParser.ini_parse(raw_text)
end

def load_credentials_file

# Reads the file at the stored credentials path and stores the result of
# +IniParser.ini_parse+ as the parsed credentials data.
#
# @return [Object] the parsed data
def load_credentials_file
  raw_text = File.read(@credentials_path)
  @parsed_credentials = IniParser.ini_parse(raw_text)
end

def loadable?(path)

Other tags:
    Note: - This method does not indicate if the file found at {#path}
      is properly formatted.

Returns:
  • (Boolean) - Returns `true` if a credential file
    exists and is readable at the given path.
# Tells whether +path+ points at something that exists and can be read.
# The contents are not inspected.
#
# @param path [String, nil] file system path to check
# @return [Boolean] +false+ for +nil+ or a non-existent path, otherwise the
#   result of +File.readable?+
def loadable?(path)
  return false if path.nil?
  return false unless File.exist?(path)

  File.readable?(path)
end

def region(opts = {})

# Looks up the +region+ setting for a profile, preferring the shared
# credentials data over the shared config data.
#
# @param opts [Hash] options; +:profile+ overrides the instance's profile name
# @return [Object, nil] the configured region; +nil+ when config support is
#   disabled or the setting is absent
def region(opts = {})
  profile_name = opts[:profile] || @profile_name
  return nil unless @config_enabled

  configured_region = nil
  if @parsed_credentials
    configured_region = @parsed_credentials.fetch(profile_name, {})["region"]
  end
  configured_region ||= @parsed_config.fetch(profile_name, {})["region"] if @parsed_config
  configured_region
end

def validate_profile_exists(profile)

# Ensures +profile+ is present in the parsed credentials data or the parsed
# config data.
#
# @param profile [String] profile name to check
# @return [nil] when the profile is known
# @raise [Errors::NoSuchProfileError] when neither data set has the profile;
#   the message names the credentials path and, if set, the config path
def validate_profile_exists(profile)
  in_credentials = @parsed_credentials && @parsed_credentials[profile]
  return if in_credentials || (@parsed_config && @parsed_config[profile])

  message = "Profile `#{profile}' not found in #{@credentials_path}"
  message += " or #{@config_path}" if @config_path
  raise Errors::NoSuchProfileError, message
end