class Google::Auth::ExternalAccount::AwsCredentials

This module handles the retrieval of credentials from Google Cloud by utilizing the AWS EC2 metadata service and
then exchanging the credentials for a short-lived Google Cloud access token.

def fetch_metadata_role_name

Retrieves the AWS role currently attached to the current AWS workload by querying the AWS metadata server.
This is needed for the AWS metadata server security credentials endpoint in order to retrieve the AWS security
credentials needed to sign requests to AWS APIs.
# Looks up the AWS role name by requesting the configured credential
# verification URL through `get_aws_resource`.
#
# @return [String] the body of the response from the credential URL
# @raise [RuntimeError] if no credential verification URL was configured
def fetch_metadata_role_name
  raise "Unable to determine the AWS metadata server security credentials endpoint" unless @credential_verification_url

  role_response = get_aws_resource @credential_verification_url, "IAM Role"
  role_response.body
end

def fetch_metadata_security_credentials role_name

Retrieves the AWS security credentials required for signing AWS requests from the AWS metadata server.
# Fetches the security credentials document for the given role from the
# metadata server and parses it as JSON.
#
# @param role_name [String] role name appended as the last path segment of
#   the credential verification URL
# @return [Object] the value produced by `MultiJson.load` on the response body
def fetch_metadata_security_credentials role_name
  credentials_url = "#{@credential_verification_url}/#{role_name}"
  raw_document = get_aws_resource(credentials_url, "credentials").body
  MultiJson.load raw_document
end

def fetch_security_credentials

Retrieves the AWS security credentials required for signing AWS requests from either the AWS security
credentials environment variables or from the AWS metadata server.
# Resolves the AWS security credentials used for request signing.
#
# The environment is consulted first: when both the access key id and the
# secret access key variables are set, they are returned together with the
# (possibly nil) session token variable. Otherwise the role name and its
# credentials are fetched from the metadata server.
#
# @return [Hash] with keys :access_key_id, :secret_access_key, :session_token
def fetch_security_credentials
  # The session token is normally not available for permanent credentials.
  key_id, secret_key, session_token = ENV.values_at(
    CredentialsLoader::AWS_ACCESS_KEY_ID_VAR,
    CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR,
    CredentialsLoader::AWS_SESSION_TOKEN_VAR
  )

  if key_id && secret_key
    { access_key_id: key_id, secret_access_key: secret_key, session_token: session_token }
  else
    metadata = fetch_metadata_security_credentials fetch_metadata_role_name
    {
      access_key_id: metadata["AccessKeyId"],
      secret_access_key: metadata["SecretAccessKey"],
      session_token: metadata["Token"]
    }
  end
end

def get_aws_resource url, name, data: nil, headers: {}

# Performs a request against an AWS metadata endpoint, attaching the IMDSv2
# session token header.
#
# @param url [String] endpoint to request
# @param name [String] human-readable resource name, used only in the error
#   message
# @param data [Object, nil] when given, the request is a POST with this body
#   and a JSON content type; otherwise a GET is issued
# @param headers [Hash] extra request headers; this hash is not modified
# @return [Object] the successful response returned by the connection
# @raise [RuntimeError] "Failed to retrieve AWS <name>." when the connection
#   raises Faraday::Error or the response is not successful
def get_aws_resource url, name, data: nil, headers: {}
  # Build the outgoing headers on a copy so a caller-supplied hash (which may
  # be frozen or reused for other requests) is never mutated.
  request_headers = headers.merge "x-aws-ec2-metadata-token" => imdsv2_session_token
  response = if data
               request_headers["Content-Type"] = "application/json"
               connection.post url, data, request_headers
             else
               connection.get url, nil, request_headers
             end
  raise Faraday::Error unless response.success?
  response
rescue Faraday::Error
  raise "Failed to retrieve AWS #{name}."
end

def imdsv2_session_token

