class Async::HTTP::Protocol::HTTP2::Stream::Buffer
def close(error)
def close(error) if @body @body.close(error) @body = nil end @task&.stop end
def end_stream
def end_stream @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) end
def initialize(stream, body, task: Task.current)
def initialize(stream, body, task: Task.current) @stream = stream @body = body @remainder = nil @window_updated = Async::Condition.new @task = task.async(&self.method(:passthrough)) end
def passthrough(task)
def passthrough(task) while chunk = self.read maximum_size = @stream.available_frame_size while maximum_size <= 0 @window_updated.wait maximum_size = @stream.available_frame_size end self.send_data(chunk, maximum_size) end self.end_stream rescue Async::Stop # Ignore. ensure @body&.close($!) @body = nil end
def push(chunk)
def push(chunk) @remainder = chunk end
def read
def read if @remainder remainder = @remainder @remainder = nil return remainder else @body.read end end
def send_data(chunk, maximum_size)
-
stream
(Stream
) -- the stream to use for sending data frames. -
maximum_size
(Integer
) -- send up to this many bytes of data.
def send_data(chunk, maximum_size) if chunk.bytesize <= maximum_size @stream.send_data(chunk, maximum_size: maximum_size) else @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) # The window was not big enough to send all the data, so we save it for next time: self.push( chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) ) end end
def window_updated(size)
def window_updated(size) @window_updated.signal end