class Protocol::HTTP2::Connection
def [] id
# Look up the object registered for a stream id.
#
# @param id [Integer] the stream id; 0 designates the connection itself.
# @return [Object, nil] +self+ when +id+ is zero, otherwise whatever
#   +@streams+ holds for +id+ (+nil+ when there is no entry).
def [](id)
  return self if id.zero?

  @streams[id]
end
def accept_push_promise_stream(stream_id, &block)
On the client side, we accept push promise streams.
Accept an incoming push promise from the other side of the connection.
# Accept a push promise stream announced by the remote peer.
#
# This is a thin wrapper: it forwards both arguments to #accept_stream.
#
# @param stream_id [Integer] the id of the promised stream.
# @param block an optional stream factory, forwarded unchanged.
# @return whatever #accept_stream returns.
def accept_push_promise_stream(stream_id, &block)
  accept_stream(stream_id, &block)
end
def accept_stream(stream_id, &block)
Accept an incoming stream from the other side of the connection.
# Accept a stream that was initiated by the remote peer.
#
# @param stream_id [Integer] the id chosen by the remote peer.
# @param block an optional stream factory, forwarded to #create_stream.
# @return whatever #create_stream returns.
# @raise [ProtocolError] if #valid_remote_stream_id? rejects +stream_id+.
def accept_stream(stream_id, &block)
  raise ProtocolError, "Invalid stream id: #{stream_id}" unless valid_remote_stream_id?(stream_id)

  create_stream(stream_id, &block)
end
def client_stream_id?(id)
# Whether the id belongs to the client-initiated (odd) id space.
#
# @param id [Integer] the stream id to classify.
# @return [Boolean] +true+ when +id+ is odd.
def client_stream_id?(id)
  id.odd?
end
def close(error = nil)
# Close every known stream and release the framer.
#
# The framer is closed and cleared from an +ensure+ clause, so that happens
# even if closing one of the streams raises.
#
# @param error [Object, nil] passed to each stream's +close+.
def close(error = nil)
  # The underlying socket may already be closed by this point.
  @streams.each_value do |stream|
    stream.close(error)
  end

  @streams.clear
ensure
  if @framer
    @framer.close
    @framer = nil
  end
end
def close!
# Mark the connection state as +:closed+.
#
# Only the state flag changes here; the framer and streams are untouched.
#
# @return [self]
def close!
  @state = :closed

  self
end
def closed?
# Whether the connection can no longer be used.
#
# @return [Boolean] +true+ when the framer has been released or the state is
#   +:closed+.
def closed?
  @framer.nil? || @state == :closed
end
def closed_stream_id?(id)
# Whether the stream id refers to a stream that is no longer idle.
#
# @param id [Integer] the stream id to check.
# @return [Boolean] +false+ for id 0, otherwise the negation of
#   #idle_stream_id?.
def closed_stream_id?(id)
  # The connection "stream id" can never be closed:
  return false if id.zero?

  !idle_stream_id?(id)
end
def consume_window(size = self.available_size)
-
size (Integer) -- the amount of data to write. Defaults to the currently available window size (`available_size`).
# Hand the given amount of window to the root dependency.
#
# @param size [Integer] how much window to consume; defaults to
#   +available_size+.
# @return the result of +@dependency.consume_window+, or +nil+ when +size+ is
#   not positive.
def consume_window(size = self.available_size)
  # Nothing to do when there is no window to consume:
  @dependency.consume_window(size) if size > 0
end
def create_push_promise_stream(&block)
# Create a locally-initiated stream for a push promise.
#
# Forwards the block to #create_stream, which picks the stream id through its
# default argument.
#
# @param block an optional stream factory, forwarded unchanged.
# @return whatever #create_stream returns.
def create_push_promise_stream(&block)
  create_stream(&block)
end
def create_stream(id = next_stream_id, &block)
-
(Stream)- the created stream.
# Create a stream with the given id.
#
# @param id [Integer] the stream id; defaults to #next_stream_id.
# @yield [connection, id] when a block is given it builds the stream instead
#   of +Stream.create+.
# @return the block's result when a block is given, otherwise the result of
#   +Stream.create(self, id)+.
# @raise [ProtocolError] if +@streams+ already has an entry for +id+.
def create_stream(id = next_stream_id, &block)
  if @streams.key?(id)
    raise ProtocolError, "Cannot create stream with id #{id}, already exists!"
  end

  block_given? ? yield(self, id) : Stream.create(self, id)
