# frozen_string_literal: true
require "io/wait"
require "http/2/next"
module HTTPX
class Connection::HTTP2
include Callbacks
include Loggable
Error = Class.new(Error) do
def initialize(id, code)
super("stream #{id} closed with error: #{code}")
end
end
attr_reader :streams, :pending
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
@pending = []
@streams = {}
@drains = {}
@buffer = buffer
@handshake_completed = false
init_connection
end
def close
@connection.goaway
end
def empty?
@connection.state == :closed || @streams.empty?
end
def <<(data)
@connection << data
end
def send(request, **)
if !@handshake_completed ||
@streams.size >= @max_concurrent_requests
@pending << request
return
end
unless (stream = @streams[request])
stream = @connection.new_stream
handle_stream(stream, request)
@streams[request] = stream
end
handle(request, stream)
true
end
def consume
@streams.each do |request, stream|
handle(request, stream)
end
end
def handle_error(ex)
@streams.each_key do |request|
emit(:error, request, ex)
end
@pending.each do |request|
emit(:error, request, ex)
end
end
private
def send_pending
while (request = @pending.shift)
break unless send(request)
end
end
def headline_uri(request)
request.path
end
def set_request_headers(request); end
def handle(request, stream)
catch(:buffer_full) do
request.transition(:headers)
join_headers(stream, request) if request.state == :headers
request.transition(:body)
join_body(stream, request) if request.state == :body
request.transition(:done)
end
end
def init_connection
@connection = HTTP2Next::Client.new(@options.http2_settings)
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:origin, &method(:on_origin))
@connection.on(:promise, &method(:on_promise))
@connection.on(:altsvc) { |frame| on_altsvc(frame[:origin], frame) }
@connection.on(:settings_ack, &method(:on_settings))
@connection.on(:goaway, &method(:on_close))
#
# Some servers initiate HTTP/2 negotiation right away, some don't.
# As such, we have to check the socket buffer. If there is something
# to read, the server initiated the negotiation. If not, we have to
# initiate it.
#
@connection.send_connection_preface
end
def handle_stream(stream, request)
stream.on(:close, &method(:on_stream_close).curry[stream, request])
stream.on(:half_close) do
log(level: 2, label: "#{stream.id}: ") { "waiting for response..." }
end
stream.on(:altsvc, &method(:on_altsvc).curry[request.origin])
stream.on(:headers, &method(:on_stream_headers).curry[stream, request])
stream.on(:data, &method(:on_stream_data).curry[stream, request])
end
def join_headers(stream, request)
set_request_headers(request)
headers = {}
headers[":scheme"] = request.scheme
headers[":method"] = request.verb.to_s.upcase
headers[":path"] = headline_uri(request)
headers[":authority"] = request.authority
headers = headers.merge(request.headers)
log(level: 1, label: "#{stream.id}: ", color: :yellow) do
headers.map { |k, v| "-> HEADER: #{k}: #{v}" }.join("\n")
end
stream.headers(headers, end_stream: request.empty?)
end
def join_body(stream, request)
return if request.empty?
chunk = @drains.delete(request) || request.drain_body
while chunk
next_chunk = request.drain_body
log(level: 1, label: "#{stream.id}: ", color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, label: "#{stream.id}: ", color: :green) { "-> #{chunk.inspect}" }
stream.data(chunk, end_stream: !next_chunk)
if next_chunk && @buffer.full?
@drains[request] = next_chunk
throw(:buffer_full)
end
chunk = next_chunk
end
end
######
# HTTP/2 Callbacks
######
def on_stream_headers(stream, request, h)
log(label: "#{stream.id}:", color: :yellow) do
h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
end
_, status = h.shift
headers = request.options.headers_class.new(h)
response = request.options.response_class.new(request, status, "2.0", headers)
request.response = response
@streams[request] = stream
end
def on_stream_data(stream, request, data)
log(level: 1, label: "#{stream.id}: ", color: :green) { "<- DATA: #{data.bytesize} bytes..." }
log(level: 2, label: "#{stream.id}: ", color: :green) { "<- #{data.inspect}" }
request.response << data
end
def on_stream_close(stream, request, error)
return handle(request, stream) if request.expects?
if error && error != :no_error
ex = Error.new(stream.id, error)
ex.set_backtrace(caller)
emit(:error, request, ex)
else
response = request.response
if response.status == 421
ex = MisdirectedRequestError.new(response)
ex.set_backtrace(caller)
emit(:error, request, ex)
else
emit(:response, request, response)
end
end
log(level: 2, label: "#{stream.id}: ") { "closing stream" }
@streams.delete(request)
send(@pending.shift) unless @pending.empty?
end
def on_frame(bytes)
@buffer << bytes
end
def on_settings(*)
@handshake_completed = true
@max_concurrent_requests = [@max_concurrent_requests,
@connection.remote_settings[:settings_max_concurrent_streams]].min
send_pending
end
def on_close(_last_frame, error, _payload)
if error && error != :no_error
ex = Error.new(0, error)
ex.set_backtrace(caller)
@streams.each_key do |request|
emit(:error, request, ex)
end
end
return unless @connection.state == :closed && @streams.size.zero?
emit(:close)
end
def on_frame_sent(frame)
log(level: 2, label: "#{frame[:stream]}: ") { "frame was sent!" }
log(level: 2, label: "#{frame[:stream]}: ", color: :blue) do
case frame[:type]
when :data
frame.merge(payload: frame[:payload].bytesize).inspect
else
frame.inspect
end
end
end
def on_frame_received(frame)
log(level: 2, label: "#{frame[:stream]}: ") { "frame was received!" }
log(level: 2, label: "#{frame[:stream]}: ", color: :magenta) do
case frame[:type]
when :data
frame.merge(payload: frame[:payload].bytesize).inspect
else
frame.inspect
end
end
end
def on_altsvc(origin, frame)
log(level: 2, label: "#{frame[:stream]}: ") { "altsvc frame was received" }
log(level: 2, label: "#{frame[:stream]}: ") { frame.inspect }
alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}")
params = { "ma" => frame[:max_age] }
emit(:altsvc, origin, alt_origin, origin, params)
end
def on_promise(stream)
emit(:promise, @streams.key(stream.parent), stream)
end
def on_origin(origin)
emit(:origin, origin)
end
def respond_to_missing?(meth, *args)
@connection.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, &blk)
if @connection.respond_to?(meth)
@connection.__send__(meth, *args, &blk)
else
super
end
end
end
Connection.register "h2", Connection::HTTP2
end