class RubyLLM::MCP::Transports::StreamableHTTP
Main StreamableHTTP transport class
def active_clients_count
def active_clients_count @clients_mutex.synchronize do @clients.size end end
def add_on_response_body_chunk_callback(client, options)
def add_on_response_body_chunk_callback(client, options) buffer = +"" client.on_response_body_chunk do |request, response, chunk| # Only process chunks for text/event-stream and if still running next unless @running && !@abort_controller if chunk.include?("event: stop") RubyLLM::MCP.logger.debug "Closing SSE stream" request.close end content_type = response.headers["content-type"] if content_type&.include?("text/event-stream") buffer << chunk.to_s while (event_data = extract_sse_event(buffer)) raw_event, remaining_buffer = event_data buffer.replace(remaining_buffer) next unless raw_event && raw_event[:data] if raw_event[:id] options.on_resumption_token&.call(raw_event[:id]) end process_sse_event(raw_event, options.replay_message_id) end end end end
def alive?
def alive? @running end
def build_common_headers
def build_common_headers headers = @headers.dup headers["mcp-session-id"] = @session_id if @session_id headers["mcp-protocol-version"] = @protocol_version if @protocol_version headers["X-CLIENT-ID"] = @client_id headers["Origin"] = @uri.to_s headers end
def calculate_reconnection_delay(attempt)
def calculate_reconnection_delay(attempt) initial = @reconnection_options.initial_reconnection_delay factor = @reconnection_options.reconnection_delay_grow_factor max_delay = @reconnection_options.max_reconnection_delay [initial * (factor**attempt), max_delay].min end
def cleanup_connection
def cleanup_connection clients_to_close = [] @clients_mutex.synchronize do clients_to_close = @clients.dup @clients.clear end clients_to_close.each do |client| client.close if client.respond_to?(:close) rescue StandardError => e RubyLLM::MCP.logger.debug "Error closing HTTPX client: #{e.message}" end @connection = nil end
def cleanup_sse_resources
def cleanup_sse_resources @running = false @abort_controller = true @sse_mutex.synchronize do if @sse_thread&.alive? @sse_thread.kill @sse_thread.join(5) # Wait up to 5 seconds for thread to finish @sse_thread = nil end end # Clear any pending requests @pending_mutex.synchronize do @pending_requests.each_value do |queue| queue.close if queue.respond_to?(:close) rescue StandardError # Ignore errors when closing queues end @pending_requests.clear end end
def close
def close terminate_session cleanup_sse_resources cleanup_connection end
def close_client(client)
def close_client(client) client.close if client.respond_to?(:close) rescue StandardError => e RubyLLM::MCP.logger.debug "Error closing HTTPX client: #{e.message}" ensure unregister_client(client) end
def create_connection
def create_connection client = Support::HTTPClient.connection.with( timeout: { connect_timeout: 10, read_timeout: @request_timeout / 1000, write_timeout: @request_timeout / 1000, operation_timeout: @request_timeout / 1000 } ) if @oauth_options&.enabled? client = client.plugin(:oauth).oauth_auth( issuer: @oauth_options.issuer, client_id: @oauth_options.client_id, client_secret: @oauth_options.client_secret, scope: @oauth_options.scope ) client.with_access_token end register_client(client) end
def create_connection_with_sse_callbacks(options, headers)
def create_connection_with_sse_callbacks(options, headers) client = HTTPX.plugin(:callbacks) client = add_on_response_body_chunk_callback(client, options) client = client.with( timeout: { connect_timeout: 10, read_timeout: @request_timeout / 1000, write_timeout: @request_timeout / 1000, operation_timeout: @request_timeout / 1000 }, headers: headers ) if @version == :http1 client = client.with( ssl: { alpn_protocols: ["http/1.1"] } ) end register_client(client) end
def create_connection_with_streaming_callbacks(request_id)
def create_connection_with_streaming_callbacks(request_id) buffer = +"" client = Support::HTTPClient.connection.plugin(:callbacks) .on_response_body_chunk do |request, _response, chunk| next unless @running && !@abort_controller RubyLLM::MCP.logger.debug "Received chunk: #{chunk.bytesize} bytes for #{request.uri}" buffer << chunk process_sse_buffer_events(buffer, request_id&.to_s) end .with( timeout: { connect_timeout: 10, read_timeout: @request_timeout / 1000, write_timeout: @request_timeout / 1000, operation_timeout: @request_timeout / 1000 } ) if @oauth_options&.enabled? client = client.plugin(:oauth).oauth_auth( issuer: @oauth_options.issuer, client_id: @oauth_options.client_id, client_secret: @oauth_options.client_secret, scope: @oauth_options.scope ) client.with_access_token end register_client(client) end
def extract_resource_metadata_url(response)
def extract_resource_metadata_url(response) # Extract resource metadata URL from response headers if present # Guard against error responses that don't have headers return nil unless response.respond_to?(:headers) metadata_url = response.headers["mcp-resource-metadata-url"] metadata_url ? URI(metadata_url) : nil end
def extract_sse_event(buffer)
def extract_sse_event(buffer) # Support both Unix (\n\n) and Windows (\r\n\r\n) line endings separator = if buffer.include?("\r\n\r\n") "\r\n\r\n" elsif buffer.include?("\n\n") "\n\n" else return nil end raw, rest = buffer.split(separator, 2) [parse_sse_event(raw), rest || ""] end
def handle_accepted_response(original_message)
def handle_accepted_response(original_message) # 202 Accepted - start SSE stream if this was an initialization if original_message.is_a?(Hash) && original_message["method"] == "initialize" start_sse_stream end nil end
def handle_client_error(response)
def handle_client_error(response) begin # Safely access response body response_body = response.respond_to?(:body) ? response.body.to_s : "Unknown error" error_body = JSON.parse(response_body) if error_body.is_a?(Hash) && error_body["error"] error_message = error_body["error"]["message"] || error_body["error"]["code"] if error_message.to_s.downcase.include?("session") raise Errors::TransportError.new( code: response.status, message: "Server error: #{error_message} (Current session ID: #{@session_id || 'none'})" ) end raise Errors::TransportError.new( code: response.status, message: "Server error: #{error_message}" ) end rescue JSON::ParserError # Fall through to generic error end # Safely access response attributes response_body = response.respond_to?(:body) ? response.body.to_s : "Unknown error" status_code = response.respond_to?(:status) ? response.status : "Unknown" raise Errors::TransportError.new( code: status_code, message: "HTTP client error: #{status_code} - #{response_body}" ) end
def handle_httpx_error_response!(response, context:, allow_eof_for_sse: false)
def handle_httpx_error_response!(response, context:, allow_eof_for_sse: false) return false unless response.is_a?(HTTPX::ErrorResponse) error = response.error # Special handling for EOFError in SSE contexts if allow_eof_for_sse && error.is_a?(EOFError) RubyLLM::MCP.logger.info "SSE stream closed: #{response.error.message}" return :eof_handled end if error.is_a?(HTTPX::ReadTimeoutError) raise Errors::TimeoutError.new( message: "Request timed out after #{@request_timeout / 1000} seconds", request_id: context[:request_id] ) end error_message = response.error&.message || "Request failed" RubyLLM::MCP.logger.error "HTTPX error in #{context[:location]}: #{error_message}" raise Errors::TransportError.new( code: nil, message: "HTTPX Error #{context}: #{error_message}" ) end
def handle_response(response, request_id, original_message)
def handle_response(response, request_id, original_message) # Handle HTTPX error responses first handle_httpx_error_response!(response, context: { location: "handling response", request_id: request_id }) # Extract session ID if present (only for successful responses) session_id = response.headers["mcp-session-id"] @session_id = session_id if session_id case response.status when 200 handle_success_response(response, request_id, original_message) when 202 handle_accepted_response(original_message) when 404 handle_session_expired when 405, 401 # TODO: Implement 401 handling this once we are adding authorization # Method not allowed - acceptable for some endpoints nil when 400...500 handle_client_error(response) else response_body = response.respond_to?(:body) ? response.body.to_s : "Unknown error" raise Errors::TransportError.new( code: response.status, message: "HTTP request failed: #{response.status} - #{response_body}" ) end end
def handle_session_expired
def handle_session_expired @session_id = nil raise Errors::SessionExpiredError.new( message: "Session expired, re-initialization required" ) end
def handle_success_response(response, request_id, _original_message)
def handle_success_response(response, request_id, _original_message) content_type = response.respond_to?(:headers) ? response.headers["content-type"] : nil if content_type&.include?("text/event-stream") start_sse_stream nil elsif content_type&.include?("application/json") response_body = response.respond_to?(:body) ? response.body.to_s : "{}" if response_body == "null" # Fix related to official MCP Ruby SDK implementation response_body = "{}" end json_response = JSON.parse(response_body) result = RubyLLM::MCP::Result.new(json_response, session_id: @session_id) if request_id @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } end result else raise Errors::TransportError.new( code: -1, message: "Unexpected content type: #{content_type}" ) end rescue StandardError => e raise Errors::TransportError.new( message: "Invalid JSON response: #{e.message}", error: e ) end
def initialize( # rubocop:disable Metrics/ParameterLists
def initialize( # rubocop:disable Metrics/ParameterLists url:, request_timeout:, coordinator:, headers: {}, reconnection: {}, version: :http2, oauth: nil, rate_limit: nil, reconnection_options: nil, session_id: nil ) @url = URI(url) @coordinator = coordinator @request_timeout = request_timeout @headers = headers || {} @session_id = session_id @version = version @reconnection_options = reconnection_options || ReconnectionOptions.new @protocol_version = nil @session_id = session_id @resource_metadata_url = nil @client_id = SecureRandom.uuid @reconnection_options = ReconnectionOptions.new(**reconnection) @oauth_options = OAuthOptions.new(**oauth) unless oauth.nil? @rate_limiter = Support::RateLimiter.new(**rate_limit) if rate_limit @id_counter = 0 @id_mutex = Mutex.new @pending_requests = {} @pending_mutex = Mutex.new @running = true @abort_controller = nil @sse_thread = nil @sse_mutex = Mutex.new # Thread-safe collection of all HTTPX clients @clients = [] @clients_mutex = Mutex.new @connection = create_connection end
def parse_sse_event(raw)
def parse_sse_event(raw) event = {} raw.each_line do |line| line = line.strip case line when /^data:\s*(.*)/ (event[:data] ||= []) << ::Regexp.last_match(1) when /^event:\s*(.*)/ event[:event] = ::Regexp.last_match(1) when /^id:\s*(.*)/ event[:id] = ::Regexp.last_match(1) end end event[:data] = event[:data]&.join("\n") event end
def process_sse_buffer_events(buffer, _request_id)
def process_sse_buffer_events(buffer, _request_id) return unless @running && !@abort_controller while (event_data = extract_sse_event(buffer)) raw_event, remaining_buffer = event_data buffer.replace(remaining_buffer) process_sse_event(raw_event, nil) if raw_event && raw_event[:data] end end
def process_sse_event(raw_event, replay_message_id)
def process_sse_event(raw_event, replay_message_id) return unless raw_event[:data] return unless @running && !@abort_controller begin event_data = JSON.parse(raw_event[:data]) # Handle replay message ID if specified if replay_message_id && event_data.is_a?(Hash) && event_data["id"] event_data["id"] = replay_message_id end result = RubyLLM::MCP::Result.new(event_data, session_id: @session_id) RubyLLM::MCP.logger.debug "SSE Result Received: #{result.inspect}" result = @coordinator.process_result(result) return if result.nil? request_id = result.id&.to_s if request_id @pending_mutex.synchronize do response_queue = @pending_requests.delete(request_id) response_queue&.push(result) end end rescue JSON::ParserError => e RubyLLM::MCP.logger.warn "Failed to parse SSE event data: #{raw_event[:data]} - #{e.message}" rescue Errors::UnknownRequest => e RubyLLM::MCP.logger.warn "Unknown request from MCP server: #{e.message}" rescue StandardError => e RubyLLM::MCP.logger.error "Error processing SSE event: #{e.message}" raise Errors::TransportError.new( message: "Error processing SSE event: #{e.message}", error: e ) end end
def register_client(client)
def register_client(client) @clients_mutex.synchronize do @clients << client end client end
def request(body, add_id: true, wait_for_response: true)
def request(body, add_id: true, wait_for_response: true) if @rate_limiter&.exceeded? sleep(1) while @rate_limiter&.exceeded? end @rate_limiter&.add # Generate a unique request ID for requests if add_id && body.is_a?(Hash) && !body.key?("id") @id_mutex.synchronize { @id_counter += 1 } body["id"] = @id_counter end request_id = body.is_a?(Hash) ? body["id"] : nil is_initialization = body.is_a?(Hash) && body["method"] == "initialize" response_queue = setup_response_queue(request_id, wait_for_response) result = send_http_request(body, request_id, is_initialization: is_initialization) return result if result.is_a?(RubyLLM::MCP::Result) if wait_for_response && request_id wait_for_response_with_timeout(request_id.to_s, response_queue) end end
def send_http_request(body, request_id, is_initialization: false)
def send_http_request(body, request_id, is_initialization: false) headers = build_common_headers headers["Content-Type"] = "application/json" headers["Accept"] = "application/json, text/event-stream" json_body = JSON.generate(body) RubyLLM::MCP.logger.debug "Sending Request: #{json_body}" begin # Set up connection with streaming callbacks if not initialization connection = if is_initialization @connection else create_connection_with_streaming_callbacks(request_id) end response = connection.post(@url, json: body, headers: headers) handle_response(response, request_id, body) ensure @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } if request_id end end
def set_protocol_version(version)
def set_protocol_version(version) @protocol_version = version end
def setup_response_queue(request_id, wait_for_response)
def setup_response_queue(request_id, wait_for_response) response_queue = Queue.new if wait_for_response && request_id @pending_mutex.synchronize do @pending_requests[request_id.to_s] = response_queue end end response_queue end
def start
def start @abort_controller = false end
def start_sse(options) # rubocop:disable Metrics/MethodLength
def start_sse(options) # rubocop:disable Metrics/MethodLength attempt_count = 0 begin headers = build_common_headers headers["Accept"] = "text/event-stream" if options.resumption_token headers["Last-Event-ID"] = options.resumption_token end # Set up SSE streaming connection with callbacks connection = create_connection_with_sse_callbacks(options, headers) response = connection.get(@url) # Handle HTTPX error responses first error_result = handle_httpx_error_response!(response, context: { location: "SSE connection" }, allow_eof_for_sse: true) return if error_result == :eof_handled case response.status when 200 # SSE stream established successfully RubyLLM::MCP.logger.debug "SSE stream established" # Response will be processed through callbacks when 405, 401 # Server doesn't support SSE - this is acceptable RubyLLM::MCP.logger.info "Server does not support SSE streaming" nil when 409 # Conflict - SSE connection already exists for this session # This is expected when reusing sessions and is acceptable RubyLLM::MCP.logger.debug "SSE stream already exists for this session" nil else reason_phrase = response.respond_to?(:reason_phrase) ? response.reason_phrase : nil raise Errors::TransportError.new( code: response.status, message: "Failed to open SSE stream: #{reason_phrase || response.status}" ) end rescue StandardError => e RubyLLM::MCP.logger.error "SSE stream error: #{e.message}" # Attempt reconnection with exponential backoff if @running && !@abort_controller && attempt_count < @reconnection_options.max_retries delay = calculate_reconnection_delay(attempt_count) RubyLLM::MCP.logger.info "Reconnecting SSE stream in #{delay}ms..." sleep(delay / 1000.0) attempt_count += 1 retry end raise e end end
def start_sse_stream(options = StartSSEOptions.new)
def start_sse_stream(options = StartSSEOptions.new) return unless @running && !@abort_controller @sse_mutex.synchronize do return if @sse_thread&.alive? @sse_thread = Thread.new do start_sse(options) end end end
def terminate_session
def terminate_session return unless @session_id begin headers = build_common_headers response = @connection.delete(@url, headers: headers) # Handle HTTPX error responses first handle_httpx_error_response!(response, context: { location: "terminating session" }) # 405 Method Not Allowed is acceptable per spec unless [200, 405].include?(response.status) reason_phrase = response.respond_to?(:reason_phrase) ? response.reason_phrase : nil raise Errors::TransportError.new( code: response.status, message: "Failed to terminate session: #{reason_phrase || response.status}" ) end @session_id = nil rescue StandardError => e raise Errors::TransportError.new( message: "Failed to terminate session: #{e.message}", code: nil, error: e ) end end
def unregister_client(client)
def unregister_client(client) @clients_mutex.synchronize do @clients.delete(client) end end
def wait_for_response_with_timeout(request_id, response_queue)
def wait_for_response_with_timeout(request_id, response_queue) with_timeout(@request_timeout / 1000, request_id: request_id) do response_queue.pop end rescue RubyLLM::MCP::Errors::TimeoutError => e log_message = "StreamableHTTP request timeout (ID: #{request_id}) after #{@request_timeout / 1000} seconds" RubyLLM::MCP.logger.error(log_message) @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } raise e end