end
def decode_headers(data)
# Decode a block of HPACK-encoded header data.
#
# Uses the connection's decoding context (+@decoder+) and limits the table
# size to the local +header_table_size+ setting.
#
# @param data the encoded header block.
# @return the result of +HPACK::Decompressor#decode+.
def decode_headers(data)
  decompressor = HPACK::Decompressor.new(
    data,
    @decoder,
    table_size_limit: @local_settings.header_table_size
  )

  decompressor.decode
end
def delete(id)
# Forget the stream with the given id and delete its dependency, if any.
#
# @param id [Integer] the stream id to remove.
# @return the result of the dependency's +delete!+, or +nil+ when no
#   dependency is registered for +id+.
def delete(id)
  @streams.delete(id)

  dependency = @dependencies[id]
  dependency&.delete!
end
def encode_headers(headers, buffer = String.new.b)
# Encode headers with HPACK into a buffer.
#
# Uses the connection's encoding context (+@encoder+) and limits the table
# size to the remote +header_table_size+ setting.
#
# @param headers the headers to encode.
# @param buffer [String] the output buffer; defaults to a new binary string.
# @return the result of +HPACK::Compressor#encode+.
def encode_headers(headers, buffer = String.new.b)
  compressor = HPACK::Compressor.new(
    buffer,
    @encoder,
    table_size_limit: @remote_settings.header_table_size
  )

  compressor.encode(headers)
end
def id
# The stream id that designates the connection itself.
#
# @return [Integer] always 0.
def id
  0
end
def idle_stream_id?(id)
# Whether the stream id has not been used yet.
#
# Server-initiated streams are even and client-initiated streams are odd. An
# id with the same parity as +@local_stream_id+ is in the locally-initiated id
# space; any other id is in the remote one.
#
# @param id [Integer] the stream id to check.
# @return [Boolean] for a local id, whether it is at or beyond the next local
#   id; for a remote id, whether it is beyond the biggest remote id recorded.
def idle_stream_id?(id)
  if id.even? == @local_stream_id.even?
    # Locally-initiated: idle from the next id we would hand out onwards.
    id >= @local_stream_id
  else
    # Remotely-initiated: idle only above the biggest id recorded so far.
    id > @remote_stream_id
  end
end
def ignore_frame?(frame)
There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client.
(RFC 7540, Section 6.8: GOAWAY)
# Whether a frame should be dropped because the connection is closed.
#
# Only applies once #closed? is true, and only to frames whose stream id
# passes #valid_remote_stream_id?.
#
# @param frame the frame that was just read; must respond to +stream_id+.
# @return [Boolean, nil] whether the frame's stream id is greater than
#   +@remote_stream_id+; +nil+ when the connection is not closed or the id is
#   not a valid remote stream id.
def ignore_frame?(frame)
  return unless self.closed?

  stream_id = frame.stream_id
  return unless valid_remote_stream_id?(stream_id)

  stream_id > @remote_stream_id
end
def initialize(framer, local_stream_id)
# Set up a connection in the +:new+ state.
#
# @param framer the object used to read and write frames.
# @param local_stream_id [Integer] the first stream id this side hands out.
def initialize(framer, local_stream_id)
  super()

  @state = :new

  # Hash(Integer, Stream)
  @streams = {}

  # The root dependency (id 0) and the index Hash(Integer, Dependency):
  @dependency = Dependency.new(self, 0)
  @dependencies = {0 => @dependency}

  @framer = framer

  # The next stream id to use:
  @local_stream_id = local_stream_id

  # The biggest remote stream id seen thus far:
  @remote_stream_id = 0

  @local_settings = PendingSettings.new
  @remote_settings = Settings.new

  # Separate HPACK contexts for decoding and encoding:
  @decoder = HPACK::Context.new
  @encoder = HPACK::Context.new

  @local_window = LocalWindow.new
  @remote_window = Window.new
end
def maximum_concurrent_streams
The maximum number of concurrent streams that this connection can initiate. This is a setting that can be changed by the remote peer.
# The limit on concurrent streams taken from the remote settings.
#
# @return the +maximum_concurrent_streams+ value of +@remote_settings+.
def maximum_concurrent_streams
  @remote_settings.maximum_concurrent_streams
end
def maximum_frame_size
# The frame size limit taken from the remote settings.
#
# @return the +maximum_frame_size+ value of +@remote_settings+.
def maximum_frame_size
  @remote_settings.maximum_frame_size
end
def next_stream_id
# Allocate the next locally-initiated stream id.
#
# The counter advances by two, so the ids handed out keep the parity of the
# initial +@local_stream_id+.
#
# @return [Integer] the id that was current before the counter advanced.
def next_stream_id
  allocated = @local_stream_id
  @local_stream_id = allocated + 2

  allocated
