class Google::Auth::ImpersonatedServiceAccountCredentials
The short-lived token and its expiration time are cached.
and then that claim is exchanged for a short-lived token at an IAMCredentials endpoint.
This is a two-step process: first authentication claim from the base credentials is created
Authenticates requests using impersonation from base credentials.
def self.make_creds options = {}
-
(Google::Auth::ImpersonatedServiceAccountCredentials)-
Options Hash:
(**options)-
:json_key_io(IO) -- The IO object that contains the credential configuration. -
:source_credentials(Object) -- The authenticated principal that will be used -
:scope(Array) -- The scope(s) for the short-lived impersonation token,, String -
:impersonation_url(String) -- The URL to impersonate the service account. -
:base_credentials(Object) -- The authenticated principal.
Parameters:
-
options(Hash) -- A hash of options to configure the credentials.
Other tags:
- Note: - Warning:
def self.make_creds options = {} if options[:json_key_io] make_creds_from_json options else new options end end
def self.make_creds_from_json options
- Private: -
def self.make_creds_from_json options json_key_io = options[:json_key_io] if options[:base_credentials] || options[:source_credentials] raise Google::Auth::InitializationError, "json_key_io is not compatible with base_credentials or source_credentials" end require "googleauth/default_credentials" impersonated_json = MultiJson.load json_key_io.read source_credentials_info = impersonated_json["source_credentials"] if source_credentials_info["type"] == CREDENTIAL_TYPE_NAME raise Google::Auth::InitializationError, "Source credentials can't be of type impersonated_service_account, " \ "use delegates to chain impersonation." end source_credentials = DefaultCredentials.make_creds( json_key_io: StringIO.new(MultiJson.dump(source_credentials_info)) ) impersonation_url = impersonated_json["service_account_impersonation_url"] scope = options[:scope] || impersonated_json["scopes"] new( source_credentials: source_credentials, impersonation_url: impersonation_url, scope: scope ) end
def deep_hash_normalize old_hash
def deep_hash_normalize old_hash sym_hash = {} old_hash&.each { |k, v| sym_hash[k.to_sym] = recursive_hash_normalize_keys v } sym_hash end
def duplicate options = {}
-
(Google::Auth::ImpersonatedServiceAccountCredentials)-
Parameters:
-
options(Hash) -- Overrides for the credentials parameters.
def duplicate options = {} options = deep_hash_normalize options options = { base_credentials: @base_credentials, source_credentials: @source_credentials, impersonation_url: @impersonation_url, scope: @scope }.merge(options) self.class.new options end
def expires_at= new_expires_at
Setter for the expires_at value that makes sure it is converted
def expires_at= new_expires_at @expires_at = normalize_timestamp new_expires_at end
def expires_within? seconds
-
(Boolean)- Whether the access token expires within the given time frame
Parameters:
-
seconds(Integer) -- The number of seconds to check against the token's expiration time.
def expires_within? seconds # This method is needed for BaseClient @expires_at && @expires_at - Time.now.utc < seconds end
def fetch_access_token! _options = {}
-
(String)- The newly generated impersonation access token.
Raises:
-
(Google::Auth::AuthorizationError)- For other unexpected response statuses. -
(Google::Auth::UnexpectedStatusError)- If the response status is 403 or 500.
Parameters:
-
_options(Hash) -- (optional) Additional options for token retrieval (currently unused).
Other tags:
- Private: -
def fetch_access_token! _options = {} auth_header = prepare_auth_header resp = make_impersonation_request auth_header case resp.status when 200 response = MultiJson.load resp.body self.expires_at = response["expireTime"] @access_token = response["accessToken"] access_token when 403, 500 handle_error_response resp, UnexpectedStatusError else handle_error_response resp, AuthorizationError end end
def handle_error_response resp, error_class
-
(StandardError)- The appropriate error with details
Parameters:
-
error_class(Class) -- The error class to instantiate -
resp(Faraday::Response) -- The HTTP response
Other tags:
- Private: -
def handle_error_response resp, error_class msg = "Unexpected error code #{resp.status}.\n #{resp.env.response_body} #{ERROR_SUFFIX}" raise error_class.with_details( msg, credential_type_name: self.class.name, principal: principal ) end
def initialize options = {}
-
(Google::Auth::ImpersonatedServiceAccountCredentials)-
Raises:
-
(ArgumentError)- If any of the required options are missing.
Options Hash:
(**options)-
:source_credentials(Object) -- The authenticated principal that will be used -
:scope(Array) -- The scope(s) for the short-lived impersonation token,, String -
:impersonation_url(String) -- The URL to impersonate the service account. -
:base_credentials(Object) -- The authenticated principal.
Parameters:
-
options(Hash) -- A hash of options to configure the credentials.
def initialize options = {} @base_credentials, @impersonation_url, @scope = options.values_at :base_credentials, :impersonation_url, :scope # Fail-fast checks for required parameters if @base_credentials.nil? && !options.key?(:source_credentials) raise ArgumentError, "Missing required option: either :base_credentials or :source_credentials" end raise ArgumentError, "Missing required option: :impersonation_url" if @impersonation_url.nil? raise ArgumentError, "Missing required option: :scope" if @scope.nil? # Some credentials (all Signet-based ones and this one) include scope and a bunch of transient state # (e.g. refresh status) as part of themselves # so a copy needs to be created with the scope overriden and transient state dropped. # # If a credentials does not support `duplicate` we'll try to use it as is assuming it has a broad enough scope. # This might result in an "access denied" error downstream when the token from that credentials is being used # for the token exchange. @source_credentials = if options.key? :source_credentials options[:source_credentials] elsif @base_credentials.respond_to? :duplicate @base_credentials.duplicate({ scope: IAM_SCOPE }) else @base_credentials end end
def logger
-
(Logger, nil)- The logger of the credentials.
def logger @source_credentials.logger if source_credentials.respond_to? :logger end
def make_impersonation_request auth_header
-
(Faraday::Response)- The HTTP response from the impersonation endpoint
Parameters:
-
auth_header(Hash) -- The authorization header containing the source token
Other tags:
- Private: -
def make_impersonation_request auth_header connection.post @impersonation_url do |req| req.headers.merge! auth_header req.headers["Content-Type"] = "application/json" req.body = MultiJson.dump({ scope: @scope }) end end
def normalize_timestamp time
-
(Google::Auth::CredentialsError)- If the input is not a Time, String, or nil.
Returns:
-
(Time, nil)- The normalized Time object, or nil if the input is nil.
Parameters:
-
time(Time, String, nil) -- The timestamp to normalize.
def normalize_timestamp time case time when NilClass nil when Time time when String Time.parse time else message = "Invalid time value #{time}" raise CredentialsError.with_details(message, credential_type_name: self.class.name, principal: principal) end end
def prepare_auth_header
-
(Hash)- The authorization header with the source credentials' token
Other tags:
- Private: -
def prepare_auth_header auth_header = {} @source_credentials.updater_proc.call auth_header auth_header end
def principal
-
(String, Symbol)- The string representation of the principal,
Other tags:
- Private: -
def principal if @source_credentials.respond_to? :principal @source_credentials.principal else :unknown end end
def recursive_hash_normalize_keys val
def recursive_hash_normalize_keys val if val.is_a? Hash deep_hash_normalize val else val end end
def token_type
Returns the type of token (access_token).
def token_type :access_token end
def universe_domain
-
(String)- The universe domain of the credentials.
def universe_domain @source_credentials.universe_domain end