class RubyLLM::MCP::Auth::TokenManager
Handles token exchange, refresh, and client credentials flows
Service for managing OAuth token operations
def add_client_secret_if_needed(params, client_info)
-
client_info(ClientInfo) -- client info -
params(Hash) -- token request parameters
# Adds the client secret to the token request parameters when the client
# authenticates by sending its secret in the request body.
#
# The secret is written to +params[:client_secret]+ only when the client has
# a secret and its metadata declares the "client_secret_post" token endpoint
# auth method; otherwise +params+ is left untouched.
#
# @param params [Hash] token request parameters (mutated in place)
# @param client_info [ClientInfo] client info
# @return [String, nil] the secret that was added, or nil when nothing was added
def add_client_secret_if_needed(params, client_info)
  # The metadata is only consulted once a secret is known to exist.
  sends_secret_in_body =
    client_info.client_secret &&
    client_info.metadata.token_endpoint_auth_method == "client_secret_post"

  params[:client_secret] = client_info.client_secret if sends_secret_in_body
end
def build_auth_code_params(client_info, code, pkce, redirect_uri, server_url)
-
(Hash)- token exchange parameters
Parameters:
-
server_url(String) -- MCP server URL -
redirect_uri(String) -- redirect URI -
pkce(PKCE) -- PKCE parameters -
code(String) -- authorization code -
client_info(ClientInfo) -- client info
# Builds the form parameters for an authorization-code token exchange.
#
# The PKCE code verifier is always included, +resource+ is set to the given
# server URL, and the client secret is appended through
# +add_client_secret_if_needed+.
#
# @param client_info [ClientInfo] client info
# @param code [String] authorization code
# @param pkce [PKCE] PKCE parameters
# @param redirect_uri [String] redirect URI
# @param server_url [String] MCP server URL
# @return [Hash] token exchange parameters
def build_auth_code_params(client_info, code, pkce, redirect_uri, server_url)
  {
    grant_type: "authorization_code",
    code: code,
    redirect_uri: redirect_uri,
    client_id: client_info.client_id,
    code_verifier: pkce.code_verifier,
    resource: server_url
  }.tap { |request| add_client_secret_if_needed(request, client_info) }
end
def build_refresh_params(client_info, token, server_url)
-
(Hash)- refresh parameters
Parameters:
-
server_url(String) -- MCP server URL -
token(Token) -- current token -
client_info(ClientInfo) -- client info
# Builds the form parameters for a refresh-token grant.
#
# +resource+ is set to the given server URL and the client secret is
# appended through +add_client_secret_if_needed+.
#
# @param client_info [ClientInfo] client info
# @param token [Token] current token (its refresh_token is sent)
# @param server_url [String] MCP server URL
# @return [Hash] refresh parameters
def build_refresh_params(client_info, token, server_url)
  {
    grant_type: "refresh_token",
    refresh_token: token.refresh_token,
    client_id: client_info.client_id,
    resource: server_url
  }.tap { |request| add_client_secret_if_needed(request, client_info) }
end
def exchange_authorization_code(server_metadata, client_info, code, pkce, server_url)
-
(Token)- access token
Parameters:
-
server_url(String) -- MCP server URL -
pkce(PKCE) -- PKCE parameters -
code(String) -- authorization code -
client_info(ClientInfo) -- client info -
server_metadata(ServerMetadata) -- server metadata
# Exchanges an authorization code for an access token.
#
# The first registered redirect URI is sent with the request. If the server
# answers with a redirect-URI mismatch, the request is retried once via
# +retry_if_redirect_mismatch+ before the response is validated and parsed.
#
# @param server_metadata [ServerMetadata] server metadata
# @param client_info [ClientInfo] client info
# @param code [String] authorization code
# @param pkce [PKCE] PKCE parameters
# @param server_url [String] MCP server URL
# @return [Token] access token
# @raise [Errors::TransportError] when the exchange fails or the response is invalid
def exchange_authorization_code(server_metadata, client_info, code, pkce, server_url)
  logger.debug("Exchanging authorization code for access token")

  redirect_uri = client_info.metadata.redirect_uris.first
  request = build_auth_code_params(client_info, code, pkce, redirect_uri, server_url)

  first_attempt = post_token_exchange(server_metadata, request)
  final_response = retry_if_redirect_mismatch(first_attempt, server_metadata, request, redirect_uri)

  validate_token_response!(final_response, "Token exchange")
  parse_token_response(final_response)
