class Async::HTTP::Protocol::HTTP2::Stream::Output
def self.for(stream, body)
def self.for(stream, body) output = self.new(stream, body) output.start return output end
def close(error = nil)
def close(error = nil) if @stream if error @stream.close(error) else self.close_write end @stream = nil end end
def close_write
def close_write @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) end
def initialize(stream, body)
def initialize(stream, body) @stream = stream @body = body @window_updated = Async::Condition.new end
def passthrough(task)
def passthrough(task) task.annotate("Writing #{@body} to #{@stream}.") while chunk = @body&.read self.write(chunk) end self.close_write rescue Async::Stop # Ignore. ensure @body&.close($!) @body = nil end
def send_data(chunk, maximum_size)
-
(String, nil)
- any data that could not be written.
Parameters:
-
stream
(Stream
) -- the stream to use for sending data frames. -
maximum_size
(Integer
) -- send up to this many bytes of data.
def send_data(chunk, maximum_size) if chunk.bytesize <= maximum_size @stream.send_data(chunk, maximum_size: maximum_size) else @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) # The window was not big enough to send all the data, so we save it for next time: return chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) end return nil end
def start(parent: Task.current)
def start(parent: Task.current) if @body.respond_to?(:call) @task = parent.async(&self.method(:stream)) else @task = parent.async(&self.method(:passthrough)) end end
def stop(error)
def stop(error) # Ensure that invoking #close doesn't try to close the stream. @stream = nil @task&.stop end
def stream(task)
def stream(task) task.annotate("Streaming #{@body} to #{@stream}.") input = @stream.wait_for_input @body.call(Body::Stream.new(input, self)) rescue Async::Stop # Ignore. end
def window_updated(size)
def window_updated(size) @window_updated.signal end
def write(chunk)
def write(chunk) until chunk.empty? maximum_size = @stream.available_frame_size while maximum_size <= 0 @window_updated.wait maximum_size = @stream.available_frame_size end break unless chunk = send_data(chunk, maximum_size) end end