require 'eventmachine'
require 'goliath/constants'
require 'goliath/response'
require 'goliath/validation'
require 'async_rack'
require 'json'
require 'stringio'
module Goliath
# Goliath::Request is responsible for processing a request and returning
# the result back to the client.
#
# @private
class Request
include EM::Deferrable
include Constants
attr_accessor :app, :conn, :env, :response, :body
class << self
  # Class-level hooks, each exposed through a reader and a writer:
  #
  # * execute_block - decides how the request-processing block is run.
  #   The default installed below spawns a new fiber per call; a pool of
  #   fibers is another option.
  # * log_block - decides what exactly is logged for a response. It is
  #   called with the env, the response and the elapsed time.
  attr_accessor :execute_block, :log_block
end

# Default log hook: a single info line carrying the status, the
# Content-Length header and the elapsed time with two decimals.
self.log_block = proc do |env, response, elapsed_time|
  status = response.status
  length = response.headers['Content-Length']
  timing = '%.2f' % elapsed_time
  env[RACK_LOGGER].info("Status: #{status}, Content-Length: #{length}, Response Time: #{timing}ms")
end

# Default execution hook: run the given block inside a brand-new Fiber.
self.execute_block = proc do |&work|
  fiber = Fiber.new(&work)
  fiber.resume
end
# Sets up the per-request state and registers the async / streaming
# hooks in the env.
#
# @param app [Object] The application; #process invokes app.call(env)
# @param conn [Object] The connection used to send data and to terminate
#   the request
# @param env [Hash] The environment of this request
def initialize(app, conn, env)
  @state = :processing
  @app, @conn, @env = app, conn, env
  @response = Goliath::Response.new
  @body = StringIO.new(INITIAL_BODY.dup)

  @env[RACK_INPUT] = body
  @env[ASYNC_CALLBACK] = method(:post_process)

  # Each stream hook registers its work through #callback (from
  # EM::Deferrable) instead of writing to the connection directly.
  @env[STREAM_SEND] = proc do |data|
    callback { @conn.send_data(data) }
  end

  @env[STREAM_CLOSE] = proc do
    callback { @conn.terminate_request(false) }
  end

  @env[STREAM_START] = proc do |status, headers|
    callback do
      @response.status = status
      @response.headers = headers
      @conn.send_data(@response.head)
      @conn.send_data(@response.headers_output)
    end
  end
end
# Invoked by connection when header parsing is complete.
# This method is invoked only once per request.
#
# Copies the request headers into the env under HTTP_-prefixed, upper-cased
# keys, derives the server name / port from the Host header, records the
# request line details reported by the parser and finally fires the
# optional async headers hook. An exception raised by that hook is routed
# to #server_exception.
#
# @param h [Hash] Request headers
# @param parser [Http::Parser] The parser used to parse the request
# @yield Optional block, run once the env is populated and before the
#   async headers hook fires
# @return [void]
def parse_header(h, parser)
  h.each do |k, v|
    @env[HTTP_PREFIX + k.tr('-', '_').upcase] = v
  end

  # The Rack specification names these two keys without the HTTP_ prefix.
  %w(CONTENT_TYPE CONTENT_LENGTH).each do |name|
    @env[name] = @env.delete("HTTP_#{name}") if @env["HTTP_#{name}"]
  end

  host = @env['HTTP_HOST']
  if host
    # A bracketed IPv6 literal ("[::1]:9000") contains colons of its own,
    # so a plain split(':') would cut the address apart. Every other host
    # value keeps going through the original split.
    ipv6 = host.match(/\A(\[[^\]]+\])(?::(\d+)?)?\z/)
    name, port = ipv6 ? [ipv6[1], ipv6[2]] : host.split(':')

    @env[SERVER_NAME] = name if name
    @env[SERVER_PORT] = port if port
  end

  @env[REQUEST_METHOD] = parser.http_method
  @env[REQUEST_URI] = parser.request_url
  @env[QUERY_STRING] = parser.query_string
  @env[HTTP_VERSION] = parser.http_version.join('.')
  @env[SCRIPT_NAME] = parser.request_path
  @env[REQUEST_PATH] = parser.request_path
  @env[PATH_INFO] = parser.request_path
  @env[FRAGMENT] = parser.fragment

  yield if block_given?

  begin
    @env[ASYNC_HEADERS].call(@env, h) if @env[ASYNC_HEADERS]
  rescue Exception => e
    server_exception(e)
  end
end
# Invoked by connection each time a chunk of body data has been parsed
# from the existing TCP stream.
#
# When an async body handler is registered in the env, the chunk is handed
# to it together with the env; otherwise the chunk is appended to the
# buffered request body. Any exception is routed to #server_exception.
#
# @note In theory the data could be streamed into the processing step
#   for async uploads, etc. That would also require additional callbacks
#   for headers and so on; something to explore later.
#
# @param data [String] The received data
# @return [void]
def parse(data)
  on_body = @env[ASYNC_BODY]
  return on_body.call(@env, data) if on_body

  @body << data
rescue Exception => e
  server_exception(e)
end
# Reports whether the request has reached the :finished state. The state
# starts out as :processing and is switched to :finished by #process.
#
# @return [Boolean] True once the state is :finished, false otherwise
def finished?
  @state.equal?(:finished)
