class Google::Auth::ExternalAccount::AwsCredentials
credentials for a short-lived Google Cloud access token.
by utilizing the AWS EC2 metadata service and then exchanging the
This module handles the retrieval of credentials from Google Cloud
def fetch_metadata_role_name
AWS metadata server security credentials endpoint in order to retrieve
workload by querying the AWS metadata server. This is needed for the
Retrieves the AWS role currently attached to the current AWS
def fetch_metadata_role_name unless @credential_verification_url raise "Unable to determine the AWS metadata server security credentials endpoint" end get_aws_resource(@credential_verification_url, "IAM Role").body end
def fetch_metadata_security_credentials role_name
Retrieves the AWS security credentials required for signing AWS
def fetch_metadata_security_credentials role_name response = get_aws_resource "#{@credential_verification_url}/#{role_name}", "credentials" MultiJson.load response.body end
def fetch_security_credentials
requests from either the AWS security credentials environment variables
Retrieves the AWS security credentials required for signing AWS
def fetch_security_credentials env_aws_access_key_id = ENV[CredentialsLoader::AWS_ACCESS_KEY_ID_VAR] env_aws_secret_access_key = ENV[CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR] # This is normally not available for permanent credentials. env_aws_session_token = ENV[CredentialsLoader::AWS_SESSION_TOKEN_VAR] if env_aws_access_key_id && env_aws_secret_access_key return { access_key_id: env_aws_access_key_id, secret_access_key: env_aws_secret_access_key, session_token: env_aws_session_token } end role_name = fetch_metadata_role_name credentials = fetch_metadata_security_credentials role_name { access_key_id: credentials["AccessKeyId"], secret_access_key: credentials["SecretAccessKey"], session_token: credentials["Token"] } end
def get_aws_resource url, name, data: nil, headers: {}
def get_aws_resource url, name, data: nil, headers: {} begin unless [nil, url].include? @imdsv2_session_token_url headers["x-aws-ec2-metadata-token"] = get_aws_resource( @imdsv2_session_token_url, "Session Token", headers: { "x-aws-ec2-metadata-token-ttl-seconds": "300" } ).body end response = if data headers["Content-Type"] = "application/json" connection.post url, data, headers else connection.get url, nil, headers end raise Faraday::Error unless response.success? response rescue Faraday::Error raise "Failed to retrieve AWS #{name}." end end
def initialize options = {}
def initialize options = {} base_setup options @audience = options[:audience] @credential_source = options[:credential_source] || {} @environment_id = @credential_source["environment_id"] @region_url = validate_metadata_server @credential_source["region_url"], "region_url" @credential_verification_url = validate_metadata_server @credential_source["url"], "url" @regional_cred_verification_url = @credential_source["regional_cred_verification_url"] @imdsv2_session_token_url = validate_metadata_server @credential_source["imdsv2_session_token_url"], "imdsv2_session_token_url" # These will be lazily loaded when needed, or will raise an error if not provided @region = nil @request_signer = nil end
def region
def region @region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR] unless @region raise "region_url or region must be set for external account credentials" unless @region_url @region ||= get_aws_resource(@region_url, "region").body[0..-2] end @region end
def retrieve_subject_token!
-
(string)- The retrieved subject token.
def retrieve_subject_token! if @request_signer.nil? @region = region @request_signer = AwsRequestSigner.new @region end request = { method: "POST", url: @regional_cred_verification_url.sub("{region}", @region) } request_options = @request_signer.generate_signed_request fetch_security_credentials, request request_headers = request_options[:headers] request_headers["x-goog-cloud-target-resource"] = @audience aws_signed_request = { headers: [], method: request_options[:method], url: request_options[:url] } aws_signed_request[:headers] = request_headers.keys.sort.map do |key| { key: key, value: request_headers[key] } end uri_escape aws_signed_request.to_json end
def uri_escape string
def uri_escape string if string.nil? nil else CGI.escape(string.encode("UTF-8")).gsub("+", "%20").gsub("%7E", "~") end end
def validate_metadata_server url, name
def validate_metadata_server url, name return nil if url.nil? host = URI(url).host raise "Invalid host #{host} for #{name}." unless ["169.254.169.254", "[fd00:ec2::254]"].include? host url end