class Protocol::HTTP::Body::Streamable::Body
def call(stream)
Invoke the block with the given stream.
def call(stream) if @block.nil? raise ConsumedError, "Streaming block has already been consumed!" end block = @block @input = @output = @block = nil # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. block.call(stream) rescue => error # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: stream.close raise end
def close_input(error = nil)
def close_input(error = nil) if input = @input @input = nil input.close(error) end end
def close_output(error = nil)
def close_output(error = nil) @output&.close(error) end
def initialize(block, input = nil)
def initialize(block, input = nil) @block = block @input = input @output = nil end
def read
def read # We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks: if @output.nil? if @block.nil? raise ConsumedError, "Streaming body has already been consumed!" end @output = Output.schedule(@input, @block) @block = nil end @output.read end
def stream?
def stream? true end