class Google::Auth::ExternalAccount::AwsCredentials

This module handles the retrieval of credentials from Google Cloud
by utilizing the AWS EC2 metadata service and then exchanging the
credentials for a short-lived Google Cloud access token.

def fetch_metadata_role_name

Retrieves the AWS role currently attached to the current AWS
workload by querying the AWS metadata server. This is needed for the
AWS metadata server security credentials endpoint in order to retrieve
the AWS security credentials needed to sign requests to AWS APIs.
# Looks up the role name served at the configured credentials endpoint.
#
# Requests @credential_verification_url through get_aws_resource (labelled
# "IAM Role" for error reporting) and returns the body of that response.
#
# @return the body of the metadata server response
# @raise [RuntimeError] if @credential_verification_url is not set
def fetch_metadata_role_name
  endpoint = @credential_verification_url
  raise "Unable to determine the AWS metadata server security credentials endpoint" unless endpoint

  role_response = get_aws_resource endpoint, "IAM Role"
  role_response.body
end

def fetch_metadata_security_credentials role_name

Retrieves the AWS security credentials required for signing AWS
requests from the AWS metadata server.
# Downloads and parses the security credentials document for a role.
#
# The document is requested from "<@credential_verification_url>/<role_name>"
# via get_aws_resource and its body is parsed with MultiJson.
#
# @param role_name the role name appended to the credentials endpoint
# @return the parsed response body, as returned by MultiJson.load
def fetch_metadata_security_credentials role_name
  credentials_url = "#{@credential_verification_url}/#{role_name}"
  raw_document = get_aws_resource(credentials_url, "credentials").body
  MultiJson.load raw_document
end

def fetch_security_credentials

Retrieves the AWS security credentials required for signing AWS
requests from either the AWS security credentials environment variables
or from the AWS metadata server.
# Resolves the AWS credentials used to sign requests.
#
# The environment variables named by the CredentialsLoader constants are
# consulted first; when both the access key id and the secret access key are
# present they are used (together with the session token variable, which may
# be unset). Otherwise the role name and its credentials document are fetched
# from the metadata server.
#
# @return [Hash] with keys :access_key_id, :secret_access_key, :session_token
def fetch_security_credentials
  key_id, secret, token = ENV.values_at(
    CredentialsLoader::AWS_ACCESS_KEY_ID_VAR,
    CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR,
    # This is normally not available for permanent credentials.
    CredentialsLoader::AWS_SESSION_TOKEN_VAR
  )

  unless key_id && secret
    # Environment is incomplete: fall back to the metadata server.
    metadata = fetch_metadata_security_credentials fetch_metadata_role_name
    key_id, secret, token = metadata.values_at "AccessKeyId", "SecretAccessKey", "Token"
  end

  { access_key_id: key_id, secret_access_key: secret, session_token: token }
end

def get_aws_resource url, name, data: nil, headers: {}

# Performs an HTTP request through +connection+ and returns the response.
#
# When @imdsv2_session_token_url is configured and +url+ is a different URL,
# a session token is requested first and sent along as the
# "x-aws-ec2-metadata-token" header. The session token itself is requested
# with PUT, which is the method the IMDSv2 token endpoint requires (a GET is
# rejected by the metadata service).
#
# @param url the resource to request
# @param name [String] human-readable resource name used in the error message
# @param data request body; when given, the request is sent as a POST with a
#   JSON content type
# @param headers [Hash] extra request headers; the hash passed in is copied,
#   not modified
# @return the response object returned by +connection+
# @raise [RuntimeError] if the request raises Faraday::Error or the response
#   is not successful
def get_aws_resource url, name, data: nil, headers: {}
  # Work on a copy so the caller's hash is never mutated.
  request_headers = headers.dup
  session_token_url = @imdsv2_session_token_url
  token_request = !session_token_url.nil? && url == session_token_url

  if session_token_url && !token_request
    request_headers["x-aws-ec2-metadata-token"] = get_aws_resource(
      session_token_url,
      "Session Token",
      headers: { "x-aws-ec2-metadata-token-ttl-seconds": "300" }
    ).body
  end

  response = if data
               request_headers["Content-Type"] = "application/json"
               connection.post url, data, request_headers
             elsif token_request
               connection.put url, nil, request_headers
             else
               connection.get url, nil, request_headers
             end

  raise Faraday::Error unless response.success?
  response
rescue Faraday::Error
  raise "Failed to retrieve AWS #{name}."
end

def initialize options = {}

