# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.
require_relative "connection"
require_relative "dependency"
module Protocol
module HTTP2
# A single HTTP/2 connection can multiplex multiple streams in parallel:
# multiple requests and responses can be in flight simultaneously and stream
# data can be interleaved and prioritized.
#
# This class encapsulates all of the state, transition, flow-control, and
# error management as defined by the HTTP 2.0 specification. All you have
# to do is subscribe to appropriate events (marked with ":" prefix in
# diagram below) and provide your application logic to handle request
# and response processing.
#
# ```
# ┌────────┐
# send PP │ │ recv PP
# ┌──────────┤ idle ├──────────┐
# │ │ │ │
# ▼ └───┬────┘ ▼
# ┌──────────┐ │ ┌──────────┐
# │ │ │ send H / │ │
# ┌──────┼ reserved │ │ recv H │ reserved ├──────┐
# │ │ (local) │ │ │ (remote) │ │
# │ └───┬──────┘ ▼ └──────┬───┘ │
# │ │ ┌────────┐ │ │
# │ │ recv ES │ │ send ES │ │
# │ send H │ ┌─────────┤ open ├─────────┐ │ recv H │
# │ │ │ │ │ │ │ │
# │ ▼ ▼ └───┬────┘ ▼ ▼ │
# │ ┌──────────┐ │ ┌──────────┐ │
# │ │ half │ │ │ half │ │
# │ │ closed │ │ send R / │ closed │ │
# │ │ (remote) │ │ recv R │ (local) │ │
# │ └────┬─────┘ │ └─────┬────┘ │
# │ │ │ │ │
# │ │ send ES / │ recv ES / │ │
# │ │ send R / ▼ send R / │ │
# │ │ recv R ┌────────┐ recv R │ │
# │ send R / └───────────►│ │◄───────────┘ send R / │
# │ recv R │ closed │ recv R │
# └───────────────────────►│ │◄───────────────────────┘
# └────────┘
# ```
#
# - `send`: endpoint sends this frame
# - `recv`: endpoint receives this frame
#
# - H: HEADERS frame (with implied CONTINUATIONs)
# - PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
# - ES: END_STREAM flag
# - R: RST_STREAM frame
#
# State transition methods use a trailing "!".
class Stream
include FlowControlled
def self.create(connection, id)
stream = self.new(connection, id)
connection.streams[id] = stream
return stream
end
def initialize(connection, id, state = :idle)
@connection = connection
@id = id
@state = state
@local_window = Window.new(@connection.local_settings.initial_window_size)
@remote_window = Window.new(@connection.remote_settings.initial_window_size)
@dependency = Dependency.create(@connection, @id)
end
# The connection this stream belongs to.
attr :connection
# Stream ID (odd for client initiated streams, even otherwise).
attr :id
# Stream state, e.g. `idle`, `closed`.
attr_accessor :state
attr :dependency
attr :local_window
attr :remote_window
def weight
@dependency.weight
end
def priority
@dependency.priority
end
def priority= priority
@dependency.priority = priority
end
def parent=(stream)
@dependency.parent = stream.dependency
end
def maximum_frame_size
@connection.available_frame_size
end
def write_frame(frame)
@connection.write_frame(frame)
end
def active?
@state != :closed && @state != :idle
end
def closed?
@state == :closed
end
# Transition directly to closed state. Do not pass go, do not collect $200.
# This method should only be used by `Connection#close`.
def close(error = nil)
unless closed?
@state = :closed
self.closed(error)
end
end
# HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state. Despite it's name, it can also be used for trailers.
def send_headers?
@state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote
end
private def write_headers(priority, headers, flags = 0)
frame = HeadersFrame.new(@id, flags)
@connection.write_frames do |framer|
data = @connection.encode_headers(headers)
frame.pack(priority, data, maximum_size: @connection.maximum_frame_size)
framer.write_frame(frame)
end
return frame
end
# The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state.
def send_headers(*arguments)
if @state == :idle
frame = write_headers(*arguments)
if frame.end_stream?
@state = :half_closed_local
else
open!
end
elsif @state == :reserved_local
frame = write_headers(*arguments)
@state = :half_closed_remote
elsif @state == :open
frame = write_headers(*arguments)
if frame.end_stream?
@state = :half_closed_local
end
elsif @state == :half_closed_remote
frame = write_headers(*arguments)
if frame.end_stream?
close!
end
else
raise ProtocolError, "Cannot send headers in state: #{@state}"
end
end
def consume_remote_window(frame)
super
@connection.consume_remote_window(frame)
end
private def write_data(data, flags = 0, **options)
frame = DataFrame.new(@id, flags)
frame.pack(data, **options)
# This might fail if the data payload was too big:
consume_remote_window(frame)
write_frame(frame)
return frame
end
def send_data(*arguments, **options)
if @state == :open
frame = write_data(*arguments, **options)
if frame.end_stream?
@state = :half_closed_local
end
elsif @state == :half_closed_remote
frame = write_data(*arguments, **options)
if frame.end_stream?
close!
end
else
raise ProtocolError, "Cannot send data in state: #{@state}"
end
end
def open!
if @state == :idle
@state = :open
else
raise ProtocolError, "Cannot open stream in state: #{@state}"
end
return self
end
# The stream has been closed. If closed due to a stream reset, the error will be set.
def closed(error = nil)
end
# Transition the stream into the closed state.
# @param error_code [Integer] the error code if the stream was closed due to a stream reset.
def close!(error_code = nil)
@state = :closed
@connection.delete(@id)
if error_code
error = StreamError.new("Stream closed!", error_code)
end
self.closed(error)
return self
end
def send_reset_stream(error_code = 0)
if @state != :idle and @state != :closed
frame = ResetStreamFrame.new(@id)
frame.pack(error_code)
write_frame(frame)
close!
else
raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}"
end
end
def process_headers(frame)
# Receiving request headers:
priority, data = frame.unpack
if priority
@dependency.process_priority(priority)
end
@connection.decode_headers(data)
end
protected def ignore_headers(frame)
# Async.logger.warn(self) {"Received headers in state: #{@state}!"}
end
def receive_headers(frame)
if @state == :idle
if frame.end_stream?
@state = :half_closed_remote
else
open!
end
process_headers(frame)
elsif @state == :reserved_remote
process_headers(frame)
@state = :half_closed_local
elsif @state == :open
process_headers(frame)
if frame.end_stream?
@state = :half_closed_remote
end
elsif @state == :half_closed_local
process_headers(frame)
if frame.end_stream?
close!
end
elsif self.closed?
ignore_headers(frame)
else
self.send_reset_stream(Error::STREAM_CLOSED)
end
end
# @return [String] the data that was received.
def process_data(frame)
frame.unpack
end
def ignore_data(frame)
# Async.logger.warn(self) {"Received headers in state: #{@state}!"}
end
# DATA frames are subject to flow control and can only be sent when a stream is in the "open" or "half-closed (remote)" state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
def receive_data(frame)
if @state == :open
update_local_window(frame)
if frame.end_stream?
@state = :half_closed_remote
end
process_data(frame)
elsif @state == :half_closed_local
update_local_window(frame)
process_data(frame)
if frame.end_stream?
close!
end
elsif self.closed?
ignore_data(frame)
else
# If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
self.send_reset_stream(Error::STREAM_CLOSED)
end
end
def receive_reset_stream(frame)
if @state == :idle
# If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
raise ProtocolError, "Cannot receive reset stream in state: #{@state}!"
else
error_code = frame.unpack
close!(error_code)
return error_code
end
end
# A normal request is client request -> server response -> client.
# A push promise is server request -> client -> server response -> client.
# The server generates the same set of headers as if the client was sending a request, and sends these to the client. The client can reject the request by resetting the (new) stream. Otherwise, the server will start sending a response as if the client had send the request.
private def write_push_promise(stream_id, headers, flags = 0, **options)
frame = PushPromiseFrame.new(@id, flags)
@connection.write_frames do |framer|
data = @connection.encode_headers(headers)
frame.pack(stream_id, data, maximum_size: @connection.maximum_frame_size)
framer.write_frame(frame)
end
return frame
end
def reserved_local!
if @state == :idle
@state = :reserved_local
else
raise ProtocolError, "Cannot reserve stream in state: #{@state}"
end
return self
end
def reserved_remote!
if @state == :idle
@state = :reserved_remote
else
raise ProtocolError, "Cannot reserve stream in state: #{@state}"
end
return self
end
# Override this function to implement your own push promise logic.
def create_push_promise_stream(headers)
@connection.create_push_promise_stream
end
# Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
# @param headers [Hash] contains a complete set of request header fields that the server attributes to the request.
def send_push_promise(headers)
if @state == :open or @state == :half_closed_remote
promised_stream = self.create_push_promise_stream(headers)
promised_stream.reserved_local!
# The headers are the same as if the client had sent a request:
write_push_promise(promised_stream.id, headers)
# The server should call send_headers on the promised stream to begin sending the response:
return promised_stream
else
raise ProtocolError, "Cannot send push promise in state: #{@state}"
end
end
# Override this function to implement your own push promise logic.
def accept_push_promise_stream(stream_id, headers)
@connection.accept_push_promise_stream(stream_id)
end
def receive_push_promise(frame)
promised_stream_id, data = frame.unpack
headers = @connection.decode_headers(data)
stream = self.accept_push_promise_stream(promised_stream_id, headers)
stream.parent = self
stream.reserved_remote!
return stream, headers
end
def inspect
"\#<#{self.class} id=#{@id} state=#{@state}>"
end
end
end
end