lib/protocol/http2/connection.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2023, by Marco Concetto Rudilosso.

require_relative "framer"
require_relative "dependency"
require_relative "flow_controlled"

require "protocol/hpack"

module Protocol
	module HTTP2
		class Connection
			include FlowControlled
			
			def initialize(framer, local_stream_id)
				super()
				
				@state = :new
				
				# Hash(Integer, Stream)
				@streams = {}
				
				# Hash(Integer, Dependency)
				@dependency = Dependency.new(self, 0)
				@dependencies = {0 => @dependency}
				
				@framer = framer
				
				# The next stream id to use:
				@local_stream_id = local_stream_id
				
				# The biggest remote stream id seen thus far:
				@remote_stream_id = 0
				
				@local_settings = PendingSettings.new
				@remote_settings = Settings.new
				
				@decoder = HPACK::Context.new
				@encoder = HPACK::Context.new
				
				@local_window = LocalWindow.new()
				@remote_window = Window.new
			end
			
			def id
				0
			end
			
			def [] id
				if id.zero?
					self
				else
					@streams[id]
				end
			end
			
			# The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
			def maximum_frame_size
				@remote_settings.maximum_frame_size
			end
			
			# The maximum number of concurrent streams that this connection can initiate:
			def maximum_concurrent_streams
				@remote_settings.maximum_concurrent_streams
			end
			
			attr :framer
			
			# Connection state (:new, :open, :closed).
			attr_accessor :state
			
			# Current settings value for local and peer
			attr_accessor :local_settings
			attr_accessor :remote_settings
			
			# Our window for receiving data. When we receive data, it reduces this window.
			# If the window gets too small, we must send a window update.
			attr :local_window
			
			# Our window for sending data. When we send data, it reduces this window.
			attr :remote_window
			
			# The highest stream_id that has been successfully accepted by this connection.
			attr :remote_stream_id
			
			# Whether the connection is effectively or actually closed.
			def closed?
				@state == :closed || @framer.nil?
			end
			
			def delete(id)
				@streams.delete(id)
				@dependencies[id]&.delete!
			end
			
			# Close the underlying framer and all streams.
			def close(error = nil)
				# The underlying socket may already be closed by this point.
				@streams.each_value{|stream| stream.close(error)}
				@streams.clear
				
			ensure
				if @framer
					@framer.close
					@framer = nil
				end
			end
			
			def encode_headers(headers, buffer = String.new.b)
				HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers)
			end
			
			def decode_headers(data)
				HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode
			end
			
			# Streams are identified with an unsigned 31-bit integer.  Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers.  A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
			def next_stream_id
				id = @local_stream_id
				
				@local_stream_id += 2
				
				return id
			end
			
			attr :streams
			
			attr :dependencies
			
			attr :dependency
			
			# 6.8. GOAWAY
			# There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client.
			# Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
			def ignore_frame?(frame)
				if self.closed?
					# puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}"
					if valid_remote_stream_id?(frame.stream_id)
						return frame.stream_id > @remote_stream_id
					end
				end
			end
			
			def synchronize
				yield
			end
			
			# Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
			def read_frame
				frame = @framer.read_frame(@local_settings.maximum_frame_size)
				
				# puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})"
				# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
				
				return if ignore_frame?(frame)
				
				yield frame if block_given?
				frame.apply(self)
				
				return frame
			rescue GoawayError => error
				# Go directly to jail. Do not pass go, do not collect $200.
				raise
			rescue ProtocolError => error
				send_goaway(error.code || PROTOCOL_ERROR, error.message)
				
				raise
			rescue HPACK::Error => error
				send_goaway(COMPRESSION_ERROR, error.message)
				
				raise
			end
			
			def send_settings(changes)
				@local_settings.append(changes)
				
				frame = SettingsFrame.new
				frame.pack(changes)
				
				write_frame(frame)
			end
			
			# Transition the connection into the closed state.
			def close!
				@state = :closed
				
				return self
			end
			
			# Tell the remote end that the connection is being shut down. If the `error_code` is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
			def send_goaway(error_code = 0, message = "")
				frame = GoawayFrame.new
				frame.pack @remote_stream_id, error_code, message
				
				write_frame(frame)
			ensure
				self.close!
			end
			
			def receive_goaway(frame)
				# We capture the last stream that was processed.
				@remote_stream_id, error_code, message = frame.unpack
				
				self.close!
				
				if error_code != 0
					# Shut down immediately.
					raise GoawayError.new(message, error_code)
				end
			end
			
			def write_frame(frame)
				synchronize do
					@framer.write_frame(frame)
				end
				
				# The IO is already synchronized, and we don't want additional contention.
				@framer.flush
			end
			
			def write_frames
				if @framer
					synchronize do
						yield @framer
					end
					
					# See above note.
					@framer.flush
				else
					raise EOFError, "Connection closed!"
				end
			end
			
			def update_local_settings(changes)
				capacity = @local_settings.initial_window_size
				
				@streams.each_value do |stream|
					stream.local_window.capacity = capacity
				end
				
				@local_window.desired = capacity
			end
			
			def update_remote_settings(changes)
				capacity = @remote_settings.initial_window_size
				
				@streams.each_value do |stream|
					stream.remote_window.capacity = capacity
				end
			end
			
			# In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the "open" or "half-closed (remote)" state).  When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
			#
			# @return [Boolean] whether the frame was an acknowledgement
			def process_settings(frame)
				if frame.acknowledgement?
					# The remote end has confirmed the settings have been received:
					changes = @local_settings.acknowledge
					
					update_local_settings(changes)
					
					return true
				else
					# The remote end is updating the settings, we reply with acknowledgement:
					reply = frame.acknowledge
					
					write_frame(reply)
					
					changes = frame.unpack
					@remote_settings.update(changes)
					
					update_remote_settings(changes)
					
					return false
				end
			end
			
			def open!
				@state = :open
				
				return self
			end
			
			def receive_settings(frame)
				if @state == :new
					# We transition to :open when we receive acknowledgement of first settings frame:
					open! if process_settings(frame)
				elsif @state != :closed
					process_settings(frame)
				else
					raise ProtocolError, "Cannot receive settings in state #{@state}"
				end
			end
			
			def send_ping(data)
				if @state != :closed
					frame = PingFrame.new
					frame.pack data
					
					write_frame(frame)
				else
					raise ProtocolError, "Cannot send ping in state #{@state}"
				end
			end
			
			def receive_ping(frame)
				if @state != :closed
					# This is handled in `read_payload`:
					# if frame.stream_id != 0
					# 	raise ProtocolError, "Ping received for non-zero stream!"
					# end
					
					unless frame.acknowledgement?
						reply = frame.acknowledge
						
						write_frame(reply)
					end
				else
					raise ProtocolError, "Cannot receive ping in state #{@state}"
				end
			end
			
			def receive_data(frame)
				update_local_window(frame)
				
				if stream = @streams[frame.stream_id]
					stream.receive_data(frame)
				elsif closed_stream_id?(frame.stream_id)
					# This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless.
				else
					raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}"
				end
			end
			
			def valid_remote_stream_id?(stream_id)
				false
			end
			
			# Accept an incoming stream from the other side of the connnection.
			# On the server side, we accept requests.
			def accept_stream(stream_id, &block)
				unless valid_remote_stream_id?(stream_id)
					raise ProtocolError, "Invalid stream id: #{stream_id}"
				end
				
				create_stream(stream_id, &block)
			end
			
			# Accept an incoming push promise from the other side of the connection.
			# On the client side, we accept push promise streams.
			# On the server side, existing streams create push promise streams.
			def accept_push_promise_stream(stream_id, &block)
				accept_stream(stream_id, &block)
			end
			
			# Create a stream, defaults to an outgoing stream.
			# On the client side, we create requests.
			# @return [Stream] the created stream.
			def create_stream(id = next_stream_id, &block)
				if @streams.key?(id)
					raise ProtocolError, "Cannot create stream with id #{id}, already exists!"
				end
				
				if block_given?
					return yield(self, id)
				else
					return Stream.create(self, id)
				end
			end
			
			def create_push_promise_stream(&block)
				create_stream(&block)
			end
			
			# On the server side, starts a new request.
			def receive_headers(frame)
				stream_id = frame.stream_id
				
				if stream_id.zero?
					raise ProtocolError, "Cannot receive headers for stream 0!"
				end
				
				if stream = @streams[stream_id]
					stream.receive_headers(frame)
				else
					if stream_id <= @remote_stream_id
						raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
					end
					
					# We need to validate that we have less streams than the specified maximum:
					if @streams.size < @local_settings.maximum_concurrent_streams
						stream = accept_stream(stream_id)
						@remote_stream_id = stream_id
						
						stream.receive_headers(frame)
					else
						raise ProtocolError, "Exceeded maximum concurrent streams"
					end
				end
			end
			
			def send_priority(stream_id, priority)
				frame = PriorityFrame.new(stream_id)
				frame.pack(priority)
				
				write_frame(frame)
			end
			
			# Sets the priority for an incoming stream.
			def receive_priority(frame)
				if dependency = @dependencies[frame.stream_id]
					dependency.receive_priority(frame)
				elsif idle_stream_id?(frame.stream_id)
					Dependency.create(self, frame.stream_id, frame.unpack)
				end
			end
			
			def receive_push_promise(frame)
				raise ProtocolError, "Unable to receive push promise!"
			end
			
			def client_stream_id?(id)
				id.odd?
			end
			
			def server_stream_id?(id)
				id.even?
			end
			
			def idle_stream_id?(id)
				if id.even?
					# Server-initiated streams are even.
					if @local_stream_id.even?
						id >= @local_stream_id
					else
						id > @remote_stream_id
					end
				elsif id.odd?
					# Client-initiated streams are odd.
					if @local_stream_id.odd?
						id >= @local_stream_id
					else
						id > @remote_stream_id
					end
				end
			end
			
			# This is only valid if the stream doesn't exist in `@streams`.
			def closed_stream_id?(id)
				if id.zero?
					# The connection "stream id" can never be closed:
					false
				else
					!idle_stream_id?(id)
				end
			end
			
			def receive_reset_stream(frame)
				if frame.connection?
					raise ProtocolError, "Cannot reset connection!"
				elsif stream = @streams[frame.stream_id]
					stream.receive_reset_stream(frame)
				elsif closed_stream_id?(frame.stream_id)
					# Ignore.
				else
					raise StreamClosed, "Cannot reset stream #{frame.stream_id}"
				end
			end
			
			# Traverse active streams in order of priority and allow them to consume the available flow-control window.
			# @param amount [Integer] the amount of data to write. Defaults to the current window capacity.
			def consume_window(size = self.available_size)
				# Return if there is no window to consume:
				return unless size > 0
				
				# Console.logger.debug(self) do |buffer|
				# 	@dependencies.each do |id, dependency|
				# 		buffer.puts "- #{dependency}"
				# 	end
				# 
				# 	buffer.puts
				# 
				# 	@dependency.print_hierarchy(buffer)
				# end
				
				@dependency.consume_window(size)
			end
			
			def receive_window_update(frame)
				if frame.connection?
					super
					
					self.consume_window
				elsif stream = @streams[frame.stream_id]
					begin
						stream.receive_window_update(frame)
					rescue ProtocolError => error
						stream.send_reset_stream(error.code)
					end
				elsif closed_stream_id?(frame.stream_id)
					# Ignore.
				else
					# Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR.
					raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
				end
			end
			
			def receive_continuation(frame)
				raise ProtocolError, "Received unexpected continuation: #{frame.class}"
			end
			
			def receive_frame(frame)
				# ignore.
			end
		end
	end
end