class PhusionPassenger::RequestHandler::ThreadHandler
This class encapsulates the logic of a single RequestHandler thread.
def accept_and_process_next_request(socket_wrapper, channel, buffer)
def accept_and_process_next_request(socket_wrapper, channel, buffer) @stats_mutex.synchronize do @interruptable = true end if @last_connection connection = @last_connection channel.io = connection @last_connection = nil headers = parse_request(connection, channel, buffer) else connection = socket_wrapper.wrap(@server_socket.accept) end @stats_mutex.synchronize do @interruptable = false @iteration += 1 end trace(3, "Accepted new request on socket #{@socket_name}") if !headers # New socket accepted, instead of keeping-alive an old one channel.io = connection headers = parse_request(connection, channel, buffer) end if headers prepare_request(connection, headers) begin if headers[REQUEST_METHOD] == GET process_request(headers, connection, socket_wrapper, @protocol == :http) elsif headers[REQUEST_METHOD] == PING process_ping(headers, connection) elsif headers[REQUEST_METHOD] == OOBW process_oobw(headers, connection) else process_request(headers, connection, socket_wrapper, @protocol == :http) end rescue Exception has_error = true raise ensure if headers[RACK_HIJACK_IO] socket_wrapper = nil connection = nil channel = nil end finalize_request(connection, headers, has_error) trace(3, "Request done.") end else trace(2, "No headers parsed; disconnecting client.") end rescue Interrupted raise rescue => e if socket_wrapper && socket_wrapper.source_of_exception?(e) # EPIPE is harmless, it just means that the client closed the connection. # Other errors might indicate a problem so we print them, but they're # probably not bad enough to warrant stopping the request handler. if !e.is_a?(Errno::EPIPE) print_exception("Passenger RequestHandler's client socket", e) end else if headers PhusionPassenger.log_request_exception(headers, e) end raise e if should_reraise_error?(e) end ensure # Close connection if keep-alive not possible if connection && !connection.closed? && !@last_connection # The 'close_write' here prevents forked child # processes from unintentionally keeping the # connection open. begin connection.close_write rescue SystemCallError, IOError end begin connection.close rescue SystemCallError end end end
def finalize_request(connection, headers, has_error)
def finalize_request(connection, headers, has_error) transaction = headers[UNION_STATION_REQUEST_TRANSACTION] Thread.current[UNION_STATION_CORE] = nil Thread.current[UNION_STATION_REQUEST_TRANSACTION] = nil if connection connection.stop_simulating_eof! end if transaction && !transaction.closed? exception_occurred = false begin transaction.end_measure("app request handler processing", has_error) if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS transaction.message("Final objects on heap: #{ObjectSpace.live_objects}") end if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS transaction.message("Final objects allocated so far: #{ObjectSpace.allocated_objects}") elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS count = ObjectSpace.count_objects transaction.message("Final objects allocated so far: #{count[:TOTAL] - count[:FREE]}") end if GC_SUPPORTS_TIME transaction.message("Final GC time: #{GC.time}") end if GC_SUPPORTS_CLEAR_STATS # Clear statistics to void integer wraps. GC.clear_stats end rescue Exception # Maybe this exception was raised while communicating # with the logging agent. If that is the case then # transaction.close may also raise an exception, but we're only # interested in the original exception. So if this # situation occurs we must ignore any exceptions raised # by transaction.close. exception_occurred = true raise ensure # It is important that the following call receives an ACK # from the logging agent and that we don't close the socket # connection until the ACK has been received, otherwise # the helper agent may close the transaction before this # process's openTransaction command is processed. begin transaction.close rescue raise if !exception_occurred end end end if !has_error && @keepalive_performed && connection trace(3, "Keep-aliving connection.") @last_connection = connection end ################# end
def initialize(request_handler, options = {})
def initialize(request_handler, options = {}) @request_handler = request_handler @server_socket = Utils.require_option(options, :server_socket) @socket_name = Utils.require_option(options, :socket_name) @protocol = Utils.require_option(options, :protocol) @app_group_name = Utils.require_option(options, :app_group_name) Utils.install_options_as_ivars(self, options, :app, :union_station_core, :connect_password, :keepalive_enabled ) @stats_mutex = Mutex.new @interruptable = false @iteration = 0 if @protocol == :session metaclass = class << self; self; end metaclass.class_eval do alias parse_request parse_session_request end elsif @protocol == :http metaclass = class << self; self; end metaclass.class_eval do alias parse_request parse_http_request end else raise ArgumentError, "Unknown protocol specified" end end
def install
def install @thread = Thread.current Thread.current[:passenger_thread_handler] = self PhusionPassenger.call_event(:starting_request_handler_thread) end
def main_loop(finish_callback)
def main_loop(finish_callback) socket_wrapper = Utils::UnseekableSocket.new channel = MessageChannel.new buffer = '' buffer.force_encoding('binary') if buffer.respond_to?(:force_encoding) begin finish_callback.call while true hijacked = accept_and_process_next_request(socket_wrapper, channel, buffer) socket_wrapper = Utils::UnseekableSocket.new if hijacked end rescue Interrupted # Do nothing. end debug("Thread handler main loop exited normally") ensure @stats_mutex.synchronize { @interruptable = true } end
def parse_http_request(connection, channel, buffer)
HTTP parser and is not intended to be complete, fast or secure, since the HTTP server
Like parse_session_request, but parses an HTTP request. This is a very minimalistic
def parse_http_request(connection, channel, buffer) headers = {} data = "" while data !~ /\r\n\r\n/ && data.size < MAX_HEADER_SIZE data << connection.readpartial(16 * 1024) end if data.size >= MAX_HEADER_SIZE warn("*** Passenger RequestHandler warning: " << "HTTP header size exceeded maximum.") return end data.gsub!(/\r\n\r\n.*/, '') data.split("\r\n").each_with_index do |line, i| if i == 0 # GET / HTTP/1.1 line =~ /^([A-Za-z]+) (.+?) (HTTP\/\d\.\d)$/ request_method = $1 request_uri = $2 protocol = $3 path_info, query_string = request_uri.split("?", 2) headers[REQUEST_METHOD] = request_method headers["REQUEST_URI"] = request_uri headers["QUERY_STRING"] = query_string || "" headers["SCRIPT_NAME"] = "" headers["PATH_INFO"] = path_info headers["SERVER_NAME"] = "127.0.0.1" headers["SERVER_PORT"] = connection.addr[1].to_s headers["SERVER_PROTOCOL"] = protocol else header, value = line.split(/\s*:\s*/, 2) header.upcase! # "Foo-Bar" => "FOO-BAR" header.gsub!("-", "_") # => "FOO_BAR" if header == CONTENT_LENGTH || header == "CONTENT_TYPE" headers[header] = value else headers["HTTP_#{header}"] = value end end end if @connect_password && headers["HTTP_X_PASSENGER_CONNECT_PASSWORD"] != @connect_password warn "*** Passenger RequestHandler warning: " << "someone tried to connect with an invalid connect password." return else return headers end rescue EOFError return end
def parse_session_request(connection, channel, buffer)
def parse_session_request(connection, channel, buffer) headers_data = channel.read_scalar(buffer, MAX_HEADER_SIZE) if headers_data.nil? return end headers = Utils::NativeSupportUtils.split_by_null_into_hash(headers_data) if @connect_password && headers[PASSENGER_CONNECT_PASSWORD] != @connect_password warn "*** Passenger RequestHandler warning: " << "someone tried to connect with an invalid connect password." return else return headers end rescue SecurityError => e warn("*** Passenger RequestHandler warning: " << "HTTP header size exceeded maximum.") return end
def prepare_request(connection, headers)
def prepare_request(connection, headers) transfer_encoding = headers[TRANSFER_ENCODING] content_length = headers[CONTENT_LENGTH] @can_keepalive = @keepalive_enabled && !transfer_encoding && !content_length @keepalive_performed = false if !transfer_encoding && !content_length connection.simulate_eof! end if @union_station_core && headers[PASSENGER_TXN_ID] txn_id = headers[PASSENGER_TXN_ID] union_station_key = headers[PASSENGER_UNION_STATION_KEY] transaction = @union_station_core.continue_transaction(txn_id, @app_group_name, :requests, union_station_key) headers[UNION_STATION_REQUEST_TRANSACTION] = transaction headers[UNION_STATION_CORE] = @union_station_core headers[PASSENGER_APP_GROUP_NAME] = @app_group_name Thread.current[UNION_STATION_REQUEST_TRANSACTION] = transaction Thread.current[UNION_STATION_CORE] = @union_station_core Thread.current[PASSENGER_TXN_ID] = txn_id Thread.current[PASSENGER_UNION_STATION_KEY] = union_station_key if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS transaction.message("Initial objects on heap: #{ObjectSpace.live_objects}") end if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS transaction.message("Initial objects allocated so far: #{ObjectSpace.allocated_objects}") elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS count = ObjectSpace.count_objects transaction.message("Initial objects allocated so far: #{count[:TOTAL] - count[:FREE]}") end if GC_SUPPORTS_TIME transaction.message("Initial GC time: #{GC.time}") end transaction.begin_measure("app request handler processing") end ################# end
def process_oobw(env, connection)
def process_oobw(env, connection) PhusionPassenger.call_event(:oob_work) connection.write("oobw done") end
def process_ping(env, connection)
def process_ping(env, connection) connection.write("pong") end
def should_reraise_app_error?(e, socket_wrapper)
def should_reraise_app_error?(e, socket_wrapper) return false end
def should_reraise_error?(e)
def should_reraise_error?(e) # Stubable by unit tests. return true end
def should_swallow_app_error?(e, socket_wrapper)
def should_swallow_app_error?(e, socket_wrapper) return socket_wrapper && socket_wrapper.source_of_exception?(e) && e.is_a?(Errno::EPIPE) end