end
# Invoked by connection when upstream client
# terminates the current TCP session.
#
# Closes the response on a best-effort basis, then fires the optional
# async close hook with the env. A failing hook is reported through the
# logger instead of being raised.
#
# @return [void]
def close
  # Best effort: a response that fails to close must not stop the hook.
  begin
    @response.close
  rescue StandardError
    nil
  end

  begin
    on_close = @env[ASYNC_CLOSE]
    on_close.call(@env) if on_close
  rescue Exception => e
    @env[RACK_LOGGER].error("on_close Exception: #{e.class}, message: #{e.message}")
  end
end
# Invoked by connection when the parsing of the HTTP request and body
# is complete. From this point all synchronous middleware will run until
# either an immediate response is served, or an async response is
# indicated.
#
# The work is run through Goliath::Request.execute_block: the request is
# marked :finished, the input is rewound when present, and the app's
# result is passed to #post_process. Any exception is routed to
# #server_exception.
#
# @return [void]
def process
  work = proc do
    begin
      @state = :finished

      input = @env['rack.input']
      input.rewind if input

      post_process(@app.call(@env))
    rescue Exception => e
      server_exception(e)
    end
  end

  Goliath::Request.execute_block.call(&work)
end
# Invoked by the app / middleware once the request is complete. When the
# status equals the special async marker the response is not ready yet
# and nothing is sent.
#
# Sending of the data is deferred until the request is marked as ready
# to push data by the connection. Two pipelined requests can therefore
# arrive on the same connection, the first taking 1s to render and the
# second 0.5s: because the HTTP spec does not allow interleaved data
# exchange, the second is held back until the first is done and sent.
#
# Processing on the server still happens in parallel, so serving both
# requests in the scenario above should take ~1s + data transfer time.
#
# Once sending starts, the response is written chunk by chunk, the
# class-level log block is invoked (its own errors are only logged) and
# the request is terminated on the connection. Any other exception is
# routed to #server_exception.
#
# @param results [Array] The status, headers and body to return to the client
# @return [void]
def post_process(results)
  code, response_headers, payload = results
  return if code && code == Goliath::Connection::AsyncResponse.first

  callback do
    begin
      @response.status = code
      @response.headers = response_headers
      @response.body = payload
      @response.each { |piece| @conn.send_data(piece) }

      elapsed_ms = (Time.now.to_f - @env[:start_time]) * 1000
      begin
        Goliath::Request.log_block.call(@env, @response, elapsed_ms)
      rescue => log_error
        # prevent an infinite loop if the block raised an error
        @env[RACK_LOGGER].error("log block raised #{log_error}")
      end

      @conn.terminate_request(keep_alive)
    rescue Exception => e
      server_exception(e)
    end
  end
rescue Exception => e
  server_exception(e)
end
private
# Turns an exception into an error response and pushes it out.
#
# A Goliath::Validation::Error is answered with its own status code and a
# JSON body of the form {"error":"<message>"}. Any other exception is
# logged with its backtrace and answered with a 500; in production the
# real message is replaced by a generic one. In both cases the connection
# is flagged for termination before the response is handed to
# #post_process.
#
# @param e [Exception] The exception to log
# @return [void]
def server_exception(e)
  if e.is_a?(Goliath::Validation::Error)
    status, headers, body = e.status_code, {}, validation_error_body(e.message)
  else
    # backtrace is nil for an exception object that was never raised.
    @env[RACK_LOGGER].error("#{e.message}\n#{Array(e.backtrace).join("\n")}")
    message = Goliath.env?(:production) ? 'An error happened' : e.message
    status, headers, body = 500, {}, message
  end

  headers['Content-Length'] = body.bytesize.to_s
  @env[:terminate_connection] = true
  post_process([status, headers, body])

  # Mark the request as complete to force a flush on the response.
  # Note: #on_body and #response hooks may still fire if the data
  # is already in the parser buffer.
  succeed
end

# Serializes a validation message as a JSON object with a single "error"
# key, escaping quotes, backslashes and control characters so the body
# stays valid JSON.
#
# @param message [String] The validation error message
# @return [String] The JSON document
def validation_error_body(message)
  JSON.generate('error' => message.to_s)
rescue StandardError
  # The message could not be serialized (e.g. bytes that are not valid
  # UTF-8). Fall back to the plain template rather than failing inside
  # the error handler itself.
  '{"error":"%s"}' % message
end
# Used to determine if the connection should be kept open
#
# The Connection header is read as a case-insensitive, comma-separated
# list of options, so values such as "close, TE" or
# "Keep-Alive, Upgrade" are understood. A missing header means no
# options.
#
# @return [Boolean] True to keep the connection open, false otherwise
def keep_alive
  return false if @env[:terminate_connection]

  options = @env[HTTP_PREFIX + CONNECTION].to_s.downcase.split(',').map(&:strip)

  case @env[HTTP_VERSION]
  # HTTP 1.1: all requests are persistent requests, client
  # must send a Connection:close header to indicate otherwise
  when '1.1' then !options.include?('close')
  # HTTP 1.0: all requests are non keep-alive, client must
  # send a Connection: Keep-Alive to indicate otherwise
  when '1.0' then options.include?('keep-alive')
  # Any other (or unknown) version: do not keep the connection open
  else false
  end
end
end
end