lib/protocol/http/body/streamable.rb
# frozen_string_literal: true # Released under the MIT License. # Copyright, 2019-2024, by Samuel Williams. require_relative "readable" require_relative "writable" require_relative "stream" module Protocol module HTTP module Body # A body that invokes a block that can read and write to a stream. # # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement {stream?} and return `true`. When {stream?} returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using {each}. # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. module Streamable # Generate a new streaming request body using the given block to generate the body. # # @parameter block [Proc] The block that generates the body. # @returns [RequestBody] The streaming request body. def self.request(&block) RequestBody.new(block) end # Generate a new streaming response body using the given block to generate the body. # # @parameter request [Request] The request. # @parameter block [Proc] The block that generates the body. # @returns [ResponseBody] The streaming response body. def self.response(request, &block) ResponseBody.new(block, request.body) end # A output stream that can be written to by a block. class Output # Schedule the block to be executed in a fiber. # # @parameter input [Readable] The input stream. # @parameter block [Proc] The block that generates the output. # @returns [Output] The output stream. def self.schedule(input, block) self.new(input, block).tap(&:schedule) end # Initialize the output stream with the given input and block. # # @parameter input [Readable] The input stream. # @parameter block [Proc] The block that generates the output. def initialize(input, block) @output = Writable.new @stream = Stream.new(input, @output) @block = block end # Schedule the block to be executed in a fiber. # # @returns [Fiber] The fiber. def schedule @fiber ||= Fiber.schedule do @block.call(@stream) end end # Read from the output stream (may block). def read @output.read end # Close the output stream. # # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) @output.close_write(error) end end # Raised when a streaming body is consumed more than once. class ConsumedError < StandardError end # A streaming body that can be read from and written to. class Body < Readable # Initialize the body with the given block and input. # # @parameter block [Proc] The block that generates the body. # @parameter input [Readable] The input stream, if known. def initialize(block, input = nil) @block = block @input = input @output = nil end # @returns [Boolean] Whether the body can be streamed, which is true. def stream? true end # Invokes the block in a fiber which yields chunks when they are available. def read # We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks: if @output.nil? if @block.nil? raise ConsumedError, "Streaming body has already been consumed!" end @output = Output.schedule(@input, @block) @block = nil end @output.read end # Invoke the block with the given stream. The block can read and write to the stream, and must close the stream when finishing. # # @parameter stream [Stream] The stream to read and write to. def call(stream) if @block.nil? raise ConsumedError, "Streaming block has already been consumed!" end block = @block @input = @output = @block = nil # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. block.call(stream) rescue => error # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: stream.close raise end # Close the input. The streaming body will eventually read all the input. # # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close_input(error = nil) if input = @input @input = nil input.close(error) end end # Close the output, the streaming body will be unable to write any more output. # # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close_output(error = nil) @output&.close(error) end end # A response body is used on the server side to generate the response body using a block. class ResponseBody < Body # Close will be invoked when all the output is written. def close(error = nil) self.close_output(error) end end # A request body is used on the client side to generate the request body using a block. # # As the response body isn't available until the request is sent, the response body must be {stream}ed into the request body. class RequestBody < Body # Initialize the request body with the given block. # # @parameter block [Proc] The block that generates the body. def initialize(block) super(block, Writable.new) end # Close will be invoked when all the input is read. def close(error = nil) self.close_input(error) end # Stream the response body into the block's input. def stream(body) body&.each do |chunk| @input.write(chunk) end rescue => error ensure @input.close_write(error) end end end end end end