lib/protocol/rack/body/streaming.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2022, by Samuel Williams.

require 'protocol/http/body/readable'
require 'protocol/http/body/stream'

module Protocol
	module Rack
		module Body
			# Wraps a streaming response body into a compatible Protocol::HTTP body.
			class Streaming < ::Protocol::HTTP::Body::Readable
				def initialize(block, input = nil)
					@block = block
					@input = input
					@output = nil
				end
				
				attr :block
				
				class Output
					def initialize(input, block)
						stream = ::Protocol::HTTP::Body::Stream.new(input, self)
						
						@from = nil
						
						@fiber = Fiber.new do |from|
							@from = from
							block.call(stream)
							@fiber = nil
						end
					end
					
					def write(chunk)
						if from = @from
							@from = nil
							@from = from.transfer(chunk)
						else
							raise RuntimeError, "Stream is not being read!"
						end
					end
					
					def close(error = nil)
						@fiber = nil
						
						if from = @from
							@from = nil
							if error
								from.raise(error)
							else
								from.transfer(nil)
							end
						end
					end
					
					def close_write(error = nil)
						close(error)
					end
					
					def read
						raise RuntimeError, "Stream is already being read!" if @from
						
						@fiber&.transfer(Fiber.current)
					end
				end
				
				# Invokes the block in a fiber which yields chunks when they are available.
				def read
					@output ||= Output.new(@input, @block)
					return @output.read
				end
				
				def stream?
					true
				end
				
				def call(stream)
					raise "Streaming body has already been read!" if @output
					@block.call(stream)
				end
			end
		end
	end
end