class Google::Auth::ExternalAccount::AwsCredentials

This module handles the retrieval of credentials from Google Cloud by utilizing the AWS EC2 metadata service and
then exchanging the credentials for a short-lived Google Cloud access token.

def fetch_metadata_role_name

Raises:
  • (Google::Auth::CredentialsError) - If the credential verification URL is not set or if the request fails

Returns:
  • (String) - The AWS role name
# Looks up the AWS role name via the endpoint stored in
# `@credential_verification_url`.
#
# @return [String] the body of the "IAM Role" metadata response
# @raise [CredentialsError] if `@credential_verification_url` is not set
def fetch_metadata_role_name
  # Happy path first: with an endpoint configured, the response body is the role name.
  return get_aws_resource(@credential_verification_url, "IAM Role").body if @credential_verification_url

  raise CredentialsError.with_details(
    "Unable to determine the AWS metadata server security credentials endpoint",
    credential_type_name: self.class.name,
    principal: principal
  )
end

def fetch_metadata_security_credentials role_name

Retrieves the AWS security credentials required for signing AWS requests from the AWS metadata server.
# Fetches the security credentials document for the given role and parses it
# as JSON.
#
# @param role_name [String] role name appended to `@credential_verification_url`
# @return [Object] whatever `MultiJson.load` produces for the response body
def fetch_metadata_security_credentials role_name
  # The credentials for a role live one path segment below the base endpoint.
  role_credentials_url = "#{@credential_verification_url}/#{role_name}"
  MultiJson.load get_aws_resource(role_credentials_url, "credentials").body
end

def fetch_security_credentials

Retrieves the AWS security credentials required for signing AWS requests from either the AWS security
credentials environment variables or from the AWS metadata server.
# Resolves the AWS credentials used for request signing.
#
# Environment variables win when both the access key id and the secret access
# key are present; otherwise the role name and its credentials are fetched
# from the metadata server.
#
# @return [Hash] with keys `:access_key_id`, `:secret_access_key` and
#   `:session_token`
def fetch_security_credentials
  key_id = ENV[CredentialsLoader::AWS_ACCESS_KEY_ID_VAR]
  secret_key = ENV[CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR]
  # This is normally not available for permanent credentials.
  token = ENV[CredentialsLoader::AWS_SESSION_TOKEN_VAR]

  unless key_id && secret_key
    # Incomplete environment: all three values come from the metadata server instead.
    metadata = fetch_metadata_security_credentials fetch_metadata_role_name
    key_id = metadata["AccessKeyId"]
    secret_key = metadata["SecretAccessKey"]
    token = metadata["Token"]
  end

  { access_key_id: key_id, secret_access_key: secret_key, session_token: token }
end

def get_aws_resource url, name, data: nil, headers: {}

Raises:
  • (Google::Auth::CredentialsError) - If the request fails

Returns:
  • (Faraday::Response) - The successful response

Parameters:
  • headers (Hash) -- Optional request headers
  • data (Hash, nil) -- Optional data to send in POST requests
  • name (String) -- Resource name for error messages
  • url (String) -- The AWS endpoint URL
# Performs a request against an AWS endpoint with the IMDSv2 session token
# attached as the `x-aws-ec2-metadata-token` header.
#
# A POST is sent when `data` is given (with a JSON content type); otherwise a
# GET is sent. The caller's `headers` hash is never modified.
#
# @param url [String] the AWS endpoint URL
# @param name [String] resource name used in the error message
# @param data [Hash, nil] optional data to send in a POST request
# @param headers [Hash] optional request headers
# @return [Object] the response, when `response.success?` is truthy
# @raise [CredentialsError] if the request raises `Faraday::Error` or the
#   response is not successful
def get_aws_resource url, name, data: nil, headers: {}
  # Build on a copy: writing into the argument would leak the session token
  # into the caller's hash and would fail outright on a frozen hash.
  request_headers = headers.merge "x-aws-ec2-metadata-token" => imdsv2_session_token

  response = if data
               request_headers["Content-Type"] = "application/json"
               connection.post url, data, request_headers
             else
               connection.get url, nil, request_headers
             end

  # Route non-2xx responses through the same rescue as transport failures.
  raise Faraday::Error unless response.success?
  response
rescue Faraday::Error
  raise CredentialsError.with_details(
    "Failed to retrieve AWS #{name}.",
    credential_type_name: self.class.name,
    principal: principal
  )
end

def imdsv2_session_token

Raises:
  • (Google::Auth::CredentialsError) - If the token URL is missing or there's an error retrieving the token

Returns:
  • (String) - The IMDSv2 session token
