# Phusion Passenger - https://www.phusionpassenger.com/
# Copyright (c) 2010-2013 Phusion
#
# "Phusion Passenger" is a trademark of Hongli Lai & Ninh Bui.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'phusion_passenger/constants'
require 'phusion_passenger/debug_logging'
require 'phusion_passenger/message_channel'
require 'phusion_passenger/utils'
require 'phusion_passenger/utils/unseekable_socket'
module PhusionPassenger
class RequestHandler
# This class encapsulates the logic of a single RequestHandler thread.
class ThreadHandler
include DebugLogging
include Utils
class Interrupted < StandardError
end
REQUEST_METHOD = 'REQUEST_METHOD'.freeze
PING = 'PING'.freeze
OOBW = 'OOBW'.freeze
PASSENGER_CONNECT_PASSWORD = 'PASSENGER_CONNECT_PASSWORD'.freeze
CONTENT_LENGTH = 'CONTENT_LENGTH'.freeze
TRANSFER_ENCODING = 'TRANSFER_ENCODING'.freeze
MAX_HEADER_SIZE = 128 * 1024
OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS = ObjectSpace.respond_to?(:live_objects)
OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS = ObjectSpace.respond_to?(:allocated_objects)
OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS = ObjectSpace.respond_to?(:count_objects)
GC_SUPPORTS_TIME = GC.respond_to?(:time)
GC_SUPPORTS_CLEAR_STATS = GC.respond_to?(:clear_stats)
attr_reader :thread
attr_reader :stats_mutex
attr_reader :interruptable
attr_reader :iteration
def initialize(request_handler, options = {})
@request_handler = request_handler
@server_socket = Utils.require_option(options, :server_socket)
@socket_name = Utils.require_option(options, :socket_name)
@protocol = Utils.require_option(options, :protocol)
@app_group_name = Utils.require_option(options, :app_group_name)
Utils.install_options_as_ivars(self, options,
:app,
:analytics_logger,
:connect_password
)
@stats_mutex = Mutex.new
@interruptable = false
@iteration = 0
if @protocol == :session
metaclass = class << self; self; end
metaclass.class_eval do
alias parse_request parse_session_request
end
elsif @protocol == :http
metaclass = class << self; self; end
metaclass.class_eval do
alias parse_request parse_http_request
end
else
raise ArgumentError, "Unknown protocol specified"
end
end
def install
@thread = Thread.current
Thread.current[:passenger_thread_handler] = self
PhusionPassenger.call_event(:starting_request_handler_thread)
end
def main_loop(finish_callback)
socket_wrapper = Utils::UnseekableSocket.new
channel = MessageChannel.new
buffer = ''
buffer.force_encoding('binary') if buffer.respond_to?(:force_encoding)
begin
finish_callback.call
while true
hijacked = accept_and_process_next_request(socket_wrapper, channel, buffer)
socket_wrapper = Utils::UnseekableSocket.new if hijacked
end
rescue Interrupted
# Do nothing.
end
debug("Thread handler main loop exited normally")
ensure
@stats_mutex.synchronize { @interruptable = true }
end
private
# Returns true if the socket has been hijacked, false otherwise.
def accept_and_process_next_request(socket_wrapper, channel, buffer)
@stats_mutex.synchronize do
@interruptable = true
end
connection = socket_wrapper.wrap(@server_socket.accept)
@stats_mutex.synchronize do
@interruptable = false
@iteration += 1
end
trace(3, "Accepted new request on socket #{@socket_name}")
channel.io = connection
if headers = parse_request(connection, channel, buffer)
prepare_request(connection, headers)
begin
if headers[REQUEST_METHOD] == PING
process_ping(headers, connection)
elsif headers[REQUEST_METHOD] == OOBW
process_oobw(headers, connection)
else
process_request(headers, connection, socket_wrapper, @protocol == :http)
end
rescue Exception
has_error = true
raise
ensure
if headers[RACK_HIJACK_IO]
socket_wrapper = nil
connection = nil
channel = nil
end
finalize_request(connection, headers, has_error)
trace(3, "Request done.")
end
else
trace(2, "No headers parsed; disconnecting client.")
end
rescue Interrupted
raise
rescue => e
if socket_wrapper && socket_wrapper.source_of_exception?(e)
# EPIPE is harmless, it just means that the client closed the connection.
# Other errors might indicate a problem so we print them, but they're
# probably not bad enough to warrant stopping the request handler.
if !e.is_a?(Errno::EPIPE)
print_exception("Passenger RequestHandler's client socket", e)
end
else
if @analytics_logger && headers && headers[PASSENGER_TXN_ID]
log_analytics_exception(headers, e)
end
raise e if should_reraise_error?(e)
end
ensure
# The 'close_write' here prevents forked child
# processes from unintentionally keeping the
# connection open.
if connection && !connection.closed?
begin
connection.close_write
rescue SystemCallError
end
begin
connection.close
rescue SystemCallError
end
end
end
def parse_session_request(connection, channel, buffer)
headers_data = channel.read_scalar(buffer, MAX_HEADER_SIZE)
if headers_data.nil?
return
end
headers = Utils.split_by_null_into_hash(headers_data)
if @connect_password && headers[PASSENGER_CONNECT_PASSWORD] != @connect_password
warn "*** Passenger RequestHandler warning: " <<
"someone tried to connect with an invalid connect password."
return
else
return headers
end
rescue SecurityError => e
warn("*** Passenger RequestHandler warning: " <<
"HTTP header size exceeded maximum.")
return
end
# Like parse_session_request, but parses an HTTP request. This is a very minimalistic
# HTTP parser and is not intended to be complete, fast or secure, since the HTTP server
# socket is intended to be used for debugging purposes only.
def parse_http_request(connection, channel, buffer)
headers = {}
data = ""
while data !~ /\r\n\r\n/ && data.size < MAX_HEADER_SIZE
data << connection.readpartial(16 * 1024)
end
if data.size >= MAX_HEADER_SIZE
warn("*** Passenger RequestHandler warning: " <<
"HTTP header size exceeded maximum.")
return
end
data.gsub!(/\r\n\r\n.*/, '')
data.split("\r\n").each_with_index do |line, i|
if i == 0
# GET / HTTP/1.1
line =~ /^([A-Za-z]+) (.+?) (HTTP\/\d\.\d)$/
request_method = $1
request_uri = $2
protocol = $3
path_info, query_string = request_uri.split("?", 2)
headers[REQUEST_METHOD] = request_method
headers["REQUEST_URI"] = request_uri
headers["QUERY_STRING"] = query_string || ""
headers["SCRIPT_NAME"] = ""
headers["PATH_INFO"] = path_info
headers["SERVER_NAME"] = "127.0.0.1"
headers["SERVER_PORT"] = connection.addr[1].to_s
headers["SERVER_PROTOCOL"] = protocol
else
header, value = line.split(/\s*:\s*/, 2)
header.upcase! # "Foo-Bar" => "FOO-BAR"
header.gsub!("-", "_") # => "FOO_BAR"
if header == CONTENT_LENGTH || header == "CONTENT_TYPE"
headers[header] = value
else
headers["HTTP_#{header}"] = value
end
end
end
if @connect_password && headers["HTTP_X_PASSENGER_CONNECT_PASSWORD"] != @connect_password
warn "*** Passenger RequestHandler warning: " <<
"someone tried to connect with an invalid connect password."
return
else
return headers
end
rescue EOFError
return
end
def process_ping(env, connection)
connection.write("pong")
end
def process_oobw(env, connection)
PhusionPassenger.call_event(:oob_work)
connection.write("oobw done")
end
# def process_request(env, connection, socket_wrapper, full_http_response)
# raise NotImplementedError, "Override with your own implementation!"
# end
def prepare_request(connection, headers)
if (!headers.has_key?(CONTENT_LENGTH) && !headers.has_key?(TRANSFER_ENCODING)) ||
headers[CONTENT_LENGTH] == 0
connection.simulate_eof!
end
if @analytics_logger && headers[PASSENGER_TXN_ID]
txn_id = headers[PASSENGER_TXN_ID]
union_station_key = headers[PASSENGER_UNION_STATION_KEY]
log = @analytics_logger.continue_transaction(txn_id,
@app_group_name,
:requests, union_station_key)
headers[PASSENGER_ANALYTICS_WEB_LOG] = log
Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = log
Thread.current[PASSENGER_TXN_ID] = txn_id
Thread.current[PASSENGER_UNION_STATION_KEY] = union_station_key
if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
log.message("Initial objects on heap: #{ObjectSpace.live_objects}")
end
if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
log.message("Initial objects allocated so far: #{ObjectSpace.allocated_objects}")
elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
count = ObjectSpace.count_objects
log.message("Initial objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
end
if GC_SUPPORTS_TIME
log.message("Initial GC time: #{GC.time}")
end
log.begin_measure("app request handler processing")
end
#################
end
def finalize_request(connection, headers, has_error)
if connection
connection.stop_simulating_eof!
end
log = headers[PASSENGER_ANALYTICS_WEB_LOG]
if log && !log.closed?
exception_occurred = false
begin
log.end_measure("app request handler processing", has_error)
if OBJECT_SPACE_SUPPORTS_LIVE_OBJECTS
log.message("Final objects on heap: #{ObjectSpace.live_objects}")
end
if OBJECT_SPACE_SUPPORTS_ALLOCATED_OBJECTS
log.message("Final objects allocated so far: #{ObjectSpace.allocated_objects}")
elsif OBJECT_SPACE_SUPPORTS_COUNT_OBJECTS
count = ObjectSpace.count_objects
log.message("Final objects allocated so far: #{count[:TOTAL] - count[:FREE]}")
end
if GC_SUPPORTS_TIME
log.message("Final GC time: #{GC.time}")
end
if GC_SUPPORTS_CLEAR_STATS
# Clear statistics to void integer wraps.
GC.clear_stats
end
Thread.current[PASSENGER_ANALYTICS_WEB_LOG] = nil
rescue Exception
# Maybe this exception was raised while communicating
# with the logging agent. If that is the case then
# log.close may also raise an exception, but we're only
# interested in the original exception. So if this
# situation occurs we must ignore any exceptions raised
# by log.close.
exception_occurred = true
raise
ensure
# It is important that the following call receives an ACK
# from the logging agent and that we don't close the socket
# connection until the ACK has been received, otherwise
# the helper agent may close the transaction before this
# process's openTransaction command is processed.
begin
log.close
rescue
raise if !exception_occurred
end
end
end
#################
end
def log_analytics_exception(env, exception)
log = @analytics_logger.new_transaction(
@app_group_name,
:exceptions,
env[PASSENGER_UNION_STATION_KEY])
begin
request_txn_id = env[PASSENGER_TXN_ID]
message = exception.message
message = exception.to_s if message.empty?
message = [message].pack('m')
message.gsub!("\n", "")
backtrace_string = [exception.backtrace.join("\n")].pack('m')
backtrace_string.gsub!("\n", "")
log.message("Request transaction ID: #{request_txn_id}")
log.message("Message: #{message}")
log.message("Class: #{exception.class.name}")
log.message("Backtrace: #{backtrace_string}")
ensure
log.close
end
end
def should_reraise_error?(e)
# Stubable by unit tests.
return true
end
def should_reraise_app_error?(e, socket_wrapper)
return false
end
def should_swallow_app_error?(e, socket_wrapper)
return socket_wrapper && socket_wrapper.source_of_exception?(e) && e.is_a?(Errno::EPIPE)
end
end
end # class RequestHandler
end # module PhusionPassenger