end
def open!
# Mark the connection state as +:open+.
#
# @return [self]
def open!
  @state = :open

  self
end
def process_settings(frame)
-
(Boolean)- whether the frame was an acknowledgement
# Apply a SETTINGS frame received from the remote peer.
#
# An acknowledgement confirms our own pending settings, which are then
# applied locally. Any other SETTINGS frame carries the peer's settings; they
# are unpacked and applied first and acknowledged afterwards.
#
# @param frame the received SETTINGS frame.
# @return [Boolean] whether the frame was an acknowledgement.
def process_settings(frame)
  if frame.acknowledgement?
    # The remote end has confirmed the settings have been received:
    changes = @local_settings.acknowledge
    update_local_settings(changes)

    true
  else
    # The remote end is updating the settings:
    changes = frame.unpack
    @remote_settings.update(changes)
    update_remote_settings(changes)

    # RFC 7540 section 6.5.3 requires the acknowledgement to be emitted once
    # all values have been processed. Replying only now also means a frame
    # whose values are rejected above is never acknowledged.
    write_frame(frame.acknowledge)

    false
  end
end
def read_frame
# Read one frame from the framer and apply it to this connection.
#
# The frame is read with the local +maximum_frame_size+ as the limit. Frames
# for which #ignore_frame? is true are dropped without being yielded or
# applied.
#
# @yield [frame] the frame, before it is applied, when a block is given.
# @return the frame that was applied, or +nil+ when it was ignored.
# @raise [GoawayError] re-raised untouched.
# @raise [ProtocolError] re-raised after a GOAWAY has been sent with the
#   error's code (or PROTOCOL_ERROR when it has none).
# @raise [HPACK::Error] re-raised after a GOAWAY with COMPRESSION_ERROR has
#   been sent.
def read_frame
  frame = @framer.read_frame(@local_settings.maximum_frame_size)

  return if ignore_frame?(frame)

  yield frame if block_given?
  frame.apply(self)

  frame
rescue GoawayError
  # Go directly to jail. Do not pass go, do not collect $200.
  # NOTE(review): this clause must stay ahead of ProtocolError; presumably
  # GoawayError is a ProtocolError subclass - confirm.
  raise
rescue ProtocolError => error
  send_goaway(error.code || PROTOCOL_ERROR, error.message)
  raise
rescue HPACK::Error => error
  send_goaway(COMPRESSION_ERROR, error.message)
  raise
end
def receive_continuation(frame)
# Reject a CONTINUATION frame delivered to the connection.
#
# @param frame the unexpected frame; its class is included in the message.
# @raise [ProtocolError] always.
def receive_continuation(frame)
  raise ProtocolError, "Received unexpected continuation: #{frame.class}"
end
def receive_data(frame)
# Handle a DATA frame.
#
# The local window is updated first, whether or not the target stream is
# known.
#
# @param frame the received DATA frame.
# @return the stream's +receive_data+ result, or +nil+ when the frame targets
#   a closed stream.
# @raise [ProtocolError] if the stream is neither known nor closed.
def receive_data(frame)
  update_local_window(frame)

  stream_id = frame.stream_id

  if (stream = @streams[stream_id])
    stream.receive_data(frame)
  elsif !closed_stream_id?(stream_id)
    raise ProtocolError, "Cannot receive data for stream id #{stream_id}"
  end
  # Otherwise the stream is closed: this can occur if one end sent a stream
  # reset, while the other end was sending a data frame. It's mostly harmless.
end
def receive_frame(frame)
# Fallback handler for a frame: does nothing.
#
# @param frame the received frame (unused).
# @return [nil]
def receive_frame(frame)
  # Deliberately ignored.
end
def receive_goaway(frame)
# Handle a GOAWAY frame from the remote peer.
#
# Records the last stream id reported by the peer and marks the connection
# closed. A non-zero error code is surfaced as an exception.
#
# @param frame the received GOAWAY frame; +unpack+ yields the last stream id,
#   the error code and the message.
# @return [nil] when the error code is zero.
# @raise [GoawayError] when the error code is not zero.
def receive_goaway(frame)
  last_stream_id, error_code, message = frame.unpack

  # We capture the last stream that was processed.
  @remote_stream_id = last_stream_id

  self.close!

  # A non-zero code means: shut down immediately.
  raise GoawayError.new(message, error_code) if error_code != 0
