class Google::Auth::ExternalAccount::AwsCredentials
then exchanging the credentials for a short-lived Google Cloud access token.
This module handles the retrieval of credentials from Google Cloud by utilizing the AWS EC2 metadata service and
def fetch_metadata_role_name
-
(Google::Auth::CredentialsError)- If the credential verification URL is not set or if the request fails
Returns:
-
(String)- The AWS role name
def fetch_metadata_role_name unless @credential_verification_url raise CredentialsError.with_details( "Unable to determine the AWS metadata server security credentials endpoint", credential_type_name: self.class.name, principal: principal ) end get_aws_resource(@credential_verification_url, "IAM Role").body end
def fetch_metadata_security_credentials role_name
def fetch_metadata_security_credentials role_name response = get_aws_resource "#{@credential_verification_url}/#{role_name}", "credentials" MultiJson.load response.body end
def fetch_security_credentials
Retrieves the AWS security credentials required for signing AWS requests from either the AWS security
def fetch_security_credentials env_aws_access_key_id = ENV[CredentialsLoader::AWS_ACCESS_KEY_ID_VAR] env_aws_secret_access_key = ENV[CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR] # This is normally not available for permanent credentials. env_aws_session_token = ENV[CredentialsLoader::AWS_SESSION_TOKEN_VAR] if env_aws_access_key_id && env_aws_secret_access_key return { access_key_id: env_aws_access_key_id, secret_access_key: env_aws_secret_access_key, session_token: env_aws_session_token } end role_name = fetch_metadata_role_name credentials = fetch_metadata_security_credentials role_name { access_key_id: credentials["AccessKeyId"], secret_access_key: credentials["SecretAccessKey"], session_token: credentials["Token"] } end
def get_aws_resource url, name, data: nil, headers: {}
-
(Google::Auth::CredentialsError)- If the request fails
Returns:
-
(Faraday::Response)- The successful response
Parameters:
-
headers(Hash) -- Optional request headers -
data(Hash, nil) -- Optional data to send in POST requests -
name(String) -- Resource name for error messages -
url(String) -- The AWS endpoint URL
def get_aws_resource url, name, data: nil, headers: {} begin headers["x-aws-ec2-metadata-token"] = imdsv2_session_token response = if data headers["Content-Type"] = "application/json" connection.post url, data, headers else connection.get url, nil, headers end raise Faraday::Error unless response.success? response rescue Faraday::Error raise CredentialsError.with_details( "Failed to retrieve AWS #{name}.", credential_type_name: self.class.name, principal: principal ) end end
def imdsv2_session_token
-
(Google::Auth::CredentialsError)- If the token URL is missing or there's an error retrieving the token
Returns:
-
(String)- The IMDSv2 session token
def imdsv2_session_token return @imdsv2_session_token unless imdsv2_session_token_invalid? if @imdsv2_session_token_url.nil? raise CredentialsError.with_details( "IMDSV2 token url must be provided", credential_type_name: self.class.name, principal: principal ) end begin response = connection.put @imdsv2_session_token_url do |req| req.headers["x-aws-ec2-metadata-token-ttl-seconds"] = IMDSV2_TOKEN_EXPIRATION_IN_SECONDS.to_s end raise Faraday::Error unless response.success? rescue Faraday::Error => e raise CredentialsError.with_details( "Fetching AWS IMDSV2 token error: #{e}", credential_type_name: self.class.name, principal: principal ) end @imdsv2_session_token = response.body @imdsv2_session_token_expiry = Time.now + IMDSV2_TOKEN_EXPIRATION_IN_SECONDS @imdsv2_session_token end
def imdsv2_session_token_invalid?
def imdsv2_session_token_invalid? return true if @imdsv2_session_token.nil? @imdsv2_session_token_expiry.nil? || @imdsv2_session_token_expiry < Time.now end
def initialize options = {}
def initialize options = {} base_setup options @audience = options[:audience] @credential_source = options[:credential_source] || {} @environment_id = @credential_source[:environment_id] @region_url = @credential_source[:region_url] @credential_verification_url = @credential_source[:url] @regional_cred_verification_url = @credential_source[:regional_cred_verification_url] @imdsv2_session_token_url = @credential_source[:imdsv2_session_token_url] # These will be lazily loaded when needed, or will raise an error if not provided @region = nil @request_signer = nil @imdsv2_session_token = nil @imdsv2_session_token_expiry = nil end
def region
-
(Google::Auth::CredentialsError)- If the region is not set in the environment
Returns:
-
(String)- The name of the AWS region
def region @region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR] unless @region unless @region_url raise CredentialsError.with_details( "region_url or region must be set for external account credentials", credential_type_name: self.class.name, principal: principal ) end @region ||= get_aws_resource(@region_url, "region").body[0..-2] end @region end
def retrieve_subject_token!
-
(string)- The retrieved subject token.
def retrieve_subject_token! if @request_signer.nil? @region = region @request_signer = AwsRequestSigner.new @region end request = { method: "POST", url: @regional_cred_verification_url.sub("{region}", @region) } request_options = @request_signer.generate_signed_request fetch_security_credentials, request request_headers = request_options[:headers] request_headers["x-goog-cloud-target-resource"] = @audience aws_signed_request = { headers: [], method: request_options[:method], url: request_options[:url] } aws_signed_request[:headers] = request_headers.keys.sort.map do |key| { key: key, value: request_headers[key] } end uri_escape aws_signed_request.to_json end
def uri_escape string
def uri_escape string if string.nil? nil else CGI.escape(string.encode("UTF-8")).gsub("+", "%20").gsub("%7E", "~") end end