class Async::HTTP::Protocol::HTTP2::Stream
def add_header(key, value)
def add_header(key, value) if key == CONNECTION raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" elsif key.start_with? ':' raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" elsif key =~ /[A-Z]/ raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" else @headers.add(key, value) end end
def add_trailer(key, value)
def add_trailer(key, value) if @trailers.include(key) add_header(key, value) else raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!" end end
def close(error = nil)
def close(error = nil) super if @input @input.close(error) @input = nil end if @output @output.stop(error) @output = nil end end
def initialize(*)
def initialize(*) super @headers = nil @trailers = nil # Input buffer, reading request body, or response body (receive_data): @length = nil @input = nil # Output buffer, writing request body or response body (window_updated): @output = nil end
def prepare_input(length)
-
(Input)
- the input body.
def prepare_input(length) if @input.nil? @input = Input.new(self, length) else raise ArgumentError, "Input body already prepared!" end end
def process_data(frame)
def process_data(frame) data = frame.unpack if @input unless data.empty? @input.write(data) end if frame.end_stream? @input.close @input = nil end end return data rescue ::Protocol::HTTP2::ProtocolError raise rescue # Anything else... send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) end
def receive_headers(frame)
def receive_headers(frame) if @headers.nil? @headers = ::Protocol::HTTP::Headers.new self.receive_initial_headers(super, frame.end_stream?) @trailers = @headers[TRAILERS] elsif @trailers and frame.end_stream? self.receive_trailing_headers(super, frame.end_stream?) else raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!" end rescue ::Protocol::HTTP2::HeaderError => error Async.logger.error(self, error) send_reset_stream(error.code) end
def receive_trailing_headers(headers, end_stream)
def receive_trailing_headers(headers, end_stream) headers.each do |key, value| add_trailer(key, value) end end
def send_body(body)
def send_body(body) @output = Output.for(self, body) end
def update_local_window(frame)
def update_local_window(frame) consume_local_window(frame) # This is done on demand in `Input#read`: # request_window_update end
def wait_for_input
def wait_for_input return @input end
def window_updated(size)
def window_updated(size) super @output&.window_updated(size) end