class Aws::SharedConfig
@api private
def self.config_reader(*attrs)
Uses the get_config_value below to control where
Add an accessor method (similar to attr_reader) to return a configuration value
# Defines one reader method per given name (similar to attr_reader).
# Each generated method takes an optional options hash and returns
# whatever `get_config_value` yields for the stringified name.
#
# @api private
# @param attrs [Array<Symbol, String>] names of the readers to define
def self.config_reader(*attrs)
  attrs.each do |reader_name|
    define_method(reader_name) do |opts = {}|
      get_config_value(reader_name.to_s, opts)
    end
  end
end
def assume_role_credentials_from_config(opts = {})
Will always attempt first to assume a role from the shared credentials
Attempts to assume a role from shared config or shared credentials file.
# Attempts to build assume-role credentials for a profile, consulting the
# parsed shared credentials first and the parsed shared config second
# (the latter only when a config file has been parsed).
#
# The `:profile` and `:chain_config` keys are removed from `opts`; the
# remaining options are handed to `assume_role_from_profile`.
#
# @param opts [Hash] options; `:profile` defaults to the current profile name
# @return the first truthy result of `assume_role_from_profile`, otherwise
#   the last falsy result
def assume_role_credentials_from_config(opts = {})
  profile = opts.delete(:profile) || @profile_name
  chain_config = opts.delete(:chain_config)

  from_shared = assume_role_from_profile(@parsed_credentials, profile, opts, chain_config)
  return from_shared if from_shared || !@parsed_config

  assume_role_from_profile(@parsed_config, profile, opts, chain_config)
end
def assume_role_from_profile(cfg, profile, opts, chain_config)
# Builds AssumeRoleCredentials for `profile` out of one parsed file.
#
# The source credentials come either from another profile
# (`source_profile`) or from a named provider (`credential_source`);
# supplying both is an error. Values already present in `opts` win over
# the values found in the profile. `opts` is mutated in place.
#
# @param cfg [Hash, nil] parsed file contents keyed by profile name
# @param profile [String] name of the profile to inspect
# @param opts [Hash] options forwarded to AssumeRoleCredentials
# @param chain_config passed through to `credentials_from_source`
# @return [AssumeRoleCredentials, nil] nil when the profile is absent or
#   defines neither a source nor a role_arn
# @raise [Errors::CredentialSourceConflictError] when both a
#   source_profile and a credential_source are present
# @raise [Errors::NoSourceProfileError] when the source profile yields no
#   credentials, or a role_arn is present without any source
# @raise [Errors::NoSourceCredentials] when the credential_source
#   provider yields no credentials
def assume_role_from_profile(cfg, profile, opts, chain_config)
  prof_cfg = cfg && cfg[profile]
  return unless prof_cfg

  opts[:source_profile] ||= prof_cfg['source_profile']
  credential_source = opts.delete(:credential_source)
  credential_source ||= prof_cfg['credential_source']

  if opts[:source_profile] && credential_source
    raise Errors::CredentialSourceConflictError,
          "Profile #{profile} has a source_profile, and "\
          'a credential_source. For assume role credentials, must '\
          'provide only source_profile or credential_source, not both.'
  elsif opts[:source_profile]
    # Track visited profiles so circular source_profile chains are detected.
    opts[:visited_profiles] ||= Set.new
    opts[:credentials] = resolve_source_profile(opts[:source_profile], opts)
    unless opts[:credentials]
      raise Errors::NoSourceProfileError,
            "Profile #{profile} has a role_arn, and source_profile, but the"\
            ' source_profile does not have credentials.'
    end

    apply_assume_role_profile_defaults(opts, prof_cfg)
    opts[:profile] = opts.delete(:source_profile)
    opts.delete(:visited_profiles)
    AssumeRoleCredentials.new(opts)
  elsif credential_source
    opts[:credentials] = credentials_from_source(credential_source, chain_config)
    unless opts[:credentials]
      raise Errors::NoSourceCredentials,
            "Profile #{profile} could not get source credentials from"\
            " provider #{credential_source}"
    end

    apply_assume_role_profile_defaults(opts, prof_cfg)
    opts.delete(:source_profile) # Cleanup
    AssumeRoleCredentials.new(opts)
  elsif prof_cfg['role_arn']
    raise Errors::NoSourceProfileError,
          "Profile #{profile} has a role_arn, but no source_profile."
  end
