lib/tp2/http1_connection.rb



# frozen_string_literal: true

require 'qeweney'
require 'stringio'

module TP2
  class HTTP1Connection
    # File descriptor this connection was constructed with.
    attr_reader :fd

    # Sets up the state for a single client connection.
    # @param machine [Object] machine object used for all I/O on this connection
    # @param fd [Object] descriptor of the connection
    # @param opts [Hash] options; the :logger entry, if any, is used for logging
    # @param app [Proc] block invoked with each incoming request
    def initialize(machine, fd, opts, &app)
      @machine = machine
      @fd = fd
      @app = app
      @opts = opts
      @logger = opts[:logger]
      @stream = UM::Stream.new(machine, fd)

      # per-request state, cleared again by #run before each request
      @response_headers = nil
      @done = nil
    end

    # Serves requests on the connection until #serve_request reports that the
    # connection should not persist, then closes the file descriptor. The
    # per-request state is cleared before every request.
    # @return [void]
    def run
      loop do
        @done = @response_headers = nil
        break unless serve_request
      end
    rescue UM::Terminate
      # the server is shutting down: nothing to report
    rescue StandardError => err
      @logger&.call(err)
    ensure
      @machine.close_async(@fd)
    end

    # Serves one request: parses the headers, wraps them in a Qeweney::Request
    # and hands the request to the app. Errors are logged and turn into a falsy
    # result; for unexpected errors a 500 response is attempted if a request
    # exists and no response has been completed yet.
    # @return [Boolean, nil] truthy if the connection should persist
    def serve_request
      headers = parse_headers
      return false unless headers

      req = Qeweney::Request.new(headers, self)
      @app.call(req)
      persist_connection?(headers)
    rescue ProtocolError => err
      notice = "Protocol error: #{err.message}. Closing connection..."
      @logger&.call(notice)
      false
    rescue SystemCallError => err
      notice = "I/O error: #{err.class} #{err.message}. Closing connection..."
      @logger&.call(notice)
      false
    rescue StandardError => err
      notice = "Internal error while serving request: #{err.class} #{err.message} (#{err.backtrace.inspect}). Abandoning connection..."
      @logger&.call(notice)
      respond(req, 'Internal server error', ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR) if req && !@done
      false
    end

    # Reads the complete request body. A content-length header takes
    # precedence; otherwise a chunked transfer encoding is honoured; otherwise
    # the body is read up to a 1MB cap without failing on EOF.
    # @param req [Qeweney::Request] HTTP request
    # @return [String, nil] request body
    def get_body(req)
      headers = req.headers
      if (length = headers['content-length'])
        return read(length.to_i)
      end

      encoding = headers['transfer-encoding']
      if encoding&.downcase == 'chunked'
        get_body_chunked_encoding(headers)
      else
        # no length information at all: read to EOF, at most 1MB
        read(1 << 20, nil, false)
      end
    end

    # Reads the next piece of the request body. With a content-length header
    # (or with no length information at all) the whole body is returned by the
    # first call and the ':body-done-reading' marker is set, so later calls
    # return nil. With chunked transfer encoding each call delegates to
    # #read_chunk.
    # @param req [Qeweney::Request] HTTP request
    # @param _buffered_only [Boolean] unused
    # @return [String, nil] body data, or nil when there is nothing more to read
    def get_body_chunk(req, _buffered_only = false)
      headers = req.headers
      length = headers['content-length']
      # chunked encoding is only considered when no content-length is given
      if !length && headers['transfer-encoding']&.downcase == 'chunked'
        return read_chunk(headers, nil)
      end
      return nil if headers[':body-done-reading']

      data = length ? read(length.to_i) : read(1 << 20, nil, false)
      headers[':body-done-reading'] = true
      data
    end

    # Tells whether the request body has been marked as completely read.
    # @param req [Qeweney::Request] HTTP request
    # @return [Object, nil] value of the ':body-done-reading' marker in the request headers
    def complete?(req)
      headers = req.headers
      headers[':body-done-reading']
    end

    # response API

    # Flags passed to every @machine.send call made by this class.
    # NOTE(review): presumably these mirror the socket MSG_NOSIGNAL and
    # MSG_WAITALL send flags - confirm against the UM documentation.
    SEND_FLAGS = UM::MSG_NOSIGNAL | UM::MSG_WAITALL

    # Sends a complete response (headers followed by the body, if any) with a
    # single send call, logs the request and marks the response as done. The
    # headers are formatted with chunked encoding turned off.
    # @param request [Qeweney::Request] HTTP request
    # @param body [String, nil] response body
    # @param headers [Hash] response headers
    # @return [void]
    def respond(request, body, headers)
      head = format_headers(headers, body, false)
      body_size = body ? body.bytesize : 0
      request.tx_incr(head.bytesize + body_size)

      payload = body ? head + body : head
      @machine.send(@fd, payload, payload.bytesize, SEND_FLAGS)

      @logger&.call(request, headers)
      @done = true
      @response_headers = headers
    end

    # Sends the response headers only. Chunked transfer encoding is announced
    # when the request is HTTP/1.1 and chunked is truthy. If empty_response is
    # truthy the headers are formatted as for a response without a body.
    # @param request [Qeweney::Request] HTTP request
    # @param headers [Hash] response headers
    # @param empty_response [boolean] whether the response has no body
    # @param chunked [boolean] whether to use chunked transfer encoding
    # @return [void]
    def send_headers(request, headers, empty_response: false, chunked: true)
      has_body = !empty_response
      use_chunked = http1_1?(request) && chunked
      head = format_headers(headers, has_body, use_chunked)

      request.tx_incr(head.bytesize)
      @machine.send(@fd, head, head.bytesize, SEND_FLAGS)
      @response_headers = headers
    end

    # Zero-length chunk; #send_chunk (when done) and #finish send it to end a
    # response body.
    EMPTY_CHUNK = "0\r\n\r\n"
    # Byte size of EMPTY_CHUNK, used as the length argument when sending it.
    EMPTY_CHUNK_LEN = EMPTY_CHUNK.bytesize

    # Sends a response body chunk framed for chunked transfer encoding. No
    # headers are sent from here, so #send_headers is expected to have been
    # called first. If the done option is truthy, the terminating empty chunk
    # is appended, and the request is logged and marked as done (once).
    #
    # A nil or empty chunk is skipped: a zero-length chunk is exactly the body
    # terminator, so writing one for an empty string would end the response
    # prematurely, or terminate it twice when done is also set.
    # @param request [Qeweney::Request] HTTP request
    # @param chunk [String, nil] response body chunk
    # @param done [boolean] whether the response is completed
    # @return [void]
    def send_chunk(request, chunk, done: false)
      data = +''
      data << "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n" if chunk && !chunk.empty?
      data << EMPTY_CHUNK if done
      return if data.empty?

      request.tx_incr(data.bytesize)
      @machine.send(@fd, data, data.bytesize, SEND_FLAGS)
      return if @done || !done

      @logger&.call(request, @response_headers)
      @done = true
    end

    # Finishes a chunked response by sending the terminating empty chunk. The
    # request is logged and marked as done unless that already happened. No
    # headers are sent from here.
    # @param request [Qeweney::Request] HTTP request
    # @return [void]
    def finish(request)
      request.tx_incr(EMPTY_CHUNK_LEN)
      @machine.send(@fd, EMPTY_CHUNK, EMPTY_CHUNK_LEN, SEND_FLAGS)

      unless @done
        @logger&.call(request, @response_headers)
        @done = true
      end
    end

    # Responds with the contents of the file at the given path. A file that
    # fits into a single read of max_len bytes is sent with #respond; a larger
    # file is streamed with #send_headers followed by #send_chunk calls.
    #
    # The response headers are built as a fresh hash (opts[:headers] merged
    # with cache_headers), so neither opts nor opts[:headers] nor cache_headers
    # is modified; callers may pass shared or frozen hashes. A nil
    # cache_headers is treated as empty.
    # @param req [Qeweney::Request] HTTP request
    # @param path [String] path of the file to send
    # @param opts [Hash, nil] options: :headers (response headers) and :max_len
    #   (read size, 65536 by default)
    # @param cache_headers [Hash, nil] headers merged over opts[:headers]
    # @return [void]
    def respond_with_static_file(req, path, opts, cache_headers)
      fd = @machine.open(path, UM::O_RDONLY)
      opts ||= {}
      headers = (opts[:headers] || {}).merge(cache_headers || {})

      maxlen = opts[:max_len] || 65_536
      buf = String.new(capacity: maxlen)
      headers_sent = false
      loop do
        res = @machine.read(fd, buf, maxlen, 0)
        # a short first read means the whole file is already in buf
        return respond(req, buf, headers) if res < maxlen && !headers_sent
        # nothing left after one or more full reads: just terminate the body
        return finish(req) if res == 0

        unless headers_sent
          send_headers(req, headers)
          headers_sent = true
        end
        done = res < maxlen
        send_chunk(req, buf, done: done)
        return if done
      end
    ensure
      @machine.close(fd) if fd
    end

    # Closes the connection's file descriptor through @machine.close_async.
    # @return [Object] whatever @machine.close_async returns
    def close
      descriptor = @fd
      @machine.close_async(descriptor)
    end

    private

    # Matches a request line; captures method, request target and protocol.
    RE_REQUEST_LINE = %r{^([a-z]+)\s+([^\s]+)\s+(http/[0-9.]{1,3})}i
    # Matches a header line; captures the field name and the field value.
    # Whitespace after the colon is optional and the value may be empty, as
    # allowed by the HTTP/1.1 field-line grammar, so "Host:example.com" and
    # "X-Empty:" are accepted.
    RE_HEADER_LINE = /^([a-z0-9-]+):\s*(.*)/i
    # Maximum lengths passed to @stream.get_line for each kind of line.
    MAX_REQUEST_LINE_LEN = 1 << 14 # 16KB
    MAX_HEADER_LINE_LEN = 1 << 10 # 1KB
    MAX_CHUNK_SIZE_LEN = 16

    # Raised when the data received from the client cannot be handled as an
    # HTTP/1 message: an invalid request line, an invalid header line, or
    # missing body data.
    class ProtocolError < StandardError
    end

    # Tells whether the connection should be kept open after the response.
    # HTTP/1.1 connections persist unless the client asked to close; other
    # protocol versions persist only if a connection header is present and
    # does not ask to close.
    #
    # The connection header is a comma-separated list of options, so "close"
    # is looked for among the individual options ("TE, close" closes too).
    # @param headers [Hash] request headers
    # @return [Boolean, nil] truthy if the connection should persist
    def persist_connection?(headers)
      connection = headers['connection']&.downcase
      close_requested = connection ? connection.split(',').any? { |option| option.strip == 'close' } : false
      return !close_requested if headers[':protocol'] == 'http/1.1'

      connection && !close_requested
    end

    # Reads the request line and the header lines that follow it. Header
    # names are stored downcased; a repeated header overwrites the earlier
    # value.
    # @return [Hash, nil] request headers, or nil if no request line was read
    # @raise [ProtocolError] if a header line does not match RE_HEADER_LINE
    def parse_headers
      buf = String.new(capacity: 4096)
      headers = get_request_line(buf)
      return nil unless headers

      # the header section ends at the first empty line, or when no line is returned
      while (line = @stream.get_line(buf, MAX_HEADER_LINE_LEN)) && !line.empty?
        match = line.match(RE_HEADER_LINE)
        raise ProtocolError, "Invalid header: #{line[0..2047].inspect}" unless match

        name, value = match.captures
        headers[name.downcase] = value
      end

      headers
    end

    # Reads and parses the request line.
    # @param buf [String] buffer passed on to @stream.get_line
    # @return [Hash, nil] hash with the downcased ':method', the ':path' and
    #   the downcased ':protocol', or nil if no line was read
    # @raise [ProtocolError] if the line does not match RE_REQUEST_LINE
    def get_request_line(buf)
      line = @stream.get_line(buf, MAX_REQUEST_LINE_LEN)
      return nil unless line

      match = line.match(RE_REQUEST_LINE)
      raise ProtocolError, "Invalid request line: #{line[0..2047].inspect}" unless match

      verb, target, version = match.captures
      { ':method' => verb.downcase, ':path' => target, ':protocol' => version.downcase }
    end

    # Reads a whole chunked request body by calling #read_chunk until it
    # returns a falsy value.
    # @param headers [Hash] request headers
    # @return [String] the accumulated body
    def get_body_chunked_encoding(headers)
      body = String.new(capacity: 65_536)
      more = true
      more = read_chunk(headers, body) while more
      body
    end

    # Reads len bytes from the stream via @stream.get_string.
    # @param len [Integer] number of bytes to read
    # @param buf [String, nil] buffer passed on to the stream
    # @param raise_on_eof [Boolean] whether a falsy stream result is an error
    # @return [String, nil] the data read
    # @raise [ProtocolError] if nothing was read and raise_on_eof is truthy
    def read(len, buf = nil, raise_on_eof = true)
      data = @stream.get_string(buf, len)
      return data if data || !raise_on_eof

      raise ProtocolError, 'Missing data'
    end

    # Reads one chunk of a chunked request body. A zero-sized chunk marks the
    # end of the body: the ':body-done-reading' marker is set in headers and
    # nil is returned.
    #
    # Malformed framing is reported as a ProtocolError rather than being
    # accepted silently. A chunk-size line that is not a hex number would
    # otherwise parse as 0 and be mistaken for the end of the body, and a
    # chunk cut short by EOF would otherwise yield nil data.
    # @param headers [Hash] request headers
    # @param buffer [String, nil] buffer the chunk is appended to, if given
    # @return [String, nil] the buffer (if given) or the chunk; nil when there
    #   is no further chunk
    # @raise [ProtocolError] on an invalid chunk size or missing chunk data
    def read_chunk(headers, buffer)
      tmp = String.new(capacity: 256)
      chunk_size_str = @stream.get_line(tmp, MAX_CHUNK_SIZE_LEN)
      return nil if !chunk_size_str

      # hex size, optionally followed by chunk extensions (";name=value")
      size_match = chunk_size_str.match(/\A(\h+)\s*(?:;.*)?\z/)
      raise ProtocolError, "Invalid chunk size: #{chunk_size_str.inspect}" if !size_match

      chunk_size = size_match[1].to_i(16)
      if chunk_size == 0
        headers[':body-done-reading'] = true
        @stream.get_line(tmp, 0)
        return nil
      end

      chunk = @stream.get_string(nil, chunk_size)
      raise ProtocolError, 'Missing data' if !chunk

      # consume the line ending that follows the chunk data
      @stream.get_line(tmp, 0)

      buffer ? (buffer << chunk) : chunk
    end

    # Tells whether the request was made with the HTTP/1.1 protocol.
    # @param request [Qeweney::Request] HTTP request
    # @return [Boolean]
    def http1_1?(request)
      protocol = request.headers[':protocol']
      protocol == 'http/1.1'
    end

    # Matches header keys that start with a colon (such as ':status');
    # #format_headers skips those keys instead of writing them out.
    INTERNAL_HEADER_REGEXP = /^:/

    # Formats the status line and the response headers into a single string
    # ending with an empty line. The status is taken from the ':status' header;
    # without one it defaults to 200 OK when body is truthy and to 204 No
    # Content otherwise. Keys starting with a colon are not written out.
    # @param headers [Hash] response headers
    # @param body [String, Boolean, nil] response body, or a flag telling
    #   whether a body will be sent
    # @param chunked [boolean] whether to use chunked transfer encoding
    # @return [String] formatted response headers
    def format_headers(headers, body, chunked)
      status = headers[':status']
      status ||= body ? Qeweney::Status::OK : Qeweney::Status::NO_CONTENT

      out = format_status_line(body, status, chunked)
      headers.each_pair do |name, value|
        collect_header_lines(out, name, value) unless name =~ INTERNAL_HEADER_REGEXP
      end
      out << "\r\n"
    end

    # Builds the start of the response for the given status, delegating to
    # #empty_status_line when there is no body and to #with_body_status_line
    # otherwise.
    # @param body [Object, nil] response body or body flag
    # @param status [Integer] response status
    # @param chunked [boolean] whether to use chunked transfer encoding
    # @return [String] status line plus body framing header
    def format_status_line(body, status, chunked)
      return empty_status_line(status) unless body

      with_body_status_line(status, body, chunked)
    end

    # Builds the status line for a response without a body. Every status
    # except 204 also gets a "Content-Length: 0" header.
    # @param status [Integer] response status
    # @return [String] mutable string holding the status line
    def empty_status_line(status)
      line = +"HTTP/1.1 #{status}\r\n"
      line << "Content-Length: 0\r\n" unless status == 204
      line
    end

    # Builds the status line for a response with a body, followed by the
    # header that frames the body: Transfer-Encoding for chunked responses,
    # Content-Length when the body size is known.
    #
    # #send_headers passes a boolean flag rather than the body itself, so for
    # a non-chunked response the size may be unknown. In that case no
    # Content-Length header is written (instead of failing on the flag), and
    # the caller's own headers are left to describe the body length.
    # @param status [Integer] response status
    # @param body [String, Boolean] response body, or a flag telling that a
    #   body will follow
    # @param chunked [boolean] whether to use chunked transfer encoding
    # @return [String] mutable string holding the status line
    def with_body_status_line(status, body, chunked)
      if chunked
        +"HTTP/1.1 #{status}\r\nTransfer-Encoding: chunked\r\n"
      elsif body.respond_to?(:bytesize)
        +"HTTP/1.1 #{status}\r\nContent-Length: #{body.bytesize}\r\n"
      else
        +"HTTP/1.1 #{status}\r\n"
      end
    end

    # Appends the header line(s) for one header to lines. An Array value
    # produces one line per item; any other value produces a single line.
    # @param lines [String] buffer the formatted lines are appended to
    # @param key [Object] header name
    # @param value [Object, Array] header value(s)
    # @return [String] lines
    def collect_header_lines(lines, key, value)
      items = value.is_a?(Array) ? value : [value]
      items.each { |item| lines << "#{key}: #{item}\r\n" }
      lines
    end
  end
end