# Returns the cached IMDSv2 session token, requesting a fresh one with a PUT
# to the configured token URL when `imdsv2_session_token_invalid?` is true.
#
# @return [Object] the response body of the token request (cached in
#   @imdsv2_session_token)
# @raise [RuntimeError] if no token URL is configured, or if the PUT raises
#   Faraday::Error
# @raise [Faraday::Error] if the token response is not successful
def imdsv2_session_token
  if imdsv2_session_token_invalid?
    raise "IMDSV2 token url must be provided" if @imdsv2_session_token_url.nil?

    token_response =
      begin
        connection.put(@imdsv2_session_token_url) do |token_request|
          token_request.headers["x-aws-ec2-metadata-token-ttl-seconds"] = IMDSV2_TOKEN_EXPIRATION_IN_SECONDS.to_s
        end
      rescue Faraday::Error => e
        raise "Fetching AWS IMDSV2 token error: #{e}"
      end
    # Raised outside the rescue above, so this propagates as Faraday::Error.
    raise Faraday::Error unless token_response.success?

    @imdsv2_session_token = token_response.body
    @imdsv2_session_token_expiry = Time.now + IMDSV2_TOKEN_EXPIRATION_IN_SECONDS
  end
  @imdsv2_session_token
end

def imdsv2_session_token_invalid?

# Reports whether a new IMDSv2 session token must be fetched.
#
# @return [Boolean] true when no token or no expiry is cached, or when the
#   cached expiry is earlier than the current time
def imdsv2_session_token_invalid?
  nothing_cached = @imdsv2_session_token.nil? || @imdsv2_session_token_expiry.nil?
  nothing_cached || @imdsv2_session_token_expiry < Time.now
end

def initialize options = {}

# Stores the audience and the AWS-specific settings found under
# options[:credential_source], after delegating to `base_setup`.
#
# @param options [Hash] reads :audience and :credential_source; the
#   credential source is read for :environment_id, :region_url, :url,
#   :regional_cred_verification_url and :imdsv2_session_token_url
def initialize options = {}
  base_setup options

  source = options[:credential_source] || {}
  @audience = options[:audience]
  @credential_source = source
  @environment_id = source[:environment_id]
  @region_url = source[:region_url]
  @credential_verification_url = source[:url]
  @regional_cred_verification_url = source[:regional_cred_verification_url]
  @imdsv2_session_token_url = source[:imdsv2_session_token_url]

  # These will be lazily loaded when needed, or will raise an error if not provided
  @region = @request_signer = nil
  @imdsv2_session_token = @imdsv2_session_token_expiry = nil
end

def region

# Resolves the AWS region once and caches it in @region.
#
# Resolution order on the first call: the AWS region environment variable,
# then the AWS default-region environment variable, then the metadata server
# at @region_url. Later calls return the cached value, so the metadata server
# is queried at most once.
#
# @return [String] the resolved region
# @raise [RuntimeError] if neither environment variable is set and no
#   region_url was configured
def region
  return @region if @region

  @region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR]
  return @region if @region

  raise "region_url or region must be set for external account credentials" unless @region_url

  # The final character of the response body is dropped; presumably the
  # endpoint returns an availability zone such as "us-east-1b" — confirm.
  @region = get_aws_resource(@region_url, "region").body[0..-2]
end

def retrieve_subject_token!

Returns:
  • (string) - The retrieved subject token.
# Builds the subject token: a signed POST request to the regional credential
# verification URL, serialized as JSON and URI-escaped.
#
# The request signer (and the region it uses) is created on first use and
# reused afterwards. The audience is added to the signed headers under
# "x-goog-cloud-target-resource", and headers are emitted sorted by key.
#
# @return [String] the retrieved subject token
def retrieve_subject_token!
  if @request_signer.nil?
    @region = region
    @request_signer = AwsRequestSigner.new @region
  end

  verification_url = @regional_cred_verification_url.sub "{region}", @region
  signed = @request_signer.generate_signed_request(
    fetch_security_credentials,
    { method: "POST", url: verification_url }
  )

  signed_headers = signed[:headers]
  signed_headers["x-goog-cloud-target-resource"] = @audience
  header_entries = signed_headers.keys.sort.map do |header_name|
    { key: header_name, value: signed_headers[header_name] }
  end

  token_payload = { headers: header_entries, method: signed[:method], url: signed[:url] }
  uri_escape token_payload.to_json
end

def uri_escape string

# Percent-encodes a string with CGI.escape, then rewrites "+" as "%20" and
# "%7E" as "~".
#
# @param string [String, nil] text to escape
# @return [String, nil] the escaped text, or nil when given nil
def uri_escape string
  return nil if string.nil?

  form_encoded = CGI.escape string.encode("UTF-8")
  form_encoded.gsub("+", "%20").gsub("%7E", "~")
end