end

# Fills in assume-role options from the profile for every option the
# caller has not already supplied.
def apply_assume_role_profile_defaults(opts, prof_cfg)
  opts[:role_session_name] ||= prof_cfg['role_session_name']
  opts[:role_session_name] ||= 'default_session'
  opts[:role_arn] ||= prof_cfg['role_arn']
  opts[:duration_seconds] ||= prof_cfg['duration_seconds']
  opts[:external_id] ||= prof_cfg['external_id']
  opts[:serial_number] ||= prof_cfg['mfa_serial']
  opts
end
def assume_role_process_credentials_from_config(profile)
# Builds ProcessCredentials from the profile's `credential_process`
# entry, preferring the parsed shared credentials over the parsed shared
# config. The profile is validated first via `validate_profile_exists`.
#
# @param profile [String] profile name to look up
# @return [ProcessCredentials, nil] nil when no credential_process is set
def assume_role_process_credentials_from_config(profile)
  validate_profile_exists(profile)

  command = @parsed_credentials.fetch(profile, {})['credential_process']
  command ||= @parsed_config.fetch(profile, {})['credential_process'] if @parsed_config
  return unless command

  ProcessCredentials.new(command)
end
def assume_role_web_identity_credentials_from_config(opts = {})
# Builds AssumeRoleWebIdentityCredentials from the shared config file.
# Only produces a provider when the config is enabled and parsed, and the
# profile defines both `web_identity_token_file` and `role_arn`.
#
# @param opts [Hash] `:profile` (defaults to the current profile name)
#   and an optional `:region` that is forwarded when present
# @return [AssumeRoleWebIdentityCredentials, nil]
def assume_role_web_identity_credentials_from_config(opts = {})
  profile = opts[:profile] || @profile_name
  return unless @config_enabled && @parsed_config

  entry = @parsed_config.fetch(profile, {})
  token_file = entry['web_identity_token_file']
  role_arn = entry['role_arn']
  return unless token_file && role_arn

  cfg = {
    role_arn: role_arn,
    web_identity_token_file: token_file,
    role_session_name: entry['role_session_name']
  }
  cfg[:region] = opts[:region] if opts[:region]
  AssumeRoleWebIdentityCredentials.new(cfg)
end
def config_enabled?
-
(Boolean)
- returns `true` if use of the shared config file is
# @return [Boolean] `true` when the config-enabled flag is truthy,
#   `false` otherwise (the flag is coerced to a strict boolean)
def config_enabled?
  !!@config_enabled
end
def configured_endpoint(opts = {})
(**opts)
-
:service_id
(String
) -- -
:profile
(String
) --
Parameters:
-
opts
(Hash
) --
# Looks up a configured endpoint URL for a profile in the shared config
# file. A service-specific `endpoint_url` from the profile's `services`
# section takes precedence over the profile-level `endpoint_url`.
#
# @param opts [Hash]
# @option opts [String] :profile profile name (defaults to the current one)
# @option opts [String] :service_id service identifier; spaces become
#   underscores and the result is downcased before the lookup
# @return [String, nil] nil when no config is parsed or the profile is absent
def configured_endpoint(opts = {})
  # services section is only allowed in the shared config file (not credentials)
  profile_name = opts[:profile] || @profile_name
  service_key = opts[:service_id]&.gsub(' ', '_')&.downcase
  return nil unless @parsed_config

  profile_section = @parsed_config[profile_name]
  return nil unless profile_section

  services_section = @parsed_config["services #{profile_section['services']}"]
  service_section = services_section && services_section[service_key]
  service_endpoint = service_section && service_section['endpoint_url']

  service_endpoint || profile_section['endpoint_url']
