# frozen_string_literal: true
require 'qeweney'
require 'stringio'
module TP2
class HTTP1Connection
attr_reader :fd
def initialize(machine, fd, opts, &app)
@machine = machine
@fd = fd
@opts = opts
@logger = opts[:logger]
@stream = UM::Stream.new(machine, fd)
@app = app
@done = nil
@response_headers = nil
end
def run
loop do
@done = nil
@response_headers = nil
persist = serve_request
break if !persist
end
rescue UM::Terminate
# server is terminated, do nothing
rescue StandardError => e
@logger&.call(e)
ensure
@machine.close_async(@fd)
end
# Returns true if connection should persist
def serve_request
headers = parse_headers
return false if !headers
request = Qeweney::Request.new(headers, self)
@app.call(request)
persist_connection?(headers)
rescue ProtocolError => e
msg = "Protocol error: #{e.message}. Closing connection..."
@logger&.call(msg)
false
rescue SystemCallError => e
msg = "I/O error: #{e.class} #{e.message}. Closing connection..."
@logger&.call(msg)
false
rescue StandardError => e
msg = "Internal error while serving request: #{e.class} #{e.message} (#{e.backtrace.inspect}). Abandoning connection..."
@logger&.call(msg)
if request && !@done
respond(request, 'Internal server error', ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR)
end
false
end
def get_body(req)
headers = req.headers
content_length = headers['content-length']
return read(content_length.to_i) if content_length
chunked_encoding = headers['transfer-encoding']&.downcase == 'chunked'
return get_body_chunked_encoding(headers) if chunked_encoding
# if content-length is not specified, we read to EOF, up to max 1MB size
read(1 << 20, nil, false)
end
def get_body_chunk(req, _buffered_only = false)
headers = req.headers
content_length = headers['content-length']
if content_length
return nil if headers[':body-done-reading']
chunk = read(content_length.to_i)
headers[':body-done-reading'] = true
return chunk
end
chunked_encoding = headers['transfer-encoding']&.downcase == 'chunked'
return read_chunk(headers, nil) if chunked_encoding
return nil if headers[':body-done-reading']
# if content-length is not specified, we read to EOF, up to max 1MB size
chunk = read(1 << 20, nil, false)
headers[':body-done-reading'] = true
chunk
end
def complete?(req)
req.headers[':body-done-reading']
end
# response API
SEND_FLAGS = UM::MSG_NOSIGNAL | UM::MSG_WAITALL
# Sends response including headers and body. Waits for the request to complete
# if not yet completed. The body is sent using chunked transfer encoding.
# @param request [Qeweney::Request] HTTP request
# @param body [String] response body
# @param headers
def respond(request, body, headers)
formatted_headers = format_headers(headers, body, false)
request.tx_incr(formatted_headers.bytesize + (body ? body.bytesize : 0))
if body
buf = formatted_headers + body
@machine.send(@fd, buf, buf.bytesize, SEND_FLAGS)
# handle_write(formatted_headers + body)
else
@machine.send(@fd, formatted_headers, formatted_headers.bytesize, SEND_FLAGS)
end
@logger&.call(request, headers)
@done = true
@response_headers = headers
end
# Sends response headers. If empty_response is truthy, the response status
# code will default to 204, otherwise to 200.
# @param request [Qeweney::Request] HTTP request
# @param headers [Hash] response headers
# @param empty_response [boolean] whether a response body will be sent
# @param chunked [boolean] whether to use chunked transfer encoding
# @return [void]
def send_headers(request, headers, empty_response: false, chunked: true)
formatted_headers = format_headers(headers, !empty_response, http1_1?(request) && chunked)
request.tx_incr(formatted_headers.bytesize)
@machine.send(@fd, formatted_headers, formatted_headers.bytesize, SEND_FLAGS)
@response_headers = headers
end
EMPTY_CHUNK = "0\r\n\r\n"
EMPTY_CHUNK_LEN = EMPTY_CHUNK.bytesize
# Sends a response body chunk. If no headers were sent, default headers are
# sent using #send_headers. if the done option is true(thy), an empty chunk
# will be sent to signal response completion to the client.
# @param request [Qeweney::Request] HTTP request
# @param chunk [String] response body chunk
# @param done [boolean] whether the response is completed
# @return [void]
def send_chunk(request, chunk, done: false)
data = +''
data << "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n" if chunk
data << EMPTY_CHUNK if done
return if data.empty?
request.tx_incr(data.bytesize)
@machine.send(@fd, data, data.bytesize, SEND_FLAGS)
return if @done || !done
@logger&.call(request, @response_headers)
@done = true
end
# Finishes the response to the current request. If no headers were sent,
# default headers are sent using #send_headers.
# @return [void]
def finish(request)
request.tx_incr(EMPTY_CHUNK_LEN)
@machine.send(@fd, EMPTY_CHUNK, EMPTY_CHUNK_LEN, SEND_FLAGS)
return if @done
@logger&.call(request, @response_headers)
@done = true
end
def respond_with_static_file(req, path, opts, cache_headers)
fd = @machine.open(path, UM::O_RDONLY)
opts ||= {}
if opts[:headers]
opts[:headers].merge!(cache_headers)
else
opts[:headers] = cache_headers
end
maxlen = opts[:max_len] || 65_536
buf = String.new(capacity: maxlen)
headers_sent = nil
loop do
res = @machine.read(fd, buf, maxlen, 0)
if res < maxlen && !headers_sent
return respond(req, buf, opts[:headers])
elsif res == 0
return finish(req)
end
if !headers_sent
send_headers(req, opts[:headers])
headers_sent = true
end
done = res < maxlen
send_chunk(req, buf, done: done)
return if done
end
ensure
@machine.close(fd) if fd
end
def close
@machine.close_async(@fd)
end
private
RE_REQUEST_LINE = %r{^([a-z]+)\s+([^\s]+)\s+(http/[0-9.]{1,3})}i
RE_HEADER_LINE = /^([a-z0-9-]+):\s+(.+)/i
MAX_REQUEST_LINE_LEN = 1 << 14 # 16KB
MAX_HEADER_LINE_LEN = 1 << 10 # 1KB
MAX_CHUNK_SIZE_LEN = 16
class ProtocolError < StandardError
end
def persist_connection?(headers)
connection = headers['connection']&.downcase
return connection != 'close' if headers[':protocol'] == 'http/1.1'
connection && connection != 'close'
end
def parse_headers
buf = String.new(capacity: 4096)
headers = get_request_line(buf)
return nil if !headers
loop do
line = @stream.get_line(buf, MAX_HEADER_LINE_LEN)
break if line.nil? || line.empty?
m = line.match(RE_HEADER_LINE)
raise ProtocolError, "Invalid header: #{line[0..2047].inspect}" if !m
headers[m[1].downcase] = m[2]
end
headers
end
def get_request_line(buf)
line = @stream.get_line(buf, MAX_REQUEST_LINE_LEN)
return nil if !line
m = line.match(RE_REQUEST_LINE)
raise ProtocolError, "Invalid request line: #{line[0..2047].inspect}" if !m
{
':method' => m[1].downcase,
':path' => m[2],
':protocol' => m[3].downcase
}
end
def get_body_chunked_encoding(headers)
buf = String.new(capacity: 65_536)
while read_chunk(headers, buf)
end
buf
end
def read(len, buf = nil, raise_on_eof = true)
str = @stream.get_string(buf, len)
raise ProtocolError, 'Missing data' if !str && raise_on_eof
str
end
def read_chunk(headers, buffer)
tmp = String.new(capacity: 256)
chunk_size_str = @stream.get_line(tmp, MAX_CHUNK_SIZE_LEN)
return nil if !chunk_size_str
chunk_size = chunk_size_str.to_i(16)
if chunk_size == 0
headers[':body-done-reading'] = true
@stream.get_line(tmp, 0)
return nil
end
chunk = @stream.get_string(nil, chunk_size)
@stream.get_line(tmp, 0)
buffer ? (buffer << chunk) : chunk
end
def http1_1?(request)
request.headers[':protocol'] == 'http/1.1'
end
INTERNAL_HEADER_REGEXP = /^:/
# Formats response headers into an array. If empty_response is true(thy),
# the response status code will default to 204, otherwise to 200.
# @param headers [Hash] response headers
# @param body [boolean] whether a response body will be sent
# @param chunked [boolean] whether to use chunked transfer encoding
# @return [String] formatted response headers
def format_headers(headers, body, chunked)
status = headers[':status'] || (body ? Qeweney::Status::OK : Qeweney::Status::NO_CONTENT)
lines = format_status_line(body, status, chunked)
headers.each do |k, v|
next if k =~ INTERNAL_HEADER_REGEXP
collect_header_lines(lines, k, v)
end
lines << "\r\n"
lines
end
def format_status_line(body, status, chunked)
if !body
empty_status_line(status)
else
with_body_status_line(status, body, chunked)
end
end
def empty_status_line(status)
if status == 204
+"HTTP/1.1 #{status}\r\n"
else
+"HTTP/1.1 #{status}\r\nContent-Length: 0\r\n"
end
end
def with_body_status_line(status, body, chunked)
if chunked
+"HTTP/1.1 #{status}\r\nTransfer-Encoding: chunked\r\n"
else
+"HTTP/1.1 #{status}\r\nContent-Length: #{body.bytesize}\r\n"
end
end
def collect_header_lines(lines, key, value)
if value.is_a?(Array)
value.inject(lines) { |_, item| lines << "#{key}: #{item}\r\n" }
else
lines << "#{key}: #{value}\r\n"
end
end
end
end