class Async::HTTP::Protocol::HTTP2::Stream
# Hand an incoming push promise over to the delegate.
#
# @param headers the headers, passed through to the delegate unchanged
# @param stream_id the stream identifier, passed through unchanged
# @return whatever the delegate's +accept_push_promise_stream+ returns
def accept_push_promise_stream(headers, stream_id)
  handler = @delegate
  handler.accept_push_promise_stream(headers, stream_id)
end
# Run the superclass's close handling, then notify the delegate.
#
# @return whatever the delegate's +close!+ returns
def close!
  super()

  listener = @delegate
  listener.close!
end
# Ask the delegate to create a push promise stream for the given headers.
#
# @param headers the headers, passed through to the delegate unchanged
# @return whatever the delegate's +create_push_promise_stream+ returns
def create_push_promise_stream(headers)
  factory = @delegate
  factory.create_push_promise_stream(headers)
end
# Build a stream whose events are reported to +delegate+.
#
# @param delegate the object that receives this stream's callbacks
# @param args remaining arguments, handed to the superclass constructor as-is
def initialize(delegate, *args)
  super(*args)

  @delegate = delegate

  # The body currently being sent. Nothing is attached at construction time;
  # #send_body assigns it later. (Previously this read an undefined `body`.)
  @body = nil

  # Tail of a chunk that did not fit into the last frame sent by #send_chunk.
  @remainder = nil

  # The task created by #send_body.
  @task = nil
end
# Let the superclass process a data frame, then report any resulting data to
# the delegate.
#
# @param frame the incoming frame; must respond to +end_stream?+
# @return the value returned by the superclass (the delegate is only notified
#   when that value is truthy)
def receive_data(frame)
  payload = super

  @delegate.receive_data(self, payload, frame.end_stream?) if payload

  payload
end
# Let the superclass process a headers frame, then report the result to the
# delegate.
#
# @param frame the incoming frame; must respond to +end_stream?+
# @return the headers value returned by the superclass
def receive_headers(frame)
  super.tap do |fields|
    @delegate.receive_headers(self, fields, frame.end_stream?)
  end
end
# Handle a stream reset: abort any body still being sent, then tell the
# delegate.
#
# @param frame the incoming reset frame, processed by the superclass
# @return the error code returned by the superclass
def receive_reset_stream(frame)
  code = super

  if (outgoing = @body)
    # Close the outgoing body with an EOFError built from the error code.
    outgoing.close(EOFError.new(code))
    @body = nil
  end

  @delegate.receive_reset_stream(self, code)

  code
end
# Start sending +body+ from a child task of +task+.
#
# The child task annotates itself, stores the body in @body and then calls
# #window_updated.
#
# @param body the body to send
# @param task the parent task; defaults to Async::Task.current
# @return the value returned by +task.async+, which is also stored in @task
def send_body(body, task: Async::Task.current)
  # TODO: Might need to stop this task when body is cancelled.
  sender = task.async do |child|
    child.annotate "Sending body: #{body.class}"

    @body = body

    window_updated
  end

  @task = sender
end
# Send at most one frame's worth of the outgoing body.
#
# The data comes from @remainder (the tail of a chunk that did not fit
# last time) if present, otherwise from the next +@body.read+. When there is
# nothing left to read, the body is closed and, unless the stream or the
# connection is already closed, an empty END_STREAM data frame is sent.
#
# @return [Boolean] true if a data frame was sent and the caller should try
#   again; false if nothing more can be sent right now
def send_chunk
  maximum_size = available_frame_size

  # Nothing can be sent while the available frame size is zero.
  return false if maximum_size == 0

  if @remainder
    chunk = @remainder
    @remainder = nil
  else
    # @body can be cleared by #receive_reset_stream or #stop_connection between
    # chunks; treat a missing body like an exhausted one instead of raising
    # NoMethodError on nil.
    chunk = @body&.read

    unless chunk
      if @body
        @body.close
        @body = nil
      end

      # @body.read above might take a while and a stream reset might be
      # received in the mean time.
      unless closed? || @connection.closed?
        send_data(nil, ::Protocol::HTTP2::END_STREAM)
      end

      return false
    end
  end

  # The stream may have been closed while the chunk was being read.
  return false if closed?

  if chunk.bytesize <= maximum_size
    send_data(chunk, maximum_size: maximum_size)
  else
    # Send what fits and keep the rest for the next call.
    send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)

    @remainder = chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
  end

  true
end
# Abort any body still being sent with +error+, then forward the
# notification to the delegate.
#
# @param error the error passed to the body's +close+ and to the delegate
# @return whatever the delegate's +stop_connection+ returns
def stop_connection(error)
  if (pending = @body)
    pending.close(error)
    @body = nil
  end

  listener = @delegate
  listener.stop_connection(error)
end
# Keep sending body chunks for as long as #send_chunk reports progress.
# Does nothing when no body is attached.
#
# @return [nil]
def window_updated
  if @body
    # There could be more data to send after each successful chunk.
    more = send_chunk
    more = send_chunk while more
  end
end