class PhusionPassenger::RequestHandler::ThreadHandler

This class encapsulates the logic of a single RequestHandler thread.

def accept_and_process_next_request(socket_wrapper, channel, buffer)

Returns true if the socket has been hijacked, false otherwise.
def accept_and_process_next_request(socket_wrapper, channel, buffer)
	@stats_mutex.synchronize do
		@interruptable = true
	end
	connection = socket_wrapper.wrap(@server_socket.accept)
	@stats_mutex.synchronize do
		@interruptable = false
		@iteration    += 1
	end
	trace(3, "Accepted new request on socket #{@socket_name}")
	channel.io = connection
	if headers = parse_request(connection, channel, buffer)
		prepare_request(connection, headers)
		begin
			if headers[REQUEST_METHOD] == PING
				process_ping(headers, connection)
			elsif headers[REQUEST_METHOD] == OOBW
				process_oobw(headers, connection)
			else
				process_request(headers, connection, socket_wrapper, @protocol == :http)
			end
		rescue Exception
			has_error = true
			raise
		ensure
			if headers[RACK_HIJACK_IO]
				socket_wrapper = nil
				connection = nil
				channel = nil
			end
			finalize_request(connection, headers, has_error)
			trace(3, "Request done.")
		end
	else
		trace(2, "No headers parsed; disconnecting client.")
	end
rescue Interrupted
	raise
rescue => e
	if socket_wrapper && socket_wrapper.source_of_exception?(e)
		# EPIPE is harmless, it just means that the client closed the connection.
		# Other errors might indicate a problem so we print them, but they're
		# probably not bad enough to warrant stopping the request handler.
		if !e.is_a?(Errno::EPIPE)
			print_exception("Passenger RequestHandler's client socket", e)
		end
	else
		if @analytics_logger && headers && headers[PASSENGER_TXN_ID]
			log_analytics_exception(headers, e)
		end
		raise e if should_reraise_error?(e)
	end
ensure
	# The 'close_write' here prevents forked child
	# processes from unintentionally keeping the
	# connection open.
	if connection && !connection.closed?
		begin
			connection.close_write
		rescue SystemCallError
		end
		begin
			connection.close
		rescue SystemCallError
		end
	end
end

def finalize_request(connection, headers, has_error)

