class Protocol::HTTP2::Stream
State transition methods use a trailing “!”.
- R: RST_STREAM frame
- ES: END_STREAM flag
- PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
- H: HEADERS frame (with implied CONTINUATIONs)
- ‘recv`: endpoint receives this frame
- `send`: endpoint sends this frame
“`
└────────┘
└───────────────────────►│ │◄───────────────────────┘
│ recv R │ closed │ recv R │
│ send R / └───────────►│ │◄───────────┘ send R / │
│ │ recv R ┌────────┐ recv R │ │
│ │ send R / ▼ send R / │ │
│ │ send ES / │ recv ES / │ │
│ │ │ │ │
│ └────┬─────┘ │ └─────┬────┘ │
│ │ (remote) │ │ recv R │ (local) │ │
│ │ closed │ │ send R / │ closed │ │
│ │ half │ │ │ half │ │
│ ┌──────────┐ │ ┌──────────┐ │
│ ▼ ▼ └───┬────┘ ▼ ▼ │
│ │ │ │ │ │ │ │
│ send H │ ┌─────────┤ open ├─────────┐ │ recv H │
│ │ recv ES │ │ send ES │ │
│ │ ┌────────┐ │ │
│ └───┬──────┘ ▼ └──────┬───┘ │
│ │ (local) │ │ │ (remote) │ │
┌──────┼ reserved │ │ recv H │ reserved ├──────┐
│ │ │ send H / │ │
┌──────────┐ │ ┌──────────┐
▼ └───┬────┘ ▼
│ │ │ │
┌──────────┤ idle ├──────────┐
send PP │ │ recv PP
┌────────┐
“`
and response processing.
diagram below) and provide your application logic to handle request
to do is subscribe to appropriate events (marked with “:” prefix in
error management as defined by the HTTP 2.0 specification. All you have
This class encapsulates all of the state, transition, flow-control, and
data can be interleaved and prioritized.
multiple requests and responses can be in flight simultaneously and stream
A single HTTP/2 connection can multiplex multiple streams in parallel:
def self.create(connection, id)
def self.create(connection, id) stream = self.new(connection, id) connection.streams[id] = stream return stream end
def accept_push_promise_stream(stream_id, headers)
def accept_push_promise_stream(stream_id, headers) @connection.accept_push_promise_stream(stream_id) end
def active?
def active? @state != :closed && @state != :idle end
def close(error = nil)
Transition directly to closed state. Do not pass go, do not collect $200.
def close(error = nil) unless closed? @state = :closed self.closed(error) end end
def close!(error_code = nil)
-
error_code
(Integer
) -- the error code if the stream was closed due to a stream reset.
def close!(error_code = nil) @state = :closed @connection.delete(@id) if error_code error = StreamError.new("Stream closed!", error_code) end self.closed(error) return self end
def closed(error = nil)
def closed(error = nil) end
def closed?
def closed? @state == :closed end
def consume_remote_window(frame)
def consume_remote_window(frame) super @connection.consume_remote_window(frame) end
def create_push_promise_stream(headers)
def create_push_promise_stream(headers) @connection.create_push_promise_stream end
def ignore_data(frame)
def ignore_data(frame) # Console.warn(self) {"Received headers in state: #{@state}!"} end
def ignore_headers(frame)
def ignore_headers(frame) .warn(self) {"Received headers in state: #{@state}!"}
def initialize(connection, id, state = :idle)
def initialize(connection, id, state = :idle) @connection = connection @id = id @state = state @local_window = Window.new(@connection.local_settings.initial_window_size) @remote_window = Window.new(@connection.remote_settings.initial_window_size) @dependency = Dependency.create(@connection, @id) end
def inspect
def inspect "\#<#{self.class} id=#{@id} state=#{@state}>" end
def maximum_frame_size
def maximum_frame_size @connection.available_frame_size end
def open!
def open! if @state == :idle @state = :open else raise ProtocolError, "Cannot open stream in state: #{@state}" end return self end
def parent=(stream)
def parent=(stream) @dependency.parent = stream.dependency end
def priority
def priority @dependency.priority end
def priority= priority
def priority= priority @dependency.priority = priority end
def process_data(frame)
-
(String)
- the data that was received.
def process_data(frame) frame.unpack end
def process_headers(frame)
def process_headers(frame) # Receiving request headers: priority, data = frame.unpack if priority @dependency.process_priority(priority) end @connection.decode_headers(data) end
def receive_data(frame)
def receive_data(frame) if @state == :open update_local_window(frame) if frame.end_stream? @state = :half_closed_remote end process_data(frame) elsif @state == :half_closed_local update_local_window(frame) process_data(frame) if frame.end_stream? close! end elsif self.closed? ignore_data(frame) else # If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED. self.send_reset_stream(Error::STREAM_CLOSED) end end
def receive_headers(frame)
def receive_headers(frame) if @state == :idle if frame.end_stream? @state = :half_closed_remote else open! end process_headers(frame) elsif @state == :reserved_remote process_headers(frame) @state = :half_closed_local elsif @state == :open process_headers(frame) if frame.end_stream? @state = :half_closed_remote end elsif @state == :half_closed_local process_headers(frame) if frame.end_stream? close! end elsif self.closed? ignore_headers(frame) else self.send_reset_stream(Error::STREAM_CLOSED) end end
def receive_push_promise(frame)
def receive_push_promise(frame) promised_stream_id, data = frame.unpack headers = @connection.decode_headers(data) stream = self.accept_push_promise_stream(promised_stream_id, headers) stream.parent = self stream.reserved_remote! return stream, headers end
def receive_reset_stream(frame)
def receive_reset_stream(frame) if @state == :idle # If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. raise ProtocolError, "Cannot receive reset stream in state: #{@state}!" else error_code = frame.unpack close!(error_code) return error_code end end
def reserved_local!
def reserved_local! if @state == :idle @state = :reserved_local else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end
def reserved_remote!
def reserved_remote! if @state == :idle @state = :reserved_remote else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end
def send_data(*arguments, **options)
def send_data(*arguments, **options) if @state == :open frame = write_data(*arguments, **options) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_data(*arguments, **options) if frame.end_stream? close! end else raise ProtocolError, "Cannot send data in state: #{@state}" end end
def send_headers(*arguments)
def send_headers(*arguments) if @state == :idle frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local else open! end elsif @state == :reserved_local frame = write_headers(*arguments) @state = :half_closed_remote elsif @state == :open frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_headers(*arguments) if frame.end_stream? close! end else raise ProtocolError, "Cannot send headers in state: #{@state}" end end
def send_headers?
def send_headers? @state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote end
def send_push_promise(headers)
-
headers
(Hash
) -- contains a complete set of request header fields that the server attributes to the request.
def send_push_promise(headers) if @state == :open or @state == :half_closed_remote promised_stream = self.create_push_promise_stream(headers) promised_stream.reserved_local! # The headers are the same as if the client had sent a request: write_push_promise(promised_stream.id, headers) # The server should call send_headers on the promised stream to begin sending the response: return promised_stream else raise ProtocolError, "Cannot send push promise in state: #{@state}" end end
def send_reset_stream(error_code = 0)
def send_reset_stream(error_code = 0) if @state != :idle and @state != :closed frame = ResetStreamFrame.new(@id) frame.pack(error_code) write_frame(frame) close! else raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}" end end
def weight
def weight @dependency.weight end
def write_data(data, flags = 0, **options)
def write_data(data, flags = 0, **options) DataFrame.new(@id, flags) ack(data, **options) might fail if the data payload was too big: _remote_window(frame) rame(frame) frame
def write_frame(frame)
def write_frame(frame) @connection.write_frame(frame) end
def write_headers(priority, headers, flags = 0)
def write_headers(priority, headers, flags = 0) HeadersFrame.new(@id, flags) tion.write_frames do |framer| @connection.encode_headers(headers) pack(priority, data, maximum_size: @connection.maximum_frame_size) .write_frame(frame) frame
def write_push_promise(stream_id, headers, flags = 0, **options)
A push promise is server request -> client -> server response -> client.
A normal request is client request -> server response -> client.
def write_push_promise(stream_id, headers, flags = 0, **options) PushPromiseFrame.new(@id, flags) tion.write_frames do |framer| @connection.encode_headers(headers) pack(stream_id, data, maximum_size: @connection.maximum_frame_size) .write_frame(frame) frame