end
def exchange_client_credentials(server_metadata, client_info, scope, server_url)
-
(Token)- access token
Parameters:
-
server_url(String) -- MCP server URL -
scope(String, nil) -- requested scope -
client_info(ClientInfo) -- client info with secret -
server_metadata(ServerMetadata) -- server metadata
# Obtains an access token using the client credentials grant.
#
# Parameters whose value is nil (for example an omitted scope) are dropped
# from the request before it is posted.
#
# @param server_metadata [ServerMetadata] server metadata
# @param client_info [ClientInfo] client info with secret
# @param scope [String, nil] requested scope
# @param server_url [String] MCP server URL
# @return [Token] access token
# @raise [Errors::TransportError] when the exchange fails or the response is invalid
def exchange_client_credentials(server_metadata, client_info, scope, server_url)
  logger.debug("Exchanging client credentials for access token")

  candidate_params = {
    grant_type: "client_credentials",
    client_id: client_info.client_id,
    client_secret: client_info.client_secret,
    scope: scope,
    resource: server_url
  }
  request = candidate_params.reject { |_name, value| value.nil? }

  token_response = post_token_exchange(server_metadata, request)
  validate_token_response!(token_response, "Token exchange")
  parse_token_response(token_response)
end
def extract_oauth_error(source)
-
(Hash, nil)- OAuth error fields or nil
Parameters:
-
source(String, Hash) -- response body string or parsed JSON hash
# Extracts OAuth error fields from a response body or an already-parsed hash.
#
# Both string and symbol keys are recognised. Anything that is not a JSON
# object -- invalid JSON, an empty body, +nil+, or valid JSON whose top-level
# value is an array or scalar -- yields nil instead of raising, so callers
# can treat "no recognisable OAuth error" uniformly.
#
# @param source [String, Hash, nil] response body string or parsed JSON hash
# @return [Hash, nil] hash with :error, :error_description and :error_uri,
#   or nil when no OAuth error is present
def extract_oauth_error(source)
  data = source.is_a?(String) ? JSON.parse(source) : source
  # A body such as `[]`, `null` or `"text"` parses fine but is not an object;
  # indexing it by key would raise TypeError/NoMethodError.
  return nil unless data.is_a?(Hash)

  error = data["error"] || data[:error]
  return nil unless error

  {
    error: error,
    error_description: data["error_description"] || data[:error_description],
    error_uri: data["error_uri"] || data[:error_uri]
  }
rescue JSON::ParserError
  nil
end
def initialize(http_client, logger)
# Creates a token manager.
#
# @param http_client [Object] HTTP client, stored in @http_client
# @param logger [Object] logger, stored in @logger
def initialize(http_client, logger)
  @logger = logger
  @http_client = http_client
end
def parse_refresh_response(response, old_token)
-
(Token)- new token
Parameters:
-
old_token(Token) -- previous token -
response(HTTPX::Response) -- HTTP response
# Parses a token refresh response into a new Token.
#
# When the server does not return a new refresh token, the refresh token of
# +old_token+ is carried over.
#
# @param response [HTTPX::Response] HTTP response
# @param old_token [Token] previous token
# @return [Token] new token
# @raise [JSON::ParserError] when the body is not valid JSON
# @raise [Errors::TransportError] when the body is not a JSON object, carries
#   an OAuth error, or lacks a non-empty string access_token
def parse_refresh_response(response, old_token)
  data = JSON.parse(response.body.to_s)

  # Valid JSON that is not an object (e.g. `[]` or `null`) cannot be a token
  # response; report it as a transport error rather than letting key lookups
  # raise TypeError/NoMethodError.
  unless data.is_a?(Hash)
    raise Errors::TransportError.new(
      message: "Token refresh failed: invalid token response (expected JSON object)",
      code: response.status
    )
  end

  raise_oauth_error!("Token refresh", extract_oauth_error(data), response.status)

  access_token = data["access_token"]
  # Non-string values (numbers, booleans, ...) are rejected together with
  # missing and empty tokens.
  unless access_token.is_a?(String) && !access_token.empty?
    raise Errors::TransportError.new(
      message: "Token refresh failed: invalid token response (missing access_token)",
      code: response.status
    )
  end

  Token.new(
    access_token: access_token,
    token_type: data["token_type"] || "Bearer",
    expires_in: data["expires_in"],
    scope: data["scope"],
    refresh_token: data["refresh_token"] || old_token.refresh_token
  )
