class Google::Auth::ImpersonatedServiceAccountCredentials

The short-lived token and its expiration time are cached.
and then that claim is exchanged for a short-lived token at an IAMCredentials endpoint.
This is a two-step process: first authentication claim from the base credentials is created
Authenticates requests using impersonation from base credentials.

def self.make_creds options = {}

Returns:
  • (Google::Auth::ImpersonatedServiceAccountCredentials) -

Options Hash: (**options)
  • :source_credentials (Object) -- The authenticated principal that will be used
  • :scope (Array, String) -- The scope(s) for the short-lived impersonation token,
  • :impersonation_url (String) -- The URL to impersonate the service account.
  • :base_credentials (Object) -- The authenticated principal.

Parameters:
  • options (Hash) -- A hash of options to configure the credentials.
def self.make_creds options = {}
  new options
end

def deep_hash_normalize old_hash

def deep_hash_normalize old_hash
  sym_hash = {}
  old_hash&.each { |k, v| sym_hash[k.to_sym] = recursive_hash_normalize_keys v }
  sym_hash
end

def duplicate options = {}

Returns:
  • (Google::Auth::ImpersonatedServiceAccountCredentials) -

Parameters:
  • options (Hash) -- Overrides for the credentials parameters.
def duplicate options = {}
  options = deep_hash_normalize options
  options = {
    base_credentials: @base_credentials,
    source_credentials: @source_credentials,
    impersonation_url: @impersonation_url,
    scope: @scope
  }.merge(options)
  self.class.new options
end

def expires_at= new_expires_at

to Time object.
Setter for the expires_at value that makes sure it is converted
def expires_at= new_expires_at
  @expires_at = normalize_timestamp new_expires_at
end

def expires_within? seconds

Returns:
  • (Boolean) - Whether the access token expires within the given time frame

Parameters:
  • seconds (Integer) -- The number of seconds to check against the token's expiration time.
def expires_within? seconds
  # This method is needed for BaseClient
  @expires_at && @expires_at - Time.now.utc < seconds
end

def fetch_access_token! _options = {}

Returns:
  • (String) - The newly generated impersonation access token.

Raises:
  • (Google::Auth::AuthorizationError) - For other unexpected response statuses.
  • (Google::Auth::UnexpectedStatusError) - If the response status is 403 or 500.

Parameters:
  • _options (Hash) -- (optional) Additional options for token retrieval (currently unused).

Other tags:
    Private: -
def fetch_access_token! _options = {}
  auth_header = prepare_auth_header
  resp = make_impersonation_request auth_header
  case resp.status
  when 200
    response = MultiJson.load resp.body
    self.expires_at = response["expireTime"]
    @access_token = response["accessToken"]
    access_token
  when 403, 500
    handle_error_response resp, UnexpectedStatusError
  else
    handle_error_response resp, AuthorizationError
  end
end

def handle_error_response resp, error_class

Raises:
  • (StandardError) - The appropriate error with details

Parameters:
  • error_class (Class) -- The error class to instantiate
  • resp (Faraday::Response) -- The HTTP response

Other tags:
    Private: -
def handle_error_response resp, error_class
  msg = "Unexpected error code #{resp.status}.\n #{resp.env.response_body} #{ERROR_SUFFIX}"
  raise error_class.with_details(
    msg,
    credential_type_name: self.class.name,
    principal: principal
  )
end

def initialize options = {}

Returns:
  • (Google::Auth::ImpersonatedServiceAccountCredentials) -

Raises:
  • (ArgumentError) - If any of the required options are missing.

Options Hash: (**options)
  • :source_credentials (Object) -- The authenticated principal that will be used
  • :scope (Array, String) -- The scope(s) for the short-lived impersonation token,
  • :impersonation_url (String) -- The URL to impersonate the service account.
  • :base_credentials (Object) -- The authenticated principal.

Parameters:
  • options (Hash) -- A hash of options to configure the credentials.
def initialize options = {}
  @base_credentials, @impersonation_url, @scope =
    options.values_at :base_credentials,
                      :impersonation_url,
                      :scope
  # Fail-fast checks for required parameters
  if @base_credentials.nil? && !options.key?(:source_credentials)
    raise ArgumentError, "Missing required option: either :base_credentials or :source_credentials"
  end
  raise ArgumentError, "Missing required option: :impersonation_url" if @impersonation_url.nil?
  raise ArgumentError, "Missing required option: :scope" if @scope.nil?
  # Some credentials (all Signet-based ones and this one) include scope and a bunch of transient state
  # (e.g. refresh status) as part of themselves
  # so a copy needs to be created with the scope overriden and transient state dropped.
  #
  # If a credentials does not support `duplicate` we'll try to use it as is assuming it has a broad enough scope.
  # This might result in an "access denied" error downstream when the token from that credentials is being used
  # for the token exchange.
  @source_credentials = if options.key? :source_credentials
                          options[:source_credentials]
                        elsif @base_credentials.respond_to? :duplicate
                          @base_credentials.duplicate({
                                                        scope: IAM_SCOPE
                                                      })
                        else
                          @base_credentials
                        end
end

def logger

Returns:
  • (Logger, nil) - The logger of the credentials.
def logger
  @source_credentials.logger if source_credentials.respond_to? :logger
end

def make_impersonation_request auth_header

Returns:
  • (Faraday::Response) - The HTTP response from the impersonation endpoint

Parameters:
  • auth_header (Hash) -- The authorization header containing the source token

Other tags:
    Private: -
def make_impersonation_request auth_header
  connection.post @impersonation_url do |req|
    req.headers.merge! auth_header
    req.headers["Content-Type"] = "application/json"
    req.body = MultiJson.dump({ scope: @scope })
  end
end

def normalize_timestamp time

Raises:
  • (Google::Auth::CredentialsError) - If the input is not a Time, String, or nil.

Returns:
  • (Time, nil) - The normalized Time object, or nil if the input is nil.

Parameters:
  • time (Time, String, nil) -- The timestamp to normalize.
def normalize_timestamp time
  case time
  when NilClass
    nil
  when Time
    time
  when String
    Time.parse time
  else
    message = "Invalid time value #{time}"
    raise CredentialsError.with_details(message, credential_type_name: self.class.name, principal: principal)
  end
end

def prepare_auth_header

Returns:
  • (Hash) - The authorization header with the source credentials' token

Other tags:
    Private: -
def prepare_auth_header
  auth_header = {}
  @source_credentials.updater_proc.call auth_header
  auth_header
end

def principal

Returns:
  • (String, Symbol) - The string representation of the principal,

Other tags:
    Private: -
def principal
  if @source_credentials.respond_to? :principal
    @source_credentials.principal
  else
    :unknown
  end
end

def recursive_hash_normalize_keys val

Convert all keys in this hash (nested) to symbols for uniform retrieval
def recursive_hash_normalize_keys val
  if val.is_a? Hash
    deep_hash_normalize val
  else
    val
  end
end

def token_type

This method is needed for BaseClient.
Returns the type of token (access_token).
def token_type
  :access_token
end

def universe_domain

Returns:
  • (String) - The universe domain of the credentials.
def universe_domain
  @source_credentials.universe_domain
end