class Protocol::HTTP2::Stream

A single HTTP/2 connection can multiplex multiple streams in parallel:
multiple requests and responses can be in flight simultaneously and stream
data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and
error management as defined by the HTTP 2.0 specification. All you have
to do is subscribe to appropriate events (marked with ":" prefix in
diagram below) and provide your application logic to handle request
and response processing.

```
                         ┌────────┐
                 send PP │        │ recv PP
              ┌──────────┤  idle  ├──────────┐
              │          │        │          │
              ▼          └───┬────┘          ▼
       ┌──────────┐          │           ┌──────────┐
       │          │          │ send H /  │          │
┌──────┼ reserved │          │ recv H    │ reserved ├──────┐
│      │ (local)  │          │           │ (remote) │      │
│      └───┬──────┘          ▼           └──────┬───┘      │
│          │             ┌────────┐             │          │
│          │     recv ES │        │ send ES     │          │
│   send H │   ┌─────────┤  open  ├─────────┐   │ recv H   │
│          │   │         │        │         │   │          │
│          ▼   ▼         └───┬────┘         ▼   ▼          │
│      ┌──────────┐          │           ┌──────────┐      │
│      │   half   │          │           │   half   │      │
│      │  closed  │          │ send R /  │  closed  │      │
│      │ (remote) │          │ recv R    │ (local)  │      │
│      └────┬─────┘          │           └─────┬────┘      │
│           │                │                 │           │
│           │ send ES /      │       recv ES / │           │
│           │ send R /       ▼        send R / │           │
│           │ recv R     ┌────────┐   recv R   │           │
│ send R /  └───────────►│        │◄───────────┘  send R / │
│ recv R                 │ closed │               recv R   │
└───────────────────────►│        │◄───────────────────────┘
                         └────────┘
```

- `send`: endpoint sends this frame
- `recv`: endpoint receives this frame
- H: HEADERS frame (with implied CONTINUATIONs)
- PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
- ES: END_STREAM flag
- R: RST_STREAM frame

State transition methods use a trailing "!".

def self.create(connection, id)

# Construct a stream for the given id and register it with the connection.
#
# connection - the owning connection; its `streams` collection receives the
#              new stream under the key `id`.
# id         - the stream identifier.
#
# Returns the newly constructed stream.
def self.create(connection, id)
	self.new(connection, id).tap do |stream|
		connection.streams[id] = stream
	end
end

def accept_push_promise_stream(stream_id, headers)

Override this function to implement your own push promise logic.
# Default handling of a promised stream: delegate to the connection, passing
# only the promised stream id. `headers` is not used by this implementation;
# it is part of the signature so overriding implementations can inspect it.
#
# Returns whatever the connection's `accept_push_promise_stream` returns.
def accept_push_promise_stream(stream_id, headers)
	promised = @connection.accept_push_promise_stream(stream_id)
	
	return promised
end

def active?

# Whether the stream is in any state other than `:idle` or `:closed`.
def active?
	![:closed, :idle].include?(@state)
end

def close(error = nil)

This method should only be used by `Connection#close`.
Transition directly to closed state. Do not pass go, do not collect $200.
# Force the stream into the `:closed` state and invoke the `closed` hook.
# Does nothing (and returns nil) when the stream is already closed.
#
# error - optional error forwarded unchanged to `closed`.
#
# Returns the result of the `closed` hook, or nil if already closed.
def close(error = nil)
	return if closed?
	
	@state = :closed
	closed(error)
end

def close!(error_code = nil)

Parameters:
  • error_code (Integer) -- the error code if the stream was closed due to a stream reset.
# Transition to `:closed`, remove this stream's id from the connection and
# invoke the `closed` hook.
#
# error_code - when given (any non-nil, non-false value, including 0), a
#              StreamError("Stream closed!", error_code) is built and passed
#              to `closed`; otherwise `closed` receives nil.
#
# Returns self.
def close!(error_code = nil)
	@state = :closed
	@connection.delete(@id)
	
	error = (StreamError.new("Stream closed!", error_code) if error_code)
	self.closed(error)
	
	self
end

def closed(error = nil)

The stream has been closed. If closed due to a stream reset, the error will be set.
# Hook invoked by `close` and `close!` after the state has been set to
# `:closed`. The default implementation does nothing and returns nil.
#
# error - nil, or the error supplied by the caller.
def closed(error = nil)
end

def closed?

# Whether the stream's state is `:closed`.
def closed?
	current = @state
	
	return current == :closed
end

def consume_remote_window(frame)

# Account for `frame` against the remote flow-control window at two levels:
# first via the inherited implementation (defined outside this view), then
# on the connection.
#
# Returns the result of the connection's `consume_remote_window`.
def consume_remote_window(frame)
	super(frame)
	
	return @connection.consume_remote_window(frame)
