# lib/httpx/connection/http2.rb



# frozen_string_literal: true

require "io/wait"
require "http/2/next"

module HTTPX
  class Connection::HTTP2
    include Callbacks
    include Loggable

    # Error describing a stream that was closed with an error code. The
    # message embeds both the stream id and the code.
    Error = Class.new(Error) do
      # id   - id of the stream that was closed.
      # code - error code the stream was closed with.
      def initialize(id, code)
        message = "stream #{id} closed with error: #{code}"
        super(message)
      end
    end

    # Hash of in-flight requests mapped to the HTTP/2 stream carrying each one.
    attr_reader :streams
    # Array of requests queued until they can be dispatched (see #send).
    attr_reader :pending

    # Sets up the parser state and opens the HTTP/2 session.
    #
    # buffer  - object that collects the bytes produced by the HTTP/2
    #           connection (see #on_frame) and answers #full?.
    # options - connection options; wrapped with Options.new.
    def initialize(buffer, options)
      @buffer = buffer
      @options = Options.new(options)
      @max_concurrent_requests = @options.max_concurrent_requests
      # flipped to true by #on_settings; until then #send only queues requests
      @handshake_completed = false
      @pending = [] # requests waiting to be dispatched
      @streams = {} # request => stream
      @drains = {}  # request => body chunk held back while the buffer was full
      init_connection
    end

    # Calls +goaway+ on the underlying HTTP/2 connection object (presumably
    # emitting a GOAWAY frame — confirm against http-2-next).
    def close
      @connection.goaway
    end

    # Tells whether there is nothing left to process: true when the
    # underlying connection state is :closed or no stream is registered.
    def empty?
      return true if @connection.state == :closed

      @streams.empty?
    end

    # Feeds +data+ to the underlying HTTP/2 connection (presumably raw bytes
    # read from the socket — confirm against the caller).
    def <<(data)
      @connection << data
    end

    # Dispatches +request+ on a stream, or queues it in @pending.
    #
    # The request is queued (and nil is returned) while the settings
    # handshake has not completed or the number of registered streams has
    # reached @max_concurrent_requests. Otherwise the request's stream is
    # looked up, or created and wired via #handle_stream, the request is
    # written through #handle, and true is returned.
    def send(request, **)
      ready = @handshake_completed && @streams.size < @max_concurrent_requests
      unless ready
        @pending << request
        return
      end

      stream = @streams[request]
      unless stream
        stream = @connection.new_stream
        handle_stream(stream, request)
        @streams[request] = stream
      end

      handle(request, stream)
      true
    end

    # Runs #handle again for every registered request/stream pair, in
    # registration order.
    def consume
      @streams.each_pair { |request, stream| handle(request, stream) }
    end

    # Emits an :error event carrying +ex+ for every request that has a
    # stream, then for every request still waiting in @pending. Neither
    # collection is modified here.
    def handle_error(ex)
      notify = ->(request) { emit(:error, request, ex) }

      @streams.each_key(&notify)
      @pending.each(&notify)
    end

    private

    # Flushes queued requests, oldest first, for as long as the connection
    # can take them.
    #
    # Capacity is checked BEFORE a request is taken off the queue: #send
    # re-queues a request it cannot dispatch at the tail of @pending, so
    # shifting first and failing afterwards would move the oldest request
    # behind newer ones.
    def send_pending
      until @pending.empty?
        break if !@handshake_completed || @streams.size >= @max_concurrent_requests
        break unless send(@pending.shift)
      end
    end

    # Value used for the ":path" pseudo-header in #join_headers; here simply
    # the request path.
    def headline_uri(request)
      request.path
    end

    # No-op hook called by #join_headers before the header block is built.
    def set_request_headers(request)
      # nothing to add here
    end

    # Drives +request+ through its :headers and :body phases on +stream+ and
    # finally transitions it to :done.
    #
    # Each phase is written only when the request actually is in that state
    # after the transition. #join_body throws :buffer_full when the write
    # buffer fills up; that aborts the sequence here, before the :done
    # transition, so a later call can resume.
    def handle(request, stream)
      catch(:buffer_full) do
        { headers: :join_headers, body: :join_body }.each do |phase, writer|
          request.transition(phase)
          # __send__ because #send is redefined by this class
          __send__(writer, stream, request) if request.state == phase
        end
        request.transition(:done)
      end
    end

    # Builds the HTTP2Next client from the configured http2 settings, wires
    # its events to this object's callbacks and sends the connection preface.
    #
    # The preface is sent unconditionally; nothing in this method inspects
    # the socket to see whether the peer has already started talking.
    def init_connection
      @connection = HTTP2Next::Client.new(@options.http2_settings)
      subscribe = ->(event, handler) { @connection.on(event, &method(handler)) }

      subscribe.call(:frame, :on_frame)
      subscribe.call(:frame_sent, :on_frame_sent)
      subscribe.call(:frame_received, :on_frame_received)
      subscribe.call(:origin, :on_origin)
      subscribe.call(:promise, :on_promise)
      # for connection-level altsvc events the origin is read from the frame
      @connection.on(:altsvc) { |frame| on_altsvc(frame[:origin], frame) }
      subscribe.call(:settings_ack, :on_settings)
      subscribe.call(:goaway, :on_close)

      @connection.send_connection_preface
    end

    # Subscribes to the events of +stream+ on behalf of +request+.
    #
    # The stream callbacks are the on_stream_* methods with +stream+ and
    # +request+ pre-applied through currying, so each event only supplies the
    # remaining argument. Stream-level altsvc events are routed to #on_altsvc
    # with the request origin pre-applied.
    def handle_stream(stream, request)
      close_handler   = method(:on_stream_close).curry[stream, request]
      altsvc_handler  = method(:on_altsvc).curry[request.origin]
      headers_handler = method(:on_stream_headers).curry[stream, request]
      data_handler    = method(:on_stream_data).curry[stream, request]

      stream.on(:close, &close_handler)
      stream.on(:half_close) do
        log(level: 2, label: "#{stream.id}: ") { "waiting for response..." }
      end
      stream.on(:altsvc, &altsvc_handler)
      stream.on(:headers, &headers_handler)
      stream.on(:data, &data_handler)
    end

    # Sends the header block for +request+ on +stream+.
    #
    # The four pseudo-headers come first and the request's own headers are
    # merged on top of them. The stream is ended together with the headers
    # when the request reports itself as empty.
    def join_headers(stream, request)
      set_request_headers(request)
      pseudo_headers = {
        ":scheme" => request.scheme,
        ":method" => request.verb.to_s.upcase,
        ":path" => headline_uri(request),
        ":authority" => request.authority,
      }
      headers = pseudo_headers.merge(request.headers)
      log(level: 1, label: "#{stream.id}: ", color: :yellow) do
        headers.map { |name, value| "-> HEADER: #{name}: #{value}" }.join("\n")
      end
      stream.headers(headers, end_stream: request.empty?)
    end

    # Writes the body of +request+ to +stream+, chunk by chunk.
    #
    # Writing resumes from a chunk parked in @drains by an earlier call, if
    # there is one. One chunk is always read ahead so that the last DATA
    # write can carry end_stream. When more data remains and the buffer
    # reports itself full, the read-ahead chunk is parked in @drains and
    # :buffer_full is thrown (caught in #handle).
    def join_body(stream, request)
      return if request.empty?

      current = @drains.delete(request) || request.drain_body
      while current
        upcoming = request.drain_body
        label = "#{stream.id}: "
        log(level: 1, label: label, color: :green) { "-> DATA: #{current.bytesize} bytes..." }
        log(level: 2, label: label, color: :green) { "-> #{current.inspect}" }
        stream.data(current, end_stream: !upcoming)
        if upcoming && @buffer.full?
          @drains[request] = upcoming
          throw(:buffer_full)
        end
        current = upcoming
      end
    end

    ######
    # HTTP/2 Callbacks
    ######

    # Stream :headers callback: builds the response object for +request+.
    #
    # +h+ is a list of name/value pairs. The first pair is removed from +h+
    # and its value is used as the status; the remaining pairs go to the
    # configured headers class. The response is created with version "2.0",
    # stored on the request, and the stream is (re-)registered for it.
    def on_stream_headers(stream, request, h)
      log(label: "#{stream.id}:", color: :yellow) do
        h.map { |name, value| "<- HEADER: #{name}: #{value}" }.join("\n")
      end
      _name, status = h.shift
      options = request.options
      parsed_headers = options.headers_class.new(h)
      request.response = options.response_class.new(request, status, "2.0", parsed_headers)
      @streams[request] = stream
    end

    # Stream :data callback: logs the received chunk and appends it to the
    # response stored on +request+.
    def on_stream_data(stream, request, data)
      label = "#{stream.id}: "
      log(level: 1, label: label, color: :green) { "<- DATA: #{data.bytesize} bytes..." }
      log(level: 2, label: label, color: :green) { "<- #{data.inspect}" }
      request.response << data
    end

    # Stream :close callback: reports the outcome of +request+ and frees its
    # slot on the connection.
    #
    # stream  - the stream that was closed.
    # request - the request that was being carried by it.
    # error   - close reason; nil and :no_error both count as success.
    #
    # Emits :error for an error close or for a 421 response, otherwise
    # :response. Afterwards the stream is unregistered and the next pending
    # request, if any, is handed to #send.
    def on_stream_close(stream, request, error)
      # presumably the request is still waiting for an interim response
      # before sending its body (TODO confirm against Request#expects?)
      return handle(request, stream) if request.expects?

      # The stream is gone, so a body chunk parked by #join_body can never be
      # written to it. Dropping it keeps @drains from retaining the request
      # and from replaying a stale chunk if the request is sent again.
      @drains.delete(request)

      if error && error != :no_error
        ex = Error.new(stream.id, error)
        ex.set_backtrace(caller)
        emit(:error, request, ex)
      else
        response = request.response
        if response.status == 421
          ex = MisdirectedRequestError.new(response)
          ex.set_backtrace(caller)
          emit(:error, request, ex)
        else
          emit(:response, request, response)
        end
      end
      log(level: 2, label: "#{stream.id}: ") { "closing stream" }

      @streams.delete(request)
      send(@pending.shift) unless @pending.empty?
    end

    # Connection :frame callback (registered in #init_connection): appends
    # +bytes+ to @buffer (presumably a serialized outgoing frame — confirm
    # against http-2-next).
    def on_frame(bytes)
      @buffer << bytes
    end

    # Connection :settings_ack callback: marks the handshake as completed,
    # caps the concurrency limit to the peer's
    # :settings_max_concurrent_streams value and flushes the pending queue.
    def on_settings(*)
      @handshake_completed = true
      limits = [
        @max_concurrent_requests,
        @connection.remote_settings[:settings_max_concurrent_streams],
      ]
      @max_concurrent_requests = limits.min
      send_pending
    end

    # Connection :goaway callback.
    #
    # For an error other than :no_error, one Error (stream id 0) is built and
    # emitted as :error for every request that has a stream. :close is
    # emitted only when the connection state is :closed and no stream is
    # registered any more.
    def on_close(_last_frame, error, _payload)
      failed = error && error != :no_error
      if failed
        ex = Error.new(0, error)
        ex.set_backtrace(caller)
        @streams.each_key { |request| emit(:error, request, ex) }
      end

      emit(:close) if @connection.state == :closed && @streams.empty?
    end

    # Connection :frame_sent callback: logs the outgoing frame.
    def on_frame_sent(frame)
      log_frame(frame, "frame was sent!", :blue)
    end

    # Connection :frame_received callback: logs the incoming frame.
    def on_frame_received(frame)
      log_frame(frame, "frame was received!", :magenta)
    end

    # Shared level-2 logging for sent and received frames: first +message+,
    # then the inspected frame in +color+. For :data frames the payload is
    # replaced by its byte size in the logged copy; the frame itself is not
    # modified. The text is built inside the log blocks, so the formatting
    # work is left to #log to trigger.
    def log_frame(frame, message, color)
      log(level: 2, label: "#{frame[:stream]}: ") { message }
      log(level: 2, label: "#{frame[:stream]}: ", color: color) do
        if frame[:type] == :data
          frame.merge(payload: frame[:payload].bytesize).inspect
        else
          frame.inspect
        end
      end
    end

    # Handles an altsvc frame for +origin+.
    #
    # The alternative origin is parsed from the frame's :proto, :host and
    # :port, and the frame's :max_age is exposed as the "ma" parameter.
    # NOTE(review): +origin+ is passed to the :altsvc event both before and
    # after the alternative origin — confirm this matches what the
    # subscribers expect.
    def on_altsvc(origin, frame)
      label = "#{frame[:stream]}: "
      log(level: 2, label: label) { "altsvc frame was received" }
      log(level: 2, label: label) { frame.inspect }
      alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}")
      emit(:altsvc, origin, alt_origin, origin, { "ma" => frame[:max_age] })
    end

    # Connection :promise callback: looks up the request registered for the
    # promised stream's parent and emits :promise with that request (nil when
    # the parent is not registered) and the promised stream.
    def on_promise(stream)
      parent_request = @streams.key(stream.parent)
      emit(:promise, parent_request, stream)
    end

    # Connection :origin callback (registered in #init_connection): re-emits
    # the event as :origin with the same argument.
    def on_origin(origin)
      emit(:origin, origin)
    end

    # Companion of #method_missing: this object answers to whatever the
    # underlying HTTP/2 connection answers to, and otherwise defers to the
    # default lookup.
    def respond_to_missing?(meth, *args)
      delegated = @connection.respond_to?(meth, *args)
      return delegated if delegated

      super
    end

    # Forwards calls this object does not define to the underlying HTTP/2
    # connection, arguments and block included, when the connection responds
    # to them; anything else goes through the default handling.
    def method_missing(meth, *args, &blk)
      return super unless @connection.respond_to?(meth)

      @connection.__send__(meth, *args, &blk)
    end
  end
  # Registers this parser class under the "h2" key (presumably the protocol
  # id negotiated for HTTP/2 — confirm against Connection.register).
  Connection.register "h2", Connection::HTTP2
end