end
def parse_token_response(response)
-
(Token)- parsed token
Parameters:
-
response(HTTPX::Response) -- HTTP response
# Parses a token exchange response into a Token.
#
# @param response [HTTPX::Response] HTTP response
# @return [Token] parsed token
# @raise [JSON::ParserError] when the body is not valid JSON
# @raise [Errors::TransportError] when the body is not a JSON object, carries
#   an OAuth error, or lacks a non-empty string access_token
def parse_token_response(response)
  data = JSON.parse(response.body.to_s)

  # Valid JSON that is not an object (e.g. `[]` or `null`) cannot be a token
  # response; report it as a transport error rather than letting key lookups
  # raise TypeError/NoMethodError.
  unless data.is_a?(Hash)
    raise Errors::TransportError.new(
      message: "Token exchange failed: invalid token response (expected JSON object)",
      code: response.status
    )
  end

  raise_oauth_error!("Token exchange", extract_oauth_error(data), response.status)

  access_token = data["access_token"]
  # Non-string values (numbers, booleans, ...) are rejected together with
  # missing and empty tokens.
  unless access_token.is_a?(String) && !access_token.empty?
    raise Errors::TransportError.new(
      message: "Token exchange failed: invalid token response (missing access_token)",
      code: response.status
    )
  end

  Token.new(
    access_token: access_token,
    token_type: data["token_type"] || "Bearer",
    expires_in: data["expires_in"],
    scope: data["scope"],
    refresh_token: data["refresh_token"]
  )
end
def post_token_exchange(server_metadata, params)
-
(HTTPX::Response)- HTTP response
Parameters:
-
params(Hash) -- form parameters -
server_metadata(ServerMetadata) -- server metadata
# Posts form-encoded parameters to the server's token endpoint.
#
# @param server_metadata [ServerMetadata] server metadata
# @param params [Hash] form parameters
# @return [HTTPX::Response] HTTP response
def post_token_exchange(server_metadata, params)
  form_headers = { "Content-Type" => "application/x-www-form-urlencoded" }
  endpoint = server_metadata.token_endpoint

  http_client.post(endpoint, headers: form_headers, form: params)
end
def post_token_refresh(server_metadata, params)
-
(HTTPX::Response)- HTTP response
Parameters:
-
params(Hash) -- form parameters -
server_metadata(ServerMetadata) -- server metadata
# Posts a token refresh request to the server's token endpoint.
#
# A warning is logged for transport-level error responses and for any
# status other than 200; the response is returned unchanged either way.
#
# @param server_metadata [ServerMetadata] server metadata
# @param params [Hash] form parameters
# @return [HTTPX::Response] HTTP response
def post_token_refresh(server_metadata, params)
  form_headers = { "Content-Type" => "application/x-www-form-urlencoded" }
  response = http_client.post(server_metadata.token_endpoint, headers: form_headers, form: params)

  case response
  when HTTPX::ErrorResponse
    logger.warn("Token refresh failed: #{response.error&.message || 'Request failed'}")
  else
    logger.warn("Token refresh failed: HTTP #{response.status}") unless response.status == 200
  end

  response
end
def raise_oauth_error!(context, oauth_error, status_code)
-
(Errors::TransportError)- when oauth_error is present
Parameters:
-
status_code(Integer, nil) -- HTTP response status code -
oauth_error(Hash, nil) -- OAuth error fields -
context(String) -- context for the error
# Raises a transport error describing an OAuth error, if one is given.
#
# The message always names the OAuth error code; the description and error
# URI are appended only when present.
#
# @param context [String] context for the error
# @param oauth_error [Hash, nil] OAuth error fields
# @param status_code [Integer, nil] HTTP response status code
# @return [nil] when oauth_error is nil
# @raise [Errors::TransportError] when oauth_error is present
def raise_oauth_error!(context, oauth_error, status_code)
  return unless oauth_error

  error_code = oauth_error[:error]
  detail = oauth_error[:error_description]
  reference = oauth_error[:error_uri]

  segments = ["#{context} failed: OAuth error '#{error_code}'"]
  segments << ": #{detail}" if detail
  segments << " (#{reference})" if reference

  raise Errors::TransportError.new(
    message: segments.join,
    code: status_code,
    error: error_code
  )