def finalize_request(connection, headers, has_error)
	if connection
		connection.stop_simulating_eof!
	end
	log = headers[PASSENGER_ANALYTICS_WEB_LOG]
	if log && !log.closed?
		exception_occurred = false
		begin
			log.end_measure("app request handler processing", has_error)
			if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
				log.message("Final objects on heap: #{ObjectSpace.live_objects}")
			end
			if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
				log.message("Final objects allocated so far: #{ObjectSpace.allocated_objects}")
			elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
				count = ObjectSpace.count_objects
				log.message("Final objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
			end
			if GC_SUPPORTS_TIME
				log.message("Final GC time: #{GC.time}")
			end
			if GC_SUPPORTS_CLEAR_STATS
				# Clear statistics to void integer wraps.
				GC.clear_stats
			end
			Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = nil
		rescue Exception
			# Maybe this exception was raised while communicating
			# with the logging agent. If that is the case then
			# log.close may also raise an exception, but we're only
			# interested in the original exception. So if this
			# situation occurs we must ignore any exceptions raised
			# by log.close.
			exception_occurred = true
			raise
		ensure
			# It is important that the following call receives an ACK
			# from the logging agent and that we don't close the socket
			# connection until the ACK has been received, otherwise
			# the helper agent may close the transaction before this
			# process's openTransaction command is processed.
			begin
				log.close
			rescue
				raise if !exception_occurred
			end
		end
	end
	
	#################
end

def initialize(request_handler, options = {})

def initialize(request_handler, options = {})
	@request_handler   = request_handler
	@server_socket     = Utils.require_option(options, :server_socket)
	@socket_name       = Utils.require_option(options, :socket_name)
	@protocol          = Utils.require_option(options, :protocol)
	@app_group_name    = Utils.require_option(options, :app_group_name)
	Utils.install_options_as_ivars(self, options,
		:app,
		:analytics_logger,
		:connect_password
	)
	@stats_mutex   = Mutex.new
	@interruptable = false
	@iteration     = 0
	if @protocol == :session
		metaclass = class << self; self; end
		metaclass.class_eval do
			alias parse_request parse_session_request
		end
	elsif @protocol == :http
		metaclass = class << self; self; end
		metaclass.class_eval do
			alias parse_request parse_http_request
		end
	else
		raise ArgumentError, "Unknown protocol specified"
	end
end

def install

def install
	@thread = Thread.current
	Thread.current[:passenger_thread_handler] = self
	PhusionPassenger.call_event(:starting_request_handler_thread)
end

def log_analytics_exception(env, exception)

def log_analytics_exception(env, exception)
	log = @analytics_logger.new_transaction(
		@app_group_name,
		:exceptions,
		env[PASSENGER_UNION_STATION_KEY])
	begin
		request_txn_id = env[PASSENGER_TXN_ID]
		message = exception.message
		message = exception.to_s if message.empty?
		message = [message].pack('m')
		message.gsub!("\n", "")
		backtrace_string = [exception.backtrace.join("\n")].pack('m')
		backtrace_string.gsub!("\n", "")
		log.message("Request transaction ID: #{request_txn_id}")
		log.message("Message: #{message}")
		log.message("Class: #{exception.class.name}")
		log.message("Backtrace: #{backtrace_string}")
	ensure
		log.close
	end
end

def main_loop(finish_callback)

def main_loop(finish_callback)
	socket_wrapper = Utils::UnseekableSocket.new
	channel        = MessageChannel.new
	buffer         = ''
	buffer.force_encoding('binary') if buffer.respond_to?(:force_encoding)
	
	begin
		finish_callback.call
		while true
			hijacked = accept_and_process_next_request(socket_wrapper, channel, buffer)
			socket_wrapper = Utils::UnseekableSocket.new if hijacked
		end
	rescue Interrupted
		# Do nothing.
	end
	debug("Thread handler main loop exited normally")
ensure
	@stats_mutex.synchronize { @interruptable = true }
end

def parse_http_request(connection, channel, buffer)

socket is intended to be used for debugging purposes only.
HTTP parser and is not intended to be complete, fast or secure, since the HTTP server
Like parse_session_request, but parses an HTTP request. This is a very minimalistic
def parse_http_request(connection, channel, buffer)
	headers = {}
	
	data = ""
	while data !~ /\r\n\r\n/ && data.size < MAX_HEADER_SIZE
		data << connection.readpartial(16 * 1024)
	end
	if data.size >= MAX_HEADER_SIZE
		warn("*** Passenger RequestHandler warning: " <<
			"HTTP header size exceeded maximum.")
		return
	end
	
	data.gsub!(/\r\n\r\n.*/, '')
	data.split("\r\n").each_with_index do |line, i|
		if i == 0
			# GET / HTTP/1.1
			line =~ /^([A-Za-z]+) (.+?) (HTTP\/\d\.\d)$/
			request_method = $1
			request_uri    = $2
			protocol       = $3
			path_info, query_string    = request_uri.split("?", 2)
			headers[REQUEST_METHOD]    = request_method
			headers["REQUEST_URI"]     = request_uri
			headers["QUERY_STRING"]    = query_string || ""
			headers["SCRIPT_NAME"]     = ""
			headers["PATH_INFO"]       = path_info
			headers["SERVER_NAME"]     = "127.0.0.1"
			headers["SERVER_PORT"]     = connection.addr[1].to_s
			headers["SERVER_PROTOCOL"] = protocol
		else
			header, value = line.split(/\s*:\s*/, 2)
			header.upcase!            # "Foo-Bar" => "FOO-BAR"
			header.gsub!("-", "_")    #           => "FOO_BAR"
			if header == CONTENT_LENGTH || header == "CONTENT_TYPE"
				headers[header] = value
			else
				headers["HTTP_#{header}"] = value
			end
		end
	end
	
	if @connect_password && headers["HTTP_X_PASSENGER_CONNECT_PASSWORD"] != @connect_password
		warn "*** Passenger RequestHandler warning: " <<
			"someone tried to connect with an invalid connect password."
		return
	else
		return headers
	end
rescue EOFError
	return
end

def parse_session_request(connection, channel, buffer)

def parse_session_request(connection, channel, buffer)
	headers_data = channel.read_scalar(buffer, MAX_HEADER_SIZE)
	if headers_data.nil?
		return
	end
	headers = Utils.split_by_null_into_hash(headers_data)
	if @connect_password && headers[PASSENGER_CONNECT_PASSWORD] != @connect_password
		warn "*** Passenger RequestHandler warning: " <<
			"someone tried to connect with an invalid connect password."
		return
	else
		return headers
	end
rescue SecurityError => e
	warn("*** Passenger RequestHandler warning: " <<
		"HTTP header size exceeded maximum.")
	return
end

def prepare_request(connection, headers)

def prepare_request(connection, headers)
	if (!headers.has_key?(CONTENT_LENGTH) && !headers.has_key?(TRANSFER_ENCODING)) ||
	  headers[CONTENT_LENGTH] == 0
		connection.simulate_eof!
	end
	if @analytics_logger && headers[PASSENGER_TXN_ID]
		txn_id = headers[PASSENGER_TXN_ID]
		union_station_key = headers[PASSENGER_UNION_STATION_KEY]
		log = @analytics_logger.continue_transaction(txn_id,
			@app_group_name,
			:requests, union_station_key)
		headers[PASSENGER_ANALYTICS_WEB_LOG] = log
		Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = log
		Thread.current[PASSENGER_TXN_ID] = txn_id
		Thread.current[PASSENGER_UNION_STATION_KEY] = union_station_key
		if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
			log.message("Initial objects on heap: #{ObjectSpace.live_objects}")
		end
		if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
			log.message("Initial objects allocated so far: #{ObjectSpace.allocated_objects}")
		elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
			count = ObjectSpace.count_objects
			log.message("Initial objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
		end
		if GC_SUPPORTS_TIME
			log.message("Initial GC time: #{GC.time}")
		end
		log.begin_measure("app request handler processing")
	end
	
	#################
end

def process_oobw(env, connection)

def process_oobw(env, connection)
	PhusionPassenger.call_event(:oob_work)
	connection.write("oobw done")
end

def process_ping(env, connection)

def process_ping(env, connection)
	connection.write("pong")
end

def should_reraise_app_error?(e, socket_wrapper)

def should_reraise_app_error?(e, socket_wrapper)
	return false
end

def should_reraise_error?(e)

def should_reraise_error?(e)
	# Stubable by unit tests.
	return true
end

def should_swallow_app_error?(e, socket_wrapper)

def should_swallow_app_error?(e, socket_wrapper)
	return socket_wrapper && socket_wrapper.source_of_exception?(e) && e.is_a?(Errno::EPIPE)
end