end
def credentials(opts = {})
-
(Aws::Credentials)
- credentials sourced from configuration values,
Options Hash:
(**options)
-
:profile
(String
) -- the name of the configuration file from
Parameters:
-
opts
(Hash
) --
# Returns static credentials for a profile, checking the shared
# credentials file before the shared config file.
#
# @param opts [Hash]
# @option opts [String] :profile profile name (defaults to the current one)
# @return credentials from the first source that yields any, otherwise nil
# @raise whatever `validate_profile_exists` raises for an unknown profile
def credentials(opts = {})
  profile = opts[:profile] || @profile_name
  validate_profile_exists(profile)

  %i[credentials_from_shared credentials_from_config].each do |loader|
    found = send(loader, profile, opts)
    return found if found
  end
  nil
end
def credentials_from_config(profile, _opts)
# Builds static credentials from the profile's section of the parsed
# shared config file.
#
# @param profile [String] profile name
# @param _opts [Hash] unused
# @return the result of `credentials_from_profile`, or nil when no config
#   is parsed or the profile has no section in it
def credentials_from_config(profile, _opts)
  return unless @parsed_config

  section = @parsed_config[profile]
  credentials_from_profile(section) if section
end
def credentials_from_profile(prof_config)
# Builds a Credentials object from a profile section's
# `aws_access_key_id`, `aws_secret_access_key` and `aws_session_token`.
#
# @param prof_config [Hash] one profile's key/value pairs
# @return [Credentials, nil] nil unless the credentials report `set?`
def credentials_from_profile(prof_config)
  key_names = %w[aws_access_key_id aws_secret_access_key aws_session_token]
  creds = Credentials.new(*key_names.map { |key| prof_config[key] })
  creds.set? ? creds : nil
end
def credentials_from_shared(profile, _opts)
# Builds static credentials from the profile's section of the parsed
# shared credentials file.
#
# @param profile [String] profile name
# @param _opts [Hash] unused
# @return the result of `credentials_from_profile`, or nil when no
#   credentials are parsed or the profile has no section in them
def credentials_from_shared(profile, _opts)
  return unless @parsed_credentials

  section = @parsed_credentials[profile]
  credentials_from_profile(section) if section
end
def credentials_from_source(credential_source, config)
# Instantiates the credential provider named by a profile's
# `credential_source` setting.
#
# @param credential_source [String] 'Ec2InstanceMetadata' or 'EcsContainer'
# @param config [#instance_profile_credentials_retries,
#   #instance_profile_credentials_timeout, nil] when nil, retries default
#   to 0 and both timeouts to 1
# @return [InstanceProfileCredentials, ECSCredentials]
# @raise [Errors::InvalidCredentialSourceError] for any other source name
def credentials_from_source(credential_source, config)
  case credential_source
  when 'EcsContainer'
    ECSCredentials.new
  when 'Ec2InstanceMetadata'
    retries = config ? config.instance_profile_credentials_retries : 0
    open_timeout = config ? config.instance_profile_credentials_timeout : 1
    read_timeout = config ? config.instance_profile_credentials_timeout : 1
    InstanceProfileCredentials.new(
      retries: retries,
      http_open_timeout: open_timeout,
      http_read_timeout: read_timeout
    )
  else
    raise Errors::InvalidCredentialSourceError,
          "Unsupported credential_source: #{credential_source}"
  end
end
def default_shared_config_path(file)
# Builds the default location of a shared file: `<home>/.aws/<file>`.
#
# @param file [String] file name inside the `.aws` directory
# @return [String, nil] nil when an ArgumentError is raised while
#   building the path
def default_shared_config_path(file)
  segments = [Dir.home, '.aws', file]
  File.join(*segments)
rescue ArgumentError
  # Dir.home raises ArgumentError when the home directory cannot be
  # determined (e.g. ENV['HOME'] is not set)
  nil
end
def determine_config_path
# @return [String, nil] the AWS_CONFIG_FILE environment variable when
#   set, otherwise the default shared path for the `config` file
def determine_config_path
  ENV.fetch('AWS_CONFIG_FILE') { default_shared_config_path('config') }
end
def determine_credentials_path
# @return [String, nil] the AWS_SHARED_CREDENTIALS_FILE environment
#   variable when set, otherwise the default shared path for the
#   `credentials` file
def determine_credentials_path
  ENV.fetch('AWS_SHARED_CREDENTIALS_FILE') { default_shared_config_path('credentials') }