end
def receive_headers(frame)
# Handle a HEADERS frame.
#
# Frames for a known stream are passed straight to it. Otherwise the frame
# opens a new remote stream, which is accepted only if its id is greater than
# the biggest remote id seen so far and the local concurrent stream limit has
# not been reached.
#
# @param frame the received HEADERS frame.
# @return the stream's +receive_headers+ result.
# @raise [ProtocolError] for stream 0, for an id not greater than
#   +@remote_stream_id+, or when the concurrent stream limit is reached.
def receive_headers(frame)
  stream_id = frame.stream_id

  raise ProtocolError, "Cannot receive headers for stream 0!" if stream_id.zero?

  if (existing = @streams[stream_id])
    return existing.receive_headers(frame)
  end

  if stream_id <= @remote_stream_id
    raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
  end

  # We need to validate that we have less streams than the specified maximum:
  unless @streams.size < @local_settings.maximum_concurrent_streams
    raise ProtocolError, "Exceeded maximum concurrent streams"
  end

  stream = accept_stream(stream_id)
  @remote_stream_id = stream_id

  stream.receive_headers(frame)
end
def receive_ping(frame)
# Handle a PING frame.
#
# A ping that is not itself an acknowledgement is answered with its
# acknowledgement frame.
#
# @param frame the received PING frame.
# @return the result of #write_frame when a reply is sent, otherwise +nil+.
# @raise [ProtocolError] if the connection state is +:closed+.
def receive_ping(frame)
  if @state == :closed
    raise ProtocolError, "Cannot receive ping in state #{@state}"
  end

  # A ping on a non-zero stream is already rejected in `read_payload`, so it
  # is not checked again here.
  write_frame(frame.acknowledge) unless frame.acknowledgement?
end
def receive_priority(frame)
# Handle a PRIORITY frame.
#
# An existing dependency receives the frame directly. For an idle stream id a
# new dependency is created from the unpacked priority. Frames for any other
# stream id are silently dropped.
#
# @param frame the received PRIORITY frame.
# @return the dependency's result, the created dependency, or +nil+.
def receive_priority(frame)
  stream_id = frame.stream_id
  dependency = @dependencies[stream_id]

  if dependency
    dependency.receive_priority(frame)
  elsif idle_stream_id?(stream_id)
    Dependency.create(self, stream_id, frame.unpack)
  end
end
def receive_push_promise(frame)
# Reject a PUSH_PROMISE frame.
#
# @param frame the received frame (unused).
# @raise [ProtocolError] always.
def receive_push_promise(frame)
  raise ProtocolError, "Unable to receive push promise!"
end
def receive_reset_stream(frame)
# Handle a RST_STREAM frame.
#
# @param frame the received RST_STREAM frame.
# @return the stream's +receive_reset_stream+ result, or +nil+ when the frame
#   targets a closed stream.
# @raise [ProtocolError] if the frame targets the connection.
# @raise [StreamClosed] if the stream is neither known nor closed.
def receive_reset_stream(frame)
  raise ProtocolError, "Cannot reset connection!" if frame.connection?

  stream_id = frame.stream_id

  if (stream = @streams[stream_id])
    stream.receive_reset_stream(frame)
  elsif !closed_stream_id?(stream_id)
    raise StreamClosed, "Cannot reset stream #{stream_id}"
  end
  # Resets for streams that are already closed are ignored.
end
def receive_settings(frame)
# Handle a SETTINGS frame according to the connection state.
#
# @param frame the received SETTINGS frame.
# @return in the +:new+ state, +self+ when the connection was opened and
#   +nil+ otherwise; in any other non-closed state, the result of
#   #process_settings.
# @raise [ProtocolError] if the connection state is +:closed+.
def receive_settings(frame)
  case @state
  when :closed
    raise ProtocolError, "Cannot receive settings in state #{@state}"
  when :new
    # We transition to :open when we receive acknowledgement of first settings frame:
    open! if process_settings(frame)
  else
    process_settings(frame)
  end
end
def receive_window_update(frame)
# Handle a WINDOW_UPDATE frame.
#
# A connection-level update is applied by the inherited implementation, after
# which #consume_window is called. A stream-level update goes to the stream;
# if the stream rejects it with a ProtocolError, only that stream is reset.
#
# @param frame the received WINDOW_UPDATE frame.
# @raise [ProtocolError] if the stream is neither known nor closed.
def receive_window_update(frame)
  if frame.connection?
    super

    self.consume_window
  elsif (stream = @streams[frame.stream_id])
    begin
      stream.receive_window_update(frame)
    rescue ProtocolError => error
      # The failure is confined to the stream rather than the connection:
      stream.send_reset_stream(error.code)
    end
  elsif !closed_stream_id?(frame.stream_id)
    # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR.
    raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
  end
  # Updates for streams that are already closed are ignored.
