class Google::Auth::ExternalAccount::AwsCredentials
then exchanging the credentials for a short-lived Google Cloud access token.
This module handles the retrieval of credentials from Google Cloud by utilizing the AWS EC2 metadata service and
def fetch_metadata_role_name
This is needed for the AWS metadata server security credentials endpoint in order to retrieve the AWS security
Retrieves the AWS role currently attached to the current AWS workload by querying the AWS metadata server.
def fetch_metadata_role_name unless @credential_verification_url raise "Unable to determine the AWS metadata server security credentials endpoint" end get_aws_resource(@credential_verification_url, "IAM Role").body end
def fetch_metadata_security_credentials role_name
def fetch_metadata_security_credentials role_name response = get_aws_resource "#{@credential_verification_url}/#{role_name}", "credentials" MultiJson.load response.body end
def fetch_security_credentials
Retrieves the AWS security credentials required for signing AWS requests from either the AWS security
def fetch_security_credentials env_aws_access_key_id = ENV[CredentialsLoader::AWS_ACCESS_KEY_ID_VAR] env_aws_secret_access_key = ENV[CredentialsLoader::AWS_SECRET_ACCESS_KEY_VAR] # This is normally not available for permanent credentials. env_aws_session_token = ENV[CredentialsLoader::AWS_SESSION_TOKEN_VAR] if env_aws_access_key_id && env_aws_secret_access_key return { access_key_id: env_aws_access_key_id, secret_access_key: env_aws_secret_access_key, session_token: env_aws_session_token } end role_name = fetch_metadata_role_name credentials = fetch_metadata_security_credentials role_name { access_key_id: credentials["AccessKeyId"], secret_access_key: credentials["SecretAccessKey"], session_token: credentials["Token"] } end
def get_aws_resource url, name, data: nil, headers: {}
def get_aws_resource url, name, data: nil, headers: {} begin headers["x-aws-ec2-metadata-token"] = imdsv2_session_token response = if data headers["Content-Type"] = "application/json" connection.post url, data, headers else connection.get url, nil, headers end raise Faraday::Error unless response.success? response rescue Faraday::Error raise "Failed to retrieve AWS #{name}." end end
def imdsv2_session_token
def imdsv2_session_token return @imdsv2_session_token unless imdsv2_session_token_invalid? raise "IMDSV2 token url must be provided" if @imdsv2_session_token_url.nil? begin response = connection.put @imdsv2_session_token_url do |req| req.headers["x-aws-ec2-metadata-token-ttl-seconds"] = IMDSV2_TOKEN_EXPIRATION_IN_SECONDS.to_s end rescue Faraday::Error => e raise "Fetching AWS IMDSV2 token error: #{e}" end raise Faraday::Error unless response.success? @imdsv2_session_token = response.body @imdsv2_session_token_expiry = Time.now + IMDSV2_TOKEN_EXPIRATION_IN_SECONDS @imdsv2_session_token end
def imdsv2_session_token_invalid?
def imdsv2_session_token_invalid? return true if @imdsv2_session_token.nil? @imdsv2_session_token_expiry.nil? || @imdsv2_session_token_expiry < Time.now end
def initialize options = {}
def initialize options = {} base_setup options @audience = options[:audience] @credential_source = options[:credential_source] || {} @environment_id = @credential_source[:environment_id] @region_url = @credential_source[:region_url] @credential_verification_url = @credential_source[:url] @regional_cred_verification_url = @credential_source[:regional_cred_verification_url] @imdsv2_session_token_url = @credential_source[:imdsv2_session_token_url] # These will be lazily loaded when needed, or will raise an error if not provided @region = nil @request_signer = nil @imdsv2_session_token = nil @imdsv2_session_token_expiry = nil end
def region
def region @region = ENV[CredentialsLoader::AWS_REGION_VAR] || ENV[CredentialsLoader::AWS_DEFAULT_REGION_VAR] unless @region raise "region_url or region must be set for external account credentials" unless @region_url @region ||= get_aws_resource(@region_url, "region").body[0..-2] end @region end
def retrieve_subject_token!
-
(string)- The retrieved subject token.
def retrieve_subject_token! if @request_signer.nil? @region = region @request_signer = AwsRequestSigner.new @region end request = { method: "POST", url: @regional_cred_verification_url.sub("{region}", @region) } request_options = @request_signer.generate_signed_request fetch_security_credentials, request request_headers = request_options[:headers] request_headers["x-goog-cloud-target-resource"] = @audience aws_signed_request = { headers: [], method: request_options[:method], url: request_options[:url] } aws_signed_request[:headers] = request_headers.keys.sort.map do |key| { key: key, value: request_headers[key] } end uri_escape aws_signed_request.to_json end
def uri_escape string
def uri_escape string if string.nil? nil else CGI.escape(string.encode("UTF-8")).gsub("+", "%20").gsub("%7E", "~") end end