end

def create_push_promise_stream(headers)

Override this function to implement your own push promise logic.
# Default creation of a promised stream: delegate to the connection.
# `headers` is not used by this implementation; it is part of the signature
# so overriding implementations can inspect it.
#
# Returns whatever the connection's `create_push_promise_stream` returns.
def create_push_promise_stream(headers)
	promised = @connection.create_push_promise_stream
	
	return promised
end

def ignore_data(frame)

# Called by `receive_data` when a DATA frame arrives on a closed stream.
# The frame is dropped; this implementation is a no-op that returns nil.
def ignore_data(frame)
	# Logging is disabled. Re-enable for debugging:
	# Console.warn(self) {"Received data in state: #{@state}!"}
end

def ignore_headers(frame)

# Called by `receive_headers` when a HEADERS frame arrives on a closed
# stream. The frame is dropped; this implementation is a no-op that returns
# nil.
#
# NOTE(review): the body was truncated in this copy of the file (leading
# characters and the closing `end` were missing). It has been restored as a
# disabled log line, mirroring `ignore_data` — confirm against the canonical
# source.
def ignore_headers(frame)
	# Console.warn(self) {"Received headers in state: #{@state}!"}
end

def initialize(connection, id, state = :idle)

# Set up a stream bound to `connection`.
#
# connection - the owning connection; its local and remote settings supply
#              the initial window sizes.
# id         - the stream identifier.
# state      - the initial state (defaults to `:idle`).
def initialize(connection, id, state = :idle)
	@connection, @id, @state = connection, id, state
	
	# One flow-control window per direction, each sized from the matching settings.
	local_size = connection.local_settings.initial_window_size
	@local_window = Window.new(local_size)
	
	remote_size = connection.remote_settings.initial_window_size
	@remote_window = Window.new(remote_size)
	
	@dependency = Dependency.create(connection, id)
end

def inspect

# Short human-readable description: class name, stream id and current state.
def inspect
	type = self.class
	
	"\#<#{type} id=#{@id} state=#{@state}>"
end

def maximum_frame_size

# The largest frame this stream may currently write, as reported by the
# connection's `available_frame_size`.
def maximum_frame_size
	return @connection.available_frame_size
end

def open!

# Transition from `:idle` to `:open`.
#
# Raises ProtocolError when the stream is in any state other than `:idle`.
# Returns self.
def open!
	unless @state == :idle
		raise ProtocolError, "Cannot open stream in state: #{@state}"
	end
	
	@state = :open
	
	self
end

def parent=(stream)

# Make this stream depend on `stream` by assigning that stream's dependency
# as the parent of this stream's dependency.
def parent=(stream)
	parent_dependency = stream.dependency
	
	@dependency.parent = parent_dependency
end

def priority

# The priority reported by this stream's dependency.
def priority
	return @dependency.priority
end

def priority= priority

# Assign `priority` to this stream's dependency.
def priority=(priority)
	@dependency.priority = priority
end

def process_data(frame)

Returns:
  • (String) - the data that was received.
# Extract the payload of a received DATA frame.
#
# Returns the result of `frame.unpack`.
def process_data(frame)
	payload = frame.unpack
	
	return payload
end

def process_headers(frame)

# Handle a received HEADERS frame: `frame.unpack` yields an optional priority
# and the header block. A present priority is handed to the dependency; the
# header block is then decoded by the connection.
#
# Returns the result of the connection's `decode_headers`.
def process_headers(frame)
	priority, header_block = frame.unpack
	
	@dependency.process_priority(priority) if priority
	
	return @connection.decode_headers(header_block)
end

def receive_data(frame)

DATA frames are subject to flow control and can only be sent when a stream is in the "open" or "half-closed (remote)" state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
# Handle a received DATA frame according to the current state:
#
# - `:open`: update the local window; move to `:half_closed_remote` if the
#   frame ends the stream; then process the data and return the result.
# - `:half_closed_local`: update the local window, process the data, then
#   `close!` if the frame ends the stream.
# - closed: hand the frame to `ignore_data`.
# - anything else: reset the stream with `Error::STREAM_CLOSED`.
def receive_data(frame)
	case @state
	when :open
		update_local_window(frame)
		
		# The state changes before the data is processed in this branch.
		@state = :half_closed_remote if frame.end_stream?
		
		process_data(frame)
	when :half_closed_local
		update_local_window(frame)
		
		# Here the data is processed before the stream is closed.
		process_data(frame)
		
		close! if frame.end_stream?
	else
		if self.closed?
			ignore_data(frame)
		else
			# If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
			self.send_reset_stream(Error::STREAM_CLOSED)
		end
	end
end

def receive_headers(frame)