end
def determine_profile(options)
# Picks the profile name: the explicit `:profile_name` option first, then
# the AWS_PROFILE environment variable, then the literal 'default'.
#
# @param options [Hash]
# @return [String]
def determine_profile(options)
  options[:profile_name] || ENV['AWS_PROFILE'] || 'default'
end
def fresh(options = {})
- Api: - private
# Resets all loaded state and reloads the shared files from `options`.
#
# Paths are normalized with File.expand_path, the same way `initialize`
# does, so that `~`-prefixed and relative paths resolve identically
# whether the object was constructed or refreshed.
#
# @api private
# @param options [Hash] accepts :profile_name, :credentials_path,
#   :config_path and :config_enabled
def fresh(options = {})
  @parsed_credentials = {}
  @parsed_config = nil
  @config_path = nil
  @config_enabled = options[:config_enabled] ? true : false
  @profile_name = determine_profile(options)

  @credentials_path = options[:credentials_path] || determine_credentials_path
  @credentials_path = File.expand_path(@credentials_path) if @credentials_path
  load_credentials_file if loadable?(@credentials_path)

  # The config file is only consulted when explicitly enabled.
  return unless @config_enabled

  @config_path = options[:config_path] || determine_config_path
  @config_path = File.expand_path(@config_path) if @config_path
  load_config_file if loadable?(@config_path)
end
def get_config_value(key, opts)
Only loads a value when config_enabled is true
Get a config value from shared credential/config files.
# Reads one configuration value for a profile. The parsed shared
# credentials are consulted first; the parsed shared config is used as a
# fallback only when the config is enabled and has been parsed.
#
# @param key [String] configuration key inside the profile section
# @param opts [Hash] `:profile` overrides the current profile name
# @return the value found, or nil when neither source has one
def get_config_value(key, opts)
  profile = opts[:profile] || @profile_name
  shared_value = @parsed_credentials ? @parsed_credentials.fetch(profile, {})[key] : nil
  config_usable = @config_enabled && @parsed_config
  return shared_value if shared_value || !config_usable

  @parsed_config.fetch(profile, {})[key]
end
def initialize(options = {})
(**options)
-
:config_enabled
(Boolean
) -- If true, loads the shared config -
:profile_name
(String
) -- The credential/config profile name -
:config_path
(String
) -- Path to the shared config file. -
:credentials_path
(String
) -- Path to the shared credentials
Parameters:
-
options
(Hash
) --
# Resolves the profile name and file locations, then loads the shared
# credentials file and, when enabled, the shared config file. A file is
# loaded only if `loadable?` accepts its path.
#
# @param options [Hash]
# @option options [String] :credentials_path path to the shared
#   credentials file (falls back to `determine_credentials_path`)
# @option options [String] :config_path path to the shared config file
#   (falls back to `determine_config_path`)
# @option options [String] :profile_name credential/config profile name
# @option options [Boolean] :config_enabled if true, loads the shared
#   config file as well
def initialize(options = {})
  @profile_name = determine_profile(options)
  @config_enabled = options[:config_enabled]
  @parsed_credentials = {}
  @parsed_config = nil

  credentials_location = options[:credentials_path] || determine_credentials_path
  @credentials_path = credentials_location && File.expand_path(credentials_location)
  load_credentials_file if loadable?(@credentials_path)
  return unless @config_enabled

  config_location = options[:config_path] || determine_config_path
  @config_path = config_location && File.expand_path(config_location)
  load_config_file if loadable?(@config_path)
end
def load_config_file
# Reads the file at `@config_path` and stores the result of
# `IniParser.ini_parse` on its contents in `@parsed_config`.
def load_config_file
  raw_config = File.read(@config_path)
  @parsed_config = IniParser.ini_parse(raw_config)
end
def load_credentials_file
# Reads the file at `@credentials_path` and stores the result of
# `IniParser.ini_parse` on its contents in `@parsed_credentials`.
def load_credentials_file
  raw_credentials = File.read(@credentials_path)
  @parsed_credentials = IniParser.ini_parse(raw_credentials)