end
def refresh_token(server_metadata, client_info, token, server_url)
-
(Token, nil)- new token or nil if refresh failed
Parameters:
-
server_url(String) -- MCP server URL -
token(Token) -- current token with refresh_token -
client_info(ClientInfo) -- client info -
server_metadata(ServerMetadata) -- server metadata
# Refreshes an access token using its refresh token.
#
# Failures are reported by logging a warning and returning nil rather than
# raising: transport errors, OAuth errors, invalid JSON and HTTPX errors are
# all rescued here.
#
# @param server_metadata [ServerMetadata] server metadata
# @param client_info [ClientInfo] client info
# @param token [Token] current token with refresh_token
# @param server_url [String] MCP server URL
# @return [Token, nil] new token or nil if refresh failed
def refresh_token(server_metadata, client_info, token, server_url)
  return nil unless token.refresh_token

  logger.debug("Refreshing access token")

  request = build_refresh_params(client_info, token, server_url)
  response = post_token_refresh(server_metadata, request)

  # Transport-level failures were already logged by post_token_refresh.
  return nil if response.is_a?(HTTPX::ErrorResponse)
  return parse_refresh_response(response, token) if response.status == 200

  # Non-200: surface an OAuth error (logged by the rescue below) if the body
  # carries one, otherwise just give up.
  failure = extract_oauth_error(response.body.to_s)
  raise_oauth_error!("Token refresh", failure, response.status) if failure
  nil
rescue Errors::TransportError => e
  logger.warn(e.message)
  nil
rescue JSON::ParserError => e
  logger.warn("Invalid token refresh response: #{e.message}")
  nil
rescue HTTPX::Error => e
  logger.warn("Network error during token refresh: #{e.message}")
  nil
end
def retry_if_redirect_mismatch(response, server_metadata, params, registered_redirect_uri)
-
(HTTPX::Response)- response (possibly retried)
Parameters:
-
registered_redirect_uri(String) -- registered redirect URI -
params(Hash) -- exchange parameters -
server_metadata(ServerMetadata) -- server metadata -
response(HTTPX::Response) -- initial response
# Retries the token exchange once when the server reports a redirect URI
# mismatch and suggests a different URI.
#
# Error responses and 200 responses are returned as-is. On a retry,
# +params[:redirect_uri]+ is overwritten in place with the URI reported by
# +HttpResponseHandler.extract_redirect_mismatch+.
#
# @param response [HTTPX::Response] initial response
# @param server_metadata [ServerMetadata] server metadata
# @param params [Hash] exchange parameters (mutated on retry)
# @param registered_redirect_uri [String] registered redirect URI
# @return [HTTPX::Response] response (possibly retried)
def retry_if_redirect_mismatch(response, server_metadata, params, registered_redirect_uri)
  # Nothing to retry for transport errors or successful exchanges.
  return response if response.is_a?(HTTPX::ErrorResponse) || response.status == 200

  hint = HttpResponseHandler.extract_redirect_mismatch(response.body.to_s)
  # No hint, or the server expects the URI that was already sent.
  return response if !hint || hint[:expected] == registered_redirect_uri

  corrected_uri = hint[:expected]
  logger.warn("Redirect URI mismatch, retrying with: #{corrected_uri}")
  params[:redirect_uri] = corrected_uri
  post_token_exchange(server_metadata, params)
end
def validate_token_response!(response, context)
-
(Errors::TransportError)- if response is invalid
Parameters:
-
context(String) -- context for error messages -
response(HTTPX::Response, HTTPX::ErrorResponse) -- HTTP response
# Validates a token endpoint response, raising when it indicates failure.
#
# Checks, in order: a transport-level error response, an OAuth error in the
# body (regardless of status), and finally any status other than 200.
#
# @param response [HTTPX::Response, HTTPX::ErrorResponse] HTTP response
# @param context [String] context for error messages
# @return [nil] when the response is acceptable
# @raise [Errors::TransportError] if response is invalid
def validate_token_response!(response, context)
  if response.is_a?(HTTPX::ErrorResponse)
    error_message = response.error&.message || "Request failed"
    raise Errors::TransportError.new(message: "#{context} failed: #{error_message}")
  end

  oauth_error = extract_oauth_error(response.body.to_s)
  raise_oauth_error!(context, oauth_error, response.status) if oauth_error

  unless response.status == 200
    raise Errors::TransportError.new(
      message: "#{context} failed: HTTP #{response.status}",
      code: response.status
    )
  end
end