# Handle a received HEADERS frame according to the current state:
#
# - `:idle`: move to `:half_closed_remote` if the frame ends the stream,
#   otherwise `open!`; then process the headers and return the result.
# - `:reserved_remote`: process the headers, then move to `:half_closed_local`.
# - `:open`: process the headers; move to `:half_closed_remote` if the frame
#   ends the stream.
# - `:half_closed_local`: process the headers; `close!` if the frame ends
#   the stream.
# - closed: hand the frame to `ignore_headers`.
# - anything else: reset the stream with `Error::STREAM_CLOSED`.
def receive_headers(frame)
	case @state
	when :idle
		if frame.end_stream?
			@state = :half_closed_remote
		else
			open!
		end
		
		process_headers(frame)
	when :reserved_remote
		process_headers(frame)
		
		@state = :half_closed_local
	when :open
		process_headers(frame)
		
		@state = :half_closed_remote if frame.end_stream?
	when :half_closed_local
		process_headers(frame)
		
		close! if frame.end_stream?
	else
		if self.closed?
			ignore_headers(frame)
		else
			self.send_reset_stream(Error::STREAM_CLOSED)
		end
	end
end

def receive_push_promise(frame)

# Handle a received PUSH_PROMISE frame: decode the promised headers, obtain
# the promised stream from `accept_push_promise_stream`, make it depend on
# this stream and move it to the remote-reserved state.
#
# Returns a two-element array: the promised stream and the decoded headers.
def receive_push_promise(frame)
	promised_id, header_block = frame.unpack
	headers = @connection.decode_headers(header_block)
	
	promised = accept_push_promise_stream(promised_id, headers).tap do |stream|
		stream.parent = self
		stream.reserved_remote!
	end
	
	[promised, headers]
end

def receive_reset_stream(frame)

# Handle a received RST_STREAM frame: close the stream with the error code
# carried by the frame.
#
# Raises ProtocolError when the stream is `:idle`.
# Returns the unpacked error code.
def receive_reset_stream(frame)
	# If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
	if @state == :idle
		raise ProtocolError, "Cannot receive reset stream in state: #{@state}!"
	end
	
	frame.unpack.tap do |error_code|
		close!(error_code)
	end
end

def reserved_local!

# Transition from `:idle` to `:reserved_local`.
#
# Raises ProtocolError when the stream is in any state other than `:idle`.
# Returns self.
def reserved_local!
	unless @state == :idle
		raise ProtocolError, "Cannot reserve stream in state: #{@state}"
	end
	
	@state = :reserved_local
	
	self
end

def reserved_remote!

# Transition from `:idle` to `:reserved_remote`.
#
# Raises ProtocolError when the stream is in any state other than `:idle`.
# Returns self.
def reserved_remote!
	unless @state == :idle
		raise ProtocolError, "Cannot reserve stream in state: #{@state}"
	end
	
	@state = :reserved_remote
	
	self
end

def send_data(*arguments, **options)

# Send a DATA frame, forwarding all arguments to `write_data`.
#
# - `:open`: write the frame; move to `:half_closed_local` if it ends the stream.
# - `:half_closed_remote`: write the frame; `close!` if it ends the stream.
#
# Raises ProtocolError in any other state.
def send_data(*arguments, **options)
	case @state
	when :open
		frame = write_data(*arguments, **options)
		
		@state = :half_closed_local if frame.end_stream?
	when :half_closed_remote
		frame = write_data(*arguments, **options)
		
		close! if frame.end_stream?
	else
		raise ProtocolError, "Cannot send data in state: #{@state}"
	end
end

def send_headers(*arguments)

The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state.
# Send a HEADERS frame, forwarding all arguments to `write_headers`.
#
# - `:idle`: write the frame; move to `:half_closed_local` if it ends the
#   stream, otherwise `open!`.
# - `:reserved_local`: write the frame; move to `:half_closed_remote`.
# - `:open`: write the frame; move to `:half_closed_local` if it ends the stream.
# - `:half_closed_remote`: write the frame; `close!` if it ends the stream.
#
# Raises ProtocolError in any other state.
def send_headers(*arguments)
	case @state
	when :idle
		frame = write_headers(*arguments)
		
		if frame.end_stream?
			@state = :half_closed_local
		else
			open!
		end
	when :reserved_local
		frame = write_headers(*arguments)
		
		@state = :half_closed_remote
	when :open
		frame = write_headers(*arguments)
		
		@state = :half_closed_local if frame.end_stream?
	when :half_closed_remote
		frame = write_headers(*arguments)
		
		close! if frame.end_stream?
	else
		raise ProtocolError, "Cannot send headers in state: #{@state}"
	end
end

def send_headers?

HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state. Despite its name, it can also be used for trailers.
# Whether the current state is one of `:idle`, `:reserved_local`, `:open`
# or `:half_closed_remote` — the states handled by `send_headers`.
def send_headers?
	[:idle, :reserved_local, :open, :half_closed_remote].include?(@state)
end

def send_push_promise(headers)

Parameters:
  • headers (Hash) -- contains a complete set of request header fields that the server attributes to the request.
# Promise a pushed stream to the peer. Permitted only in the `:open` or
# `:half_closed_remote` state.
#
# headers - the request headers attributed to the promised stream.
#
# Raises ProtocolError in any other state.
# Returns the promised stream, already moved to its local-reserved state.
def send_push_promise(headers)
	unless @state == :open || @state == :half_closed_remote
		raise ProtocolError, "Cannot send push promise in state: #{@state}"
	end
	
	promised_stream = create_push_promise_stream(headers)
	promised_stream.reserved_local!
	
	# The headers are the same as if the client had sent a request:
	write_push_promise(promised_stream.id, headers)
	
	# The caller is expected to call send_headers on the promised stream to begin the response.
	promised_stream
end

def send_reset_stream(error_code = 0)

# Send a RST_STREAM frame carrying `error_code`, then close the stream.
#
# error_code - the code packed into the frame (defaults to 0).
#
# Raises ProtocolError when the stream is `:idle` or `:closed`.
# Returns the result of `close!`.
def send_reset_stream(error_code = 0)
	if @state == :idle || @state == :closed
		raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}"
	end
	
	frame = ResetStreamFrame.new(@id)
	frame.pack(error_code)
	
	write_frame(frame)
	
	close!
end

def weight

# The weight reported by this stream's dependency.
def weight
	return @dependency.weight
end

def write_data(data, flags = 0, **options)

# Build a DATA frame for this stream, charge it against the remote
# flow-control window and write it.
#
# NOTE(review): the body was truncated in this copy of the file (the leading
# characters of every line and the closing `end` were missing). It has been
# rebuilt from the surviving fragments — confirm against the canonical source.
#
# data    - the payload handed to `frame.pack`.
# flags   - the frame flags (defaults to 0).
# options - forwarded unchanged to `frame.pack`.
#
# Returns the frame that was written.
def write_data(data, flags = 0, **options)
	frame = DataFrame.new(@id, flags)
	frame.pack(data, **options)
	
	# This might fail if the data payload was too big:
	consume_remote_window(frame)
	write_frame(frame)
	
	return frame
end

def write_frame(frame)

# Write `frame` via the connection.
#
# Returns whatever the connection's `write_frame` returns.
def write_frame(frame)
	return @connection.write_frame(frame)
end

def write_headers(priority, headers, flags = 0)

# Build a HEADERS frame for this stream and write it.
#
# NOTE(review): the body was truncated in this copy of the file (the leading
# characters of every line, the block's `end` and the method's `end` were
# missing). It has been rebuilt from the surviving fragments — confirm
# against the canonical source.
#
# priority - passed to `frame.pack` as its first argument.
# headers  - encoded by the connection before packing.
# flags    - the frame flags (defaults to 0).
#
# Returns the frame that was written.
def write_headers(priority, headers, flags = 0)
	frame = HeadersFrame.new(@id, flags)
	
	# Encoding and writing both happen inside the connection's `write_frames` block.
	@connection.write_frames do |framer|
		data = @connection.encode_headers(headers)
		
		frame.pack(priority, data, maximum_size: @connection.maximum_frame_size)
		
		framer.write_frame(frame)
	end
	
	return frame
end

def write_push_promise(stream_id, headers, flags = 0, **options)

The server generates the same set of headers as if the client was sending a request, and sends these to the client. The client can reject the request by resetting the (new) stream. Otherwise, the server will start sending a response as if the client had sent the request.
A push promise is server request -> client -> server response -> client.
A normal request is client request -> server response -> client.
# Build a PUSH_PROMISE frame on this stream announcing `stream_id`, and
# write it.
#
# NOTE(review): the body was truncated in this copy of the file (the leading
# characters of every line, the block's `end` and the method's `end` were
# missing). It has been rebuilt from the surviving fragments — confirm
# against the canonical source.
#
# stream_id - the promised stream's id, packed into the frame.
# headers   - encoded by the connection before packing.
# flags     - the frame flags (defaults to 0).
# options   - accepted for interface compatibility; not used by this body.
#
# Returns the frame that was written.
def write_push_promise(stream_id, headers, flags = 0, **options)
	frame = PushPromiseFrame.new(@id, flags)
	
	# Encoding and writing both happen inside the connection's `write_frames` block.
	@connection.write_frames do |framer|
		data = @connection.encode_headers(headers)
		
		frame.pack(stream_id, data, maximum_size: @connection.maximum_frame_size)
		
		framer.write_frame(frame)
	end
	
	return frame
end