end
def loadable?(path)
- Note: - This method does not indicate if the file found at {#path}
Returns:
-
(Boolean)
- Returns `true` if a credential file
# Reports whether a shared file can be read from `path`.
#
# Directories are rejected: they pass the exist/readable checks, but
# reading them with File.read fails, so they cannot be loaded.
#
# @note This does not indicate whether the file will be parsable, only
#   whether it can be read.
# @param path [String, nil] candidate file location
# @return [Boolean] `true` when `path` is non-nil, exists, is readable
#   and is not a directory
def loadable?(path)
  return false if path.nil?

  File.exist?(path) && File.readable?(path) && !File.directory?(path)
end
def resolve_source_profile(profile, opts = {})
# Resolves the credentials a `source_profile` reference points at.
#
# Sources are tried in order: static credentials, a chained assume-role
# profile, web identity, credential process, then SSO. For the provider
# based sources the credentials are returned only when they report `set?`.
#
# @param profile [String] name of the source profile
# @param opts [Hash] assume-role options; `:visited_profiles` (a Set),
#   when present, is used to detect circular references and is updated
# @return the resolved credentials, or nil when no source yields any
# @raise [Errors::SourceProfileCircularReferenceError] when `profile` has
#   already been visited in this resolution chain
def resolve_source_profile(profile, opts = {})
  visited = opts[:visited_profiles]
  if visited
    raise Errors::SourceProfileCircularReferenceError if visited.include?(profile)

    visited.add(profile)
  end

  # Either parsed file may be absent (e.g. config enabled but no config
  # file on disk), so guard both lookups; an unknown profile is then
  # reported by `credentials` rather than failing on nil here.
  profile_config = @parsed_credentials[profile] if @parsed_credentials
  profile_config ||= @parsed_config[profile] if @config_enabled && @parsed_config

  if (creds = credentials(profile: profile))
    creds # static credentials
  elsif profile_config && profile_config['source_profile']
    opts.delete(:source_profile)
    assume_role_credentials_from_config(opts.merge(profile: profile))
  elsif (provider = assume_role_web_identity_credentials_from_config(opts.merge(profile: profile)))
    provider.credentials if provider.credentials.set?
  elsif (provider = assume_role_process_credentials_from_config(profile))
    provider.credentials if provider.credentials.set?
  elsif (provider = sso_credentials_from_config(profile: profile))
    provider.credentials if provider.credentials.set?
  end
end
def sso_credentials_from_config(opts = {})
Will always attempt first to load from the shared credentials
Attempts to load from shared config or shared credentials file.
# Attempts to build SSO credentials for a profile, consulting the parsed
# shared credentials first and the parsed shared config second (the
# latter only when a config file has been parsed).
#
# @param opts [Hash] `:profile` defaults to the current profile name
# @return the first truthy result of `sso_credentials_from_profile`,
#   otherwise the last falsy result
def sso_credentials_from_config(opts = {})
  profile = opts[:profile] || @profile_name
  from_shared = sso_credentials_from_profile(@parsed_credentials, profile)
  return from_shared if from_shared || !@parsed_config

  sso_credentials_from_profile(@parsed_config, profile)
end
def sso_credentials_from_profile(cfg, profile)
If any of the sso_ profile values are present, attempt to construct
# Builds SSOCredentials for a profile when it defines any of the keys in
# SSO_CREDENTIAL_PROFILE_KEYS.
#
# When the profile references an `sso_session`, the region and start URL
# come from that session, and any `sso_region` / `sso_start_url` set
# directly on the profile must agree with it. Otherwise the profile's own
# values are used.
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] profile name
# @return [SSOCredentials, nil] nil when no config is parsed, the profile
#   is absent, or it has none of the SSO keys
# @raise [ArgumentError] when the profile's sso_region or sso_start_url
#   conflicts with the referenced sso-session
def sso_credentials_from_profile(cfg, profile)
  return unless @parsed_config

  prof_config = cfg[profile]
  return unless prof_config
  return if (prof_config.keys & SSO_CREDENTIAL_PROFILE_KEYS).empty?

  sso_session_name = prof_config['sso_session']
  if sso_session_name
    session = sso_session(cfg, profile, sso_session_name)
    sso_region = session['sso_region']
    sso_start_url = session['sso_start_url']

    # validate sso_region and sso_start_url don't conflict if set on profile and session
    if prof_config['sso_region'] && prof_config['sso_region'] != sso_region
      raise ArgumentError,
            "sso-session #{sso_session_name}'s sso_region (#{sso_region}) " \
            "does not match the profile #{profile}'s sso_region (#{prof_config['sso_region']})"
    end
    if prof_config['sso_start_url'] && prof_config['sso_start_url'] != sso_start_url
      raise ArgumentError,
            "sso-session #{sso_session_name}'s sso_start_url (#{sso_start_url}) " \
            "does not match the profile #{profile}'s sso_start_url (#{prof_config['sso_start_url']})"
    end
  else
    sso_region = prof_config['sso_region']
    sso_start_url = prof_config['sso_start_url']
  end

  SSOCredentials.new(
    sso_account_id: prof_config['sso_account_id'],
    sso_role_name: prof_config['sso_role_name'],
    sso_session: prof_config['sso_session'],
    sso_region: sso_region,
    sso_start_url: sso_start_url
  )
