class StatelyDB::Common::Auth::AuthTokenProvider::Actor

This is designed to be used with Async::Actor and run on a dedicated thread.
Actor for managing the token refresh

def close

Returns:
  • (void) -
def close
  @scheduled&.stop
  @token_fetcher&.close
end

def get_token(force: false)

Returns:
  • (String) - The current access token

Parameters:
  • force (Boolean) -- Whether to force a refresh of the token
def get_token(force: false)
  if force
    @token_state = nil
  else
    token, ok = valid_access_token
    return token if ok
  end
  refresh_token.wait
end

def init

Returns:
  • (void) -
def init
  # disable the async lib logger. We do our own error handling and propagation
  Console.logger.disable(Async::Task)
  refresh_token
end

def initialize(endpoint:, access_key:, base_retry_backoff_secs:)

Parameters:
  • base_retry_backoff_secs (Float) -- The base retry backoff in seconds
  • access_key (String) -- The StatelyDB access key credential
  • endpoint (String) -- The endpoint of the OAuth server
def initialize(endpoint:, access_key:, base_retry_backoff_secs:)
  super()
  if access_key.nil?
    raise StatelyDB::Error.new(
      "Unable to find an access key in the STATELY_ACCESS_KEY " \
      "environment variable. Either pass your credentials in " \
      "the options when creating a client or set this environment variable.",
      code: GRPC::Core::StatusCodes::UNAUTHENTICATED,
      stately_code: "Unauthenticated"
    )
  end
  @token_fetcher = StatelyDB::Common::Auth::StatelyAccessTokenFetcher.new(
    endpoint: endpoint,
    access_key: access_key,
    base_retry_backoff_secs: base_retry_backoff_secs
  )
  @token_state = nil
  @pending_refresh = nil
end

def refresh_token

Returns:
  • (::Async::Task) - A task that will resolve to the new access token
def refresh_token
  Async do
    # we use an Async::Condition to dedupe multiple requests here
    # if the condition exists, we wait on it to complete
    # otherwise we create a condition, make the request, then signal the condition with the result
    # If there is an error then we signal that instead so we can raise it for the waiters.
    if @pending_refresh.nil?
      begin
        @pending_refresh = Async::Condition.new
        new_access_token = refresh_token_impl
        # now broadcast the new token to any waiters
        @pending_refresh.signal(new_access_token)
        new_access_token
      rescue StandardError => e
        @pending_refresh.signal(e)
        raise e
      ensure
        # delete the condition to restart the process
        @pending_refresh = nil
      end
    else
      res = @pending_refresh.wait
      # if the refresh result is an error, re-raise it.
      # otherwise return the token
      raise res if res.is_a?(StandardError)
      res
    end
  end
end

def refresh_token_impl

Returns:
  • (String) - The new access token
def refresh_token_impl
  Sync do
    token_result = @token_fetcher.fetch
    new_expires_in_secs = token_result.expires_in_secs
    new_expires_at_unix_secs = Time.now.to_i + new_expires_in_secs
    # only update the token state if the new expiry is later than the current one
    if @token_state.nil? || new_expires_at_unix_secs > @token_state.expires_at_unix_secs
      @token_state = TokenState.new(token: token_result.token, expires_at_unix_secs: new_expires_at_unix_secs)
    else
      # otherwise use the existing expiry time for scheduling the refresh
      new_expires_in_secs = @token_state.expires_at_unix_secs - Time.now.to_i
    end
    # Schedule a refresh of the token ahead of the expiry time
    # Calculate a random multiplier between 0.9 and 0.95 to to apply to the expiry
    # so that we refresh in the background ahead of expiration, but avoid
    # multiple processes hammering the service at the same time.
    jitter = (Random.rand * 0.05) + 0.9
    delay_secs = new_expires_in_secs * jitter
    # do this on the fiber scheduler (the root scheduler) to avoid infinite recursion
    @scheduled ||= Fiber.scheduler.async do
      # Kernel.sleep is non-blocking if Ruby 3.1+ and Async 2+
      # https://github.com/socketry/async/issues/305#issuecomment-1945188193
      sleep(delay_secs)
      refresh_token
      @scheduled = nil
    end
    @token_state.token
  end
end

def valid_access_token

Returns:
  • (Array) - The current access token and whether it is valid
def valid_access_token
  return "", false if @token_state.nil?
  return "", false if @token_state.expires_at_unix_secs < Time.now.to_i
  [@token_state.token, true]
end