class RubyLLM::MCP::Auth::TokenManager

Handles token exchange, refresh, and client credentials flows
Service for managing OAuth token operations

def add_client_secret_if_needed(params, client_info)

Parameters:
  • client_info (ClientInfo) -- client info
  • params (Hash) -- token request parameters
def add_client_secret_if_needed(params, client_info)
  return unless client_info.client_secret
  return unless client_info.metadata.token_endpoint_auth_method == "client_secret_post"
  params[:client_secret] = client_info.client_secret
end

def build_auth_code_params(client_info, code, pkce, redirect_uri, server_url)

Returns:
  • (Hash) - token exchange parameters

Parameters:
  • server_url (String) -- MCP server URL
  • redirect_uri (String) -- redirect URI
  • pkce (PKCE) -- PKCE parameters
  • code (String) -- authorization code
  • client_info (ClientInfo) -- client info
def build_auth_code_params(client_info, code, pkce, redirect_uri, server_url)
  params = {
    grant_type: "authorization_code",
    code: code,
    redirect_uri: redirect_uri,
    client_id: client_info.client_id,
    code_verifier: pkce.code_verifier,
    resource: server_url
  }
  add_client_secret_if_needed(params, client_info)
  params
end

def build_refresh_params(client_info, token, server_url)

Returns:
  • (Hash) - refresh parameters

Parameters:
  • server_url (String) -- MCP server URL
  • token (Token) -- current token
  • client_info (ClientInfo) -- client info
def build_refresh_params(client_info, token, server_url)
  params = {
    grant_type: "refresh_token",
    refresh_token: token.refresh_token,
    client_id: client_info.client_id,
    resource: server_url
  }
  add_client_secret_if_needed(params, client_info)
  params
end

def exchange_authorization_code(server_metadata, client_info, code, pkce, server_url)

Returns:
  • (Token) - access token

Parameters:
  • server_url (String) -- MCP server URL
  • pkce (PKCE) -- PKCE parameters
  • code (String) -- authorization code
  • client_info (ClientInfo) -- client info
  • server_metadata (ServerMetadata) -- server metadata
def exchange_authorization_code(server_metadata, client_info, code, pkce, server_url)
  logger.debug("Exchanging authorization code for access token")
  registered_redirect_uri = client_info.metadata.redirect_uris.first
  params = build_auth_code_params(client_info, code, pkce, registered_redirect_uri, server_url)
  response = post_token_exchange(server_metadata, params)
  response = retry_if_redirect_mismatch(response, server_metadata, params, registered_redirect_uri)
  validate_token_response!(response, "Token exchange")
  parse_token_response(response)
end

def exchange_client_credentials(server_metadata, client_info, scope, server_url)

Returns:
  • (Token) - access token

Parameters:
  • server_url (String) -- MCP server URL
  • scope (String, nil) -- requested scope
  • client_info (ClientInfo) -- client info with secret
  • server_metadata (ServerMetadata) -- server metadata
def exchange_client_credentials(server_metadata, client_info, scope, server_url)
  logger.debug("Exchanging client credentials for access token")
  params = {
    grant_type: "client_credentials",
    client_id: client_info.client_id,
    client_secret: client_info.client_secret,
    scope: scope,
    resource: server_url
  }.compact
  response = post_token_exchange(server_metadata, params)
  validate_token_response!(response, "Token exchange")
  parse_token_response(response)
end

def extract_oauth_error(source)

Returns:
  • (Hash, nil) - OAuth error fields or nil

Parameters:
  • source (String, Hash) -- response body string or parsed JSON hash
def extract_oauth_error(source)
  data = source.is_a?(Hash) ? source : JSON.parse(source)
  error = data["error"] || data[:error]
  return nil unless error
  {
    error: error,
    error_description: data["error_description"] || data[:error_description],
    error_uri: data["error_uri"] || data[:error_uri]
  }
rescue JSON::ParserError
  nil
end

def initialize(http_client, logger)

def initialize(http_client, logger)
  @http_client = http_client
  @logger = logger
end

def parse_refresh_response(response, old_token)

Returns:
  • (Token) - new token

Parameters:
  • old_token (Token) -- previous token
  • response (HTTPX::Response) -- HTTP response
def parse_refresh_response(response, old_token)
  data = JSON.parse(response.body.to_s)
  raise_oauth_error!("Token refresh", extract_oauth_error(data), response.status)
  access_token = data["access_token"]
  if access_token.nil? || access_token.empty?
    raise Errors::TransportError.new(
      message: "Token refresh failed: invalid token response (missing access_token)",
      code: response.status
    )
  end
  Token.new(
    access_token: access_token,
    token_type: data["token_type"] || "Bearer",
    expires_in: data["expires_in"],
    scope: data["scope"],
    refresh_token: data["refresh_token"] || old_token.refresh_token
  )
end

def parse_token_response(response)

Returns:
  • (Token) - parsed token

Parameters:
  • response (HTTPX::Response) -- HTTP response