end
def sso_session(cfg, profile, sso_session_name)
# Fetches the `sso-session` section referenced by a profile.
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] referencing profile (used in the error message)
# @param sso_session_name [String] name of the sso-session
# @return the matching section
# @raise [ArgumentError] when the section is missing or lacks `sso_region`
def sso_session(cfg, profile, sso_session_name)
  # aws sso-configure may add quotes around sso session names with whitespace
  section_keys = [
    "sso-session #{sso_session_name}",
    "sso-session '#{sso_session_name}'"
  ]
  section = section_keys.lazy.map { |section_key| cfg[section_key] }.find(&:itself)

  unless section
    raise ArgumentError,
          "sso-session #{sso_session_name} must be defined in the config file. " \
          "Referenced by profile #{profile}"
  end
  return section if section['sso_region']

  raise ArgumentError,
        "sso-session #{sso_session_name} missing required parameter: sso_region"
end
def sso_token_from_config(opts = {})
Will always attempt first to load from the shared credentials
Attempts to load from shared config or shared credentials file.
# Attempts to build an SSO token provider for a profile, consulting the
# parsed shared credentials first and the parsed shared config second
# (the latter only when a config file has been parsed).
#
# @param opts [Hash] `:profile` defaults to the current profile name
# @return the first truthy result of `sso_token_from_profile`, otherwise
#   the last falsy result
def sso_token_from_config(opts = {})
  profile = opts[:profile] || @profile_name
  from_shared = sso_token_from_profile(@parsed_credentials, profile)
  return from_shared if from_shared || !@parsed_config

  sso_token_from_profile(@parsed_config, profile)
end
def sso_token_from_profile(cfg, profile)
If the required sso_ profile values are present, attempt to construct
# Builds an SSOTokenProvider for a profile when it defines any of the
# keys in SSO_TOKEN_PROFILE_KEYS. The region is taken from the
# sso-session section the profile references.
#
# @param cfg [Hash] parsed file contents keyed by section name
# @param profile [String] profile name
# @return [SSOTokenProvider, nil] nil when no config is parsed, the
#   profile is absent, or it has none of the token keys
def sso_token_from_profile(cfg, profile)
  return unless @parsed_config

  prof_config = cfg[profile]
  return unless prof_config
  return if (prof_config.keys & SSO_TOKEN_PROFILE_KEYS).empty?

  session_name = prof_config['sso_session']
  session = sso_session(cfg, profile, session_name)
  SSOTokenProvider.new(
    sso_session: session_name,
    sso_region: session['sso_region']
  )
end
def validate_profile_exists(profile)
# Ensures the profile has a section in the parsed shared credentials or
# the parsed shared config.
#
# @param profile [String] profile name
# @return [nil]
# @raise [Errors::NoSuchProfileError] when neither parsed file has the
#   profile; the message names the credentials path and, when set, the
#   config path
def validate_profile_exists(profile)
  known = [@parsed_credentials, @parsed_config].any? do |parsed|
    parsed && parsed[profile]
  end
  return if known

  msg = "Profile `#{profile}' not found in #{@credentials_path}"
  msg += " or #{@config_path}" if @config_path
  raise Errors::NoSuchProfileError, msg
end