class Async::HTTP::Protocol::HTTP2::Stream
def add_header(key, value)
def add_header(key, value) if key == CONNECTION raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" elsif key.start_with? ':' raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" elsif key =~ /[A-Z]/ raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" else @headers.add(key, value) end end
def closed(error)
- A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task.
- A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
def closed(error) super if @input @input.close(error) @input = nil end if @output @output.stop(error) @output = nil end return self end
def finish_output(error = nil)
def finish_output(error = nil) trailer = @output&.trailer @output = nil if error send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) else # Write trailer? if trailer&.any? send_headers(nil, trailer, ::Protocol::HTTP2::END_STREAM) else send_data(nil, ::Protocol::HTTP2::END_STREAM) end end end
def initialize(*)
def initialize(*) super @headers = nil # Input buffer, reading request body, or response body (receive_data): @length = nil @input = nil # Output buffer, writing request body or response body (window_updated): @output = nil end
def prepare_input(length)
-
(Input)
- the input body.
def prepare_input(length) if @input.nil? @input = Input.new(self, length) else raise ArgumentError, "Input body already prepared!" end end
def process_data(frame)
def process_data(frame) data = frame.unpack if @input unless data.empty? @input.write(data) end if frame.end_stream? @input.close @input = nil end end return data rescue ::Protocol::HTTP2::ProtocolError raise rescue # Anything else... send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) end
def process_headers(frame)
def process_headers(frame) if frame.end_stream? && @headers self.receive_trailing_headers(super, frame.end_stream?) else @headers ||= ::Protocol::HTTP::Headers.new self.receive_initial_headers(super, frame.end_stream?) end # TODO this might need to be in an ensure block: if @input and frame.end_stream? @input.close($!) @input = nil end rescue ::Protocol::HTTP2::HeaderError => error Console.logger.debug(self, error) send_reset_stream(error.code) end
def receive_trailing_headers(headers, end_stream)
def receive_trailing_headers(headers, end_stream) headers.each do |key, value| add_header(key, value) end end
def send_body(body, trailer = nil)
def send_body(body, trailer = nil) @output = Output.new(self, body, trailer) @output.start end
def update_local_window(frame)
def update_local_window(frame) consume_local_window(frame) # This is done on demand in `Input#read`: # request_window_update end
def wait_for_input
def wait_for_input return @input end
def window_updated(size)
def window_updated(size) super @output&.window_updated(size) end