# Returns the IMDSv2 session token, requesting a new one with a PUT to
# `@imdsv2_session_token_url` whenever `imdsv2_session_token_invalid?` says
# the cached one cannot be used.
#
# @return [String] the session token (the body of the token response)
# @raise [CredentialsError] if the token URL is missing, the request raises
#   `Faraday::Error`, or the response is not successful
def imdsv2_session_token
  return @imdsv2_session_token unless imdsv2_session_token_invalid?

  token_url = @imdsv2_session_token_url
  if token_url.nil?
    raise CredentialsError.with_details(
      "IMDSV2 token url must be provided",
      credential_type_name: self.class.name,
      principal: principal
    )
  end

  ttl_seconds = IMDSV2_TOKEN_EXPIRATION_IN_SECONDS
  token_response =
    begin
      reply = connection.put(token_url) do |req|
        req.headers["x-aws-ec2-metadata-token-ttl-seconds"] = ttl_seconds.to_s
      end
      # Unsuccessful responses share the rescue path with transport errors.
      raise Faraday::Error unless reply.success?
      reply
    rescue Faraday::Error => e
      raise CredentialsError.with_details(
        "Fetching AWS IMDSV2 token error: #{e}",
        credential_type_name: self.class.name,
        principal: principal
      )
    end

  # Cache the token together with the moment it stops being usable.
  @imdsv2_session_token = token_response.body
  @imdsv2_session_token_expiry = Time.now + ttl_seconds
  @imdsv2_session_token
end

def imdsv2_session_token_invalid?

# Tells whether the cached IMDSv2 session token cannot be used.
#
# @return [Boolean] true when no token is cached, no expiry is recorded, or
#   the recorded expiry is earlier than the current time
def imdsv2_session_token_invalid?
  expiry = @imdsv2_session_token_expiry
  @imdsv2_session_token.nil? || expiry.nil? || expiry < Time.now
end

def initialize options = {}

# Sets up the credentials from an options hash.
#
# @param options [Hash] read for `:audience` and `:credential_source`; the
#   whole hash is also handed to `base_setup`
def initialize options = {}
  base_setup options
  @audience = options[:audience]
  @credential_source = options[:credential_source] || {}

  # Credential-source key => instance variable that stores its value.
  {
    environment_id: :@environment_id,
    region_url: :@region_url,
    url: :@credential_verification_url,
    regional_cred_verification_url: :@regional_cred_verification_url,
    imdsv2_session_token_url: :@imdsv2_session_token_url
  }.each do |source_key, ivar_name|
    instance_variable_set ivar_name, @credential_source[source_key]
  end

  # These will be lazily loaded when needed, or will raise an error if not provided
  @region = @request_signer = @imdsv2_session_token = @imdsv2_session_token_expiry = nil
end

def region

Raises:
  • (Google::Auth::CredentialsError) - If the region is not set in the environment

Returns:
  • (String) - The name of the AWS region
# Determines the AWS region.
#
# The region environment variables are consulted on every call and take
# precedence. Without them, the region is fetched from `@region_url` once and
# then served from `@region`, so repeated calls do not hit the metadata
# server again.
#
# @return [String] the name of the AWS region
# @raise [CredentialsError] if no region is available from the environment,
#   none is cached, and `@region_url` is not set
def region
  env_region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR]
  return @region = env_region if env_region
  return @region if @region

  unless @region_url
    raise CredentialsError.with_details(
      "region_url or region must be set for external account credentials",
      credential_type_name: self.class.name,
      principal: principal
    )
  end

  # Drops the final character of the response body. NOTE(review): presumably
  # the endpoint returns an availability zone such as "us-east-1a" whose
  # trailing letter is not part of the region name -- confirm against the
  # configured region_url.
  @region = get_aws_resource(@region_url, "region").body[0..-2]
end

def retrieve_subject_token!

Returns:
  • (String) - The retrieved subject token.
# Builds the subject token: a signed POST request to the regional credential
# verification URL, serialized to JSON and URI-escaped.
#
# The request signer (and the region it is built with) is created on first
# use and reused afterwards.
#
# @return [String] the URI-escaped JSON description of the signed request
def retrieve_subject_token!
  if @request_signer.nil?
    @region = region
    @request_signer = AwsRequestSigner.new @region
  end

  verification_url = @regional_cred_verification_url.sub "{region}", @region
  signed = @request_signer.generate_signed_request(
    fetch_security_credentials,
    { method: "POST", url: verification_url }
  )

  signed_headers = signed[:headers]
  signed_headers["x-goog-cloud-target-resource"] = @audience
  # Headers are emitted as key/value pairs ordered by header name.
  header_pairs = signed_headers.keys.sort.map { |header| { key: header, value: signed_headers[header] } }

  # Key order (headers, method, url) determines the serialized JSON layout.
  token_payload = { headers: header_pairs, method: signed[:method], url: signed[:url] }
  uri_escape token_payload.to_json
end

def uri_escape string

# Percent-encodes a string, emitting spaces as `%20` rather than `+` and
# leaving `~` unescaped.
#
# @param string [String, nil] the text to escape
# @return [String, nil] the escaped text, or nil when given nil
def uri_escape string
  return nil if string.nil?

  form_encoded = CGI.escape string.encode("UTF-8")
  # CGI.escape encodes a space as "+"; a literal "+" is already "%2B" here.
  form_encoded.gsub("+", "%20").gsub("%7E", "~")
end