# Sets up the credentials object from an options hash.
#
# Reads :audience and :credential_source from +options+ after handing the
# whole hash to base_setup. The URLs in the credential source that point at
# the metadata server ("region_url", "url", "imdsv2_session_token_url") are
# passed through validate_metadata_server; "regional_cred_verification_url"
# is stored as given.
#
# @param options [Hash] configuration; :credential_source defaults to {}
def initialize options = {}
  base_setup options

  @audience = options[:audience]
  source = options[:credential_source] || {}
  @credential_source = source
  @environment_id = source["environment_id"]

  @region_url = validate_metadata_server source["region_url"], "region_url"
  @credential_verification_url = validate_metadata_server source["url"], "url"
  @regional_cred_verification_url = source["regional_cred_verification_url"]
  @imdsv2_session_token_url = validate_metadata_server(
    source["imdsv2_session_token_url"],
    "imdsv2_session_token_url"
  )

  # These will be lazily loaded when needed, or will raise an error if not provided
  @region = nil
  @request_signer = nil
end

def region

# Returns the AWS region, resolving it at most once.
#
# Resolution order on the first call: the environment variable named by
# CredentialsLoader::AWS_REGION_VAR, then the one named by
# CredentialsLoader::AWS_DEFAULT_REGION_VAR, then the resource at
# @region_url. The resolved value is cached in @region, so later calls do
# not query the metadata server again.
#
# @return [String] the region
# @raise [RuntimeError] if neither environment variable is set and
#   @region_url is not configured
def region
  return @region if @region

  @region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR]
  return @region if @region

  raise "region_url or region must be set for external account credentials" unless @region_url

  # The last character of the response body is dropped; presumably the
  # endpoint returns an availability zone such as "us-east-1a" whose trailing
  # letter is not part of the region name -- confirm against the region_url
  # in use.
  @region = get_aws_resource(@region_url, "region").body[0..-2]
end

def retrieve_subject_token!

Returns:
  • (string) - The retrieved subject token.
# Builds the subject token: a URI-escaped JSON description of a signed AWS
# request.
#
# On first use the region is resolved and an AwsRequestSigner is created for
# it. A POST request to @regional_cred_verification_url (with "{region}"
# substituted) is signed with the credentials from fetch_security_credentials,
# the audience is added as the "x-goog-cloud-target-resource" header, and the
# headers are emitted as a list of key/value pairs sorted by key.
#
# @return [String] the escaped JSON document
def retrieve_subject_token!
  if @request_signer.nil?
    @region = region
    @request_signer = AwsRequestSigner.new @region
  end

  verification_url = @regional_cred_verification_url.sub "{region}", @region
  signed = @request_signer.generate_signed_request(
    fetch_security_credentials,
    { method: "POST", url: verification_url }
  )

  signed_headers = signed[:headers]
  signed_headers["x-goog-cloud-target-resource"] = @audience
  header_list = signed_headers.keys.sort.map do |header_name|
    { key: header_name, value: signed_headers[header_name] }
  end

  # Key order (headers, method, url) determines the serialized JSON layout.
  subject_token = { headers: header_list, method: signed[:method], url: signed[:url] }
  uri_escape subject_token.to_json
end

def uri_escape string

# Percent-encodes a string for use in a URI component.
#
# The string is transcoded to UTF-8 and passed through CGI.escape; the "+"
# that CGI.escape emits for a space is rewritten as "%20", and an encoded
# tilde ("%7E") is restored to "~".
#
# @param string [String, nil] the text to escape
# @return [String, nil] the escaped text, or nil when +string+ is nil
def uri_escape string
  return nil if string.nil?

  form_encoded = CGI.escape string.encode("UTF-8")
  spaces_fixed = form_encoded.gsub "+", "%20"
  spaces_fixed.gsub "%7E", "~"
end

def validate_metadata_server url, name

# Checks that a configured URL points at one of the two accepted metadata
# server hosts.
#
# @param url [String, nil] the URL to check; nil is accepted and returned as nil
# @param name [String] the configuration field name, used in the error message
# @return [String, nil] +url+ unchanged when its host is accepted
# @raise [RuntimeError] if the URL's host is not one of the accepted hosts
def validate_metadata_server url, name
  return if url.nil?

  # URI#host keeps the square brackets around an IPv6 literal.
  allowed_hosts = ["169.254.169.254", "[fd00:ec2::254]"]
  host = URI(url).host
  return url if allowed_hosts.include? host

  raise "Invalid host #{host} for #{name}."
end