class Async::HTTP::Protocol::HTTP2::Stream
# Create a stream that reports incoming protocol events to a delegate.
#
# @param delegate [Object] notified via `receive_data`, `receive_headers`
#   and `receive_reset_stream` as frames arrive on this stream.
# @param args [Array] remaining arguments, forwarded unchanged to the
#   superclass initializer.
def initialize(delegate, *args)
  super(*args)

  @delegate = delegate

  # Nothing is being sent until #send_body assigns a body. This is set to
  # nil explicitly rather than read from a `body` name, which is neither a
  # parameter nor a local variable here.
  @body = nil

  # Tail of a chunk that did not fit into the previous frame (see #send_chunk).
  @remainder = nil

  # The task started by #send_body, if any.
  @task = nil
end
# Handle an incoming DATA frame.
#
# The frame is first processed by the superclass; when that yields a
# payload, the delegate is told about it together with whether the frame
# ends the stream.
#
# @param frame [Object] the received frame; must respond to `end_stream?`.
# @return [Object, nil] whatever the superclass implementation returned.
def receive_data(frame)
  payload = super

  delegate.receive_data(self, payload, frame.end_stream?) if payload

  payload
end
# Handle an incoming HEADERS frame.
#
# The superclass processes the frame first; its result is then handed to
# the delegate along with the frame's end-of-stream flag.
#
# @param frame [Object] the received frame; must respond to `end_stream?`.
# @return [Object] the headers value returned by the superclass.
def receive_headers(frame)
  super.tap do |decoded|
    delegate.receive_headers(self, decoded, frame.end_stream?)
  end
end
# Handle an incoming RST_STREAM frame.
#
# Any body currently being sent is closed with an EOFError wrapping the
# error code and then forgotten, after which the delegate is notified.
#
# @param frame [Object] the received reset frame.
# @return [Object] the error code returned by the superclass.
def receive_reset_stream(frame)
  code = super

  unless @body.nil? || @body == false
    # Close before clearing, so the reference is only dropped once the
    # body has actually been closed.
    @body.close(EOFError.new(code))
    @body = nil
  end

  delegate.receive_reset_stream(self, code)

  code
end
# Start sending a body on this stream from a child task.
#
# Inside the child task the body is recorded and #window_updated is
# invoked to begin writing chunks.
#
# @param body [Object] the body to send; #send_chunk calls `read` and
#   `close` on it.
# @param task [Object] parent task used to spawn the sender via `async`;
#   defaults to `Async::Task.current`.
# @return [Object] the value returned by `task.async`, also kept in `@task`.
def send_body(body, task: Async::Task.current)
  # TODO Might need to stop this task when body is cancelled.
  sender = task.async do
    @body = body

    window_updated
  end

  @task = sender
end
# Send at most one frame's worth of the outgoing body.
#
# The next chunk is taken from the leftover of a previous oversized chunk
# if there is one, otherwise from `@body.read`. A chunk larger than the
# available frame size is split, and the unsent tail is kept in
# `@remainder` for the next call. When the body has no more chunks it is
# closed and, unless the stream is already closed, an END_STREAM data
# frame is sent.
#
# @return [Boolean] true when a data frame carrying a chunk was sent (the
#   caller may try again); false when nothing more can be sent right now:
#   no frame capacity, no body/body exhausted, or the stream is closed.
def send_chunk
  maximum_size = self.available_frame_size

  # Treat any non-positive size as "no capacity". HTTP/2 flow-control
  # windows are allowed to become negative, and slicing a chunk with a
  # negative length would produce a nil payload.
  return false if maximum_size <= 0

  if @remainder
    chunk = @remainder
    @remainder = nil
  elsif @body && (chunk = @body.read)
    # There was a new chunk of data to send.
  else
    # Either the body is exhausted, or it was already cleared (for example
    # by a stream reset) before this call.
    if @body
      @body.close
      @body = nil
    end

    # @body.read above might take a while and a stream reset might be
    # received in the mean time.
    unless closed?
      send_data(nil, ::HTTP::Protocol::HTTP2::END_STREAM)
    end

    return false
  end

  return false if closed?

  if chunk.bytesize <= maximum_size
    send_data(chunk, maximum_size: maximum_size)
  else
    send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)

    # Keep the unsent tail for the next call.
    @remainder = chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
  end

  return true
end
# Resume sending the current body, if there is one.
#
# Calls #send_chunk repeatedly for as long as it reports that a chunk was
# sent. Does nothing when no body is set.
#
# @return [nil]
def window_updated
  if @body
    # There could be more data to send after each successful chunk.
    sending = true
    sending = send_chunk while sending
  end
end