end
def send_goaway(error_code = 0, message = "")
# Send a GOAWAY frame and mark the connection closed.
#
# The frame carries the biggest remote stream id seen so far. The connection
# is marked closed from an +ensure+ clause, so that happens even if writing
# the frame raises.
#
# @param error_code [Integer] the error code to report; 0 by default.
# @param message [String] the message to include; empty by default.
# @return the result of #write_frame.
def send_goaway(error_code = 0, message = "")
  frame = GoawayFrame.new
  frame.pack(@remote_stream_id, error_code, message)

  write_frame(frame)
ensure
  self.close!
end
def send_ping(data)
# Send a PING frame carrying the given data.
#
# @param data the payload to pack into the frame.
# @return the result of #write_frame.
# @raise [ProtocolError] if the connection state is +:closed+.
def send_ping(data)
  if @state == :closed
    raise ProtocolError, "Cannot send ping in state #{@state}"
  end

  frame = PingFrame.new
  frame.pack(data)

  write_frame(frame)
end
def send_priority(stream_id, priority)
# Send a PRIORITY frame for a stream.
#
# @param stream_id [Integer] the stream the priority applies to.
# @param priority the priority to pack into the frame.
# @return the result of #write_frame.
def send_priority(stream_id, priority)
  frame = PriorityFrame.new(stream_id)
  frame.pack(priority)

  write_frame(frame)
end
def send_settings(changes)
# Send a SETTINGS frame with the given changes.
#
# The changes are appended to the local settings before the frame is
# written.
#
# @param changes the setting changes to send.
# @return the result of #write_frame.
def send_settings(changes)
  @local_settings.append(changes)

  frame = SettingsFrame.new
  frame.pack(changes)

  write_frame(frame)
end
def server_stream_id?(id)
# Whether the id belongs to the server-initiated (even) id space.
#
# @param id [Integer] the stream id to classify.
# @return [Boolean] +true+ when +id+ is even.
def server_stream_id?(id)
  id.even?
end
def synchronize
# Run the block around frame writes.
#
# This implementation only yields; it takes no lock of its own.
#
# @yield the work to perform.
# @return the block's result.
def synchronize
  yield
end
def update_local_settings(changes)
# Propagate the local initial window size after a settings change.
#
# Every stream's local window capacity, and the desired size of the
# connection's local window, are set to the local +initial_window_size+.
#
# @param changes the acknowledged changes (not inspected here).
# @return the capacity that was applied.
def update_local_settings(changes)
  capacity = @local_settings.initial_window_size

  @streams.each_value { |stream| stream.local_window.capacity = capacity }

  @local_window.desired = capacity
end
def update_remote_settings(changes)
# Propagate the remote initial window size after a settings change.
#
# Every stream's remote window capacity is set to the remote
# +initial_window_size+.
#
# @param changes the received changes (not inspected here).
# @return the +@streams+ hash.
def update_remote_settings(changes)
  capacity = @remote_settings.initial_window_size

  @streams.each_value { |stream| stream.remote_window.capacity = capacity }
end
def valid_remote_stream_id?(stream_id)
# Whether the remote peer may initiate a stream with this id.
#
# This implementation rejects every id.
# NOTE(review): presumably subclasses override this - confirm.
#
# @param stream_id [Integer] the candidate stream id (unused).
# @return [Boolean] always +false+.
def valid_remote_stream_id?(stream_id)
  false
end
def write_frame(frame)
# Write a single frame and flush the framer.
#
# The write itself runs inside #synchronize; the flush happens afterwards.
#
# @param frame the frame to write.
# @return the result of the framer's +flush+.
# @raise [EOFError] if the framer has already been released, matching what
#   #write_frames does.
def write_frame(frame)
  framer = @framer

  # Once the framer has been released there is nothing left to write to. Raise
  # the same error as #write_frames instead of a NoMethodError on nil:
  raise EOFError, "Connection closed!" unless framer

  synchronize do
    framer.write_frame(frame)
  end

  framer.flush
end
def write_frames
# Write several frames in one go.
#
# The framer is yielded inside #synchronize so the caller can write any number
# of frames; it is flushed once afterwards.
#
# @yield [framer] the framer to write frames to.
# @return the result of the framer's +flush+.
# @raise [EOFError] if the framer has already been released.
def write_frames
  raise EOFError, "Connection closed!" unless @framer

  synchronize do
    yield @framer
  end

  @framer.flush
end