def parse_token_response(response)
  data = JSON.parse(response.body.to_s)
  raise_oauth_error!("Token exchange", extract_oauth_error(data), response.status)
  access_token = data["access_token"]
  if access_token.nil? || access_token.empty?
    raise Errors::TransportError.new(
      message: "Token exchange failed: invalid token response (missing access_token)",
      code: response.status
    )
  end
  Token.new(
    access_token: access_token,
    token_type: data["token_type"] || "Bearer",
    expires_in: data["expires_in"],
    scope: data["scope"],
    refresh_token: data["refresh_token"]
  )
end

def post_token_exchange(server_metadata, params)

Returns:
  • (HTTPX::Response) - HTTP response

Parameters:
  • params (Hash) -- form parameters
  • server_metadata (ServerMetadata) -- server metadata
def post_token_exchange(server_metadata, params)
  http_client.post(
    server_metadata.token_endpoint,
    headers: { "Content-Type" => "application/x-www-form-urlencoded" },
    form: params
  )
end

def post_token_refresh(server_metadata, params)

Returns:
  • (HTTPX::Response) - HTTP response

Parameters:
  • params (Hash) -- form parameters
  • server_metadata (ServerMetadata) -- server metadata
def post_token_refresh(server_metadata, params)
  response = http_client.post(
    server_metadata.token_endpoint,
    headers: { "Content-Type" => "application/x-www-form-urlencoded" },
    form: params
  )
  if response.is_a?(HTTPX::ErrorResponse)
    logger.warn("Token refresh failed: #{response.error&.message || 'Request failed'}")
  elsif response.status != 200
    logger.warn("Token refresh failed: HTTP #{response.status}")
  end
  response
end

def raise_oauth_error!(context, oauth_error, status_code)

Raises:
  • (Errors::TransportError) - when oauth_error is present

Parameters:
  • status_code (Integer, nil) -- HTTP response status code
  • oauth_error (Hash, nil) -- OAuth error fields
  • context (String) -- context for the error
def raise_oauth_error!(context, oauth_error, status_code)
  return unless oauth_error
  error = oauth_error[:error]
  description = oauth_error[:error_description]
  error_uri = oauth_error[:error_uri]
  message = "#{context} failed: OAuth error '#{error}'"
  message += ": #{description}" if description
  message += " (#{error_uri})" if error_uri
  raise Errors::TransportError.new(
    message: message,
    code: status_code,
    error: error
  )
end

def refresh_token(server_metadata, client_info, token, server_url)

Returns:
  • (Token, nil) - new token or nil if refresh failed

Parameters:
  • server_url (String) -- MCP server URL
  • token (Token) -- current token with refresh_token
  • client_info (ClientInfo) -- client info
  • server_metadata (ServerMetadata) -- server metadata
def refresh_token(server_metadata, client_info, token, server_url)
  return nil unless token.refresh_token
  logger.debug("Refreshing access token")
  params = build_refresh_params(client_info, token, server_url)
  response = post_token_refresh(server_metadata, params)
  # Return nil on error responses
  return nil if response.is_a?(HTTPX::ErrorResponse)
  if response.status != 200
    oauth_error = extract_oauth_error(response.body.to_s)
    raise_oauth_error!("Token refresh", oauth_error, response.status) if oauth_error
    return nil
  end
  parse_refresh_response(response, token)
rescue Errors::TransportError => e
  logger.warn(e.message)
  nil
rescue JSON::ParserError => e
  logger.warn("Invalid token refresh response: #{e.message}")
  nil
rescue HTTPX::Error => e
  logger.warn("Network error during token refresh: #{e.message}")
  nil
end

def retry_if_redirect_mismatch(response, server_metadata, params, registered_redirect_uri)

Returns:
  • (HTTPX::Response) - response (possibly retried)

Parameters:
  • registered_redirect_uri (String) -- registered redirect URI
  • params (Hash) -- exchange parameters
  • server_metadata (ServerMetadata) -- server metadata
  • response (HTTPX::Response) -- initial response
def retry_if_redirect_mismatch(response, server_metadata, params, registered_redirect_uri)
  # Don't retry on error responses
  return response if response.is_a?(HTTPX::ErrorResponse)
  return response if response.status == 200
  redirect_hint = HttpResponseHandler.extract_redirect_mismatch(response.body.to_s)
  return response unless redirect_hint
  return response if redirect_hint[:expected] == registered_redirect_uri
  logger.warn("Redirect URI mismatch, retrying with: #{redirect_hint[:expected]}")
  params[:redirect_uri] = redirect_hint[:expected]
  post_token_exchange(server_metadata, params)
end

def validate_token_response!(response, context)

Raises:
  • (Errors::TransportError) - if response is invalid

Parameters:
  • context (String) -- context for error messages
  • response (HTTPX::Response, HTTPX::ErrorResponse) -- HTTP response
def validate_token_response!(response, context)
  # Handle HTTPX ErrorResponse
  if response.is_a?(HTTPX::ErrorResponse)
    error_message = response.error&.message || "Request failed"
    raise Errors::TransportError.new(message: "#{context} failed: #{error_message}")
  end
  oauth_error = extract_oauth_error(response.body.to_s)
  raise_oauth_error!(context, oauth_error, response.status) if oauth_error
  return if response.status == 200
  raise Errors::TransportError.new(
    message: "#{context} failed: HTTP #{response.status}",
    code: response.status
  )
end