lib/protocol/http/body/stream.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2023, by Genki Takiuchi.

require_relative "buffered"

module Protocol
	module HTTP
		module Body
			# The input stream is an IO-like object which contains the raw HTTP POST data. When applicable, its external encoding must be “ASCII-8BIT” and it must be opened in binary mode, for Ruby 1.9 compatibility. The input stream must respond to gets, each, read and rewind.
			class Stream
				# The default line separator, used by {gets}.
				NEWLINE = "\n"
				
				# Initialize the stream with the given input and output.
				#
				# @parameter input [Readable] The input stream.
				# @parameter output [Writable] The output stream.
				# @raises [ArgumentError] If the output does not respond to `write`.
				def initialize(input = nil, output = Buffered.new)
					@input = input
					@output = output
					
					raise ArgumentError, "Non-writable output!" unless output.respond_to?(:write)
					
					# Holds data which has been read from the input but not yet consumed. It is either `nil` or a non-empty string once a read operation has split a chunk.
					@buffer = nil
					
					@closed = false
					@closed_read = false
				end
				
				# @attribute [Readable] The input stream.
				attr :input
				
				# @attribute [Writable] The output stream.
				attr :output
				
				# This provides a read-only interface for data, which is surprisingly tricky to implement correctly.
				#
				# The including class must provide a `read_next` method which returns the next chunk of data, or `nil` when no more data is available. Unconsumed data is kept in `@buffer`.
				module Reader
					# Read data from the underlying stream.
					#
					# If given a non-negative length, it will read at most that many bytes from the stream. If the stream is at EOF, it will return nil.
					#
					# If the length is not given, it will read all data until EOF, or return an empty string if the stream is already at EOF.
					#
					# If buffer is given, then the read data will be placed into buffer instead of a newly created String object.
					#
					# @parameter length [Integer] The amount of data to read.
					# @parameter buffer [String] The buffer which will receive the data.
					# @returns [String | Nil] A buffer containing the data, or `nil` if a length was given and the stream is at EOF.
					def read(length = nil, buffer = nil)
						if length == 0
							# A zero-length read consumes nothing. Return a mutable string (never a frozen literal) so the caller can safely append to the result.
							buffer&.clear
							return buffer || String.new.force_encoding(Encoding::BINARY)
						end
						
						buffer ||= String.new.force_encoding(Encoding::BINARY)
						
						# Take any previously buffered data and replace it into the given buffer.
						if @buffer
							buffer.replace(@buffer)
							@buffer = nil
						else
							buffer.clear
						end
						
						if length
							while buffer.bytesize < length and chunk = read_next
								buffer << chunk
							end
							
							# This ensures the subsequent `slice!` works on bytes rather than characters.
							buffer.force_encoding(Encoding::BINARY)
							
							# Only keep a remainder when there is one, so that a pending buffer always means pending data:
							if buffer.bytesize > length
								# This will be at least one copy:
								@buffer = buffer.byteslice(length, buffer.bytesize)
								
								# This should be zero-copy:
								buffer.slice!(length, buffer.bytesize)
							end
							
							return buffer.empty? ? nil : buffer
						else
							while chunk = read_next
								buffer << chunk
							end
							
							return buffer
						end
					end
					
					# Read some bytes from the stream.
					#
					# If the length is given, at most length bytes will be read. Otherwise, one chunk of data from the underlying stream will be read.
					#
					# Will avoid reading from the underlying stream if there is buffered data available.
					#
					# @parameter length [Integer] The maximum number of bytes to read.
					# @parameter buffer [String | Nil] The buffer to read into.
					# @returns [String | Nil] The data which was read, or `nil` if no more data is available.
					def read_partial(length = nil, buffer = nil)
						if @buffer
							if buffer
								buffer.replace(@buffer)
							else
								buffer = @buffer
							end
							
							@buffer = nil
						elsif chunk = read_next
							if buffer
								buffer.replace(chunk)
							else
								buffer = chunk
							end
						else
							buffer&.clear
							buffer = nil
						end
						
						if buffer and length and buffer.bytesize > length
							# Without a caller supplied buffer, this is the chunk handed over by the input, which may be frozen. Take a mutable copy in that case so it can be sliced in place:
							buffer = +buffer
							
							# This ensures the subsequent `slice!` works on bytes rather than characters.
							buffer.force_encoding(Encoding::BINARY)
							
							@buffer = buffer.byteslice(length, buffer.bytesize)
							buffer.slice!(length, buffer.bytesize)
						end
						
						return buffer
					end
					
					# Similar to {read_partial} but raises an `EOFError` if the stream is at EOF.
					#
					# @parameter length [Integer] The maximum number of bytes to read.
					# @parameter buffer [String] The buffer to read into.
					# @returns [String] The data which was read.
					# @raises [EOFError] If no more data is available.
					def readpartial(length, buffer = nil)
						read_partial(length, buffer) or raise EOFError, "End of file reached!"
					end
					
					# Iterate over each chunk of data from the input stream.
					#
					# Any buffered data is yielded first, followed by each chunk returned by `read_next`.
					#
					# @yields {|chunk| ...} Each chunk of data.
					# @returns [Enumerator] If no block is given.
					def each(&block)
						return to_enum unless block_given?
						
						if @buffer
							yield @buffer
							@buffer = nil
						end
						
						while chunk = read_next
							yield chunk
						end
					end
					
					# Read data from the stream without blocking if possible.
					#
					# At most one chunk is fetched from the underlying stream, and only when there is no buffered data.
					#
					# @parameter length [Integer] The maximum number of bytes to read.
					# @parameter buffer [String | Nil] The buffer to read into.
					# @parameter exception [Boolean | Nil] Accepted for compatibility with IO-like objects, currently ignored.
					# @returns [String | Nil] The data which was read, or `nil` if no more data is available.
					def read_nonblock(length, buffer = nil, exception: nil)
						@buffer ||= read_next
						chunk = nil
						
						unless @buffer
							buffer&.clear
							return
						end
						
						if @buffer.bytesize > length
							chunk = @buffer.byteslice(0, length)
							@buffer = @buffer.byteslice(length, @buffer.bytesize)
						else
							chunk = @buffer
							@buffer = nil
						end
						
						if buffer
							buffer.replace(chunk)
						else
							buffer = chunk
						end
						
						return buffer
					end
					
					# Read data from the stream until encountering pattern.
					#
					# If the pattern is never found, `nil` is returned and the data read so far stays buffered for subsequent reads.
					#
					# NOTE: The match position comes from `String#index` (a character offset) but is used for byte-based slicing, so this assumes binary (single byte per character) data.
					#
					# @parameter pattern [String] The pattern to match.
					# @parameter offset [Integer] The offset to start searching from.
					# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
					# @returns [String | Nil] The contents of the stream up to and including the pattern (excluding it if `chomp` is true), or `nil` if the pattern was not found. The pattern is always consumed.
					def read_until(pattern, offset = 0, chomp: false)
						# We don't want to split on the pattern, so we subtract the size of the pattern.
						split_offset = pattern.bytesize - 1
						
						@buffer ||= read_next
						return nil if @buffer.nil?
						
						until index = @buffer.index(pattern, offset)
							offset = @buffer.bytesize - split_offset
							
							offset = 0 if offset < 0
							
							if chunk = read_next
								# The pending data may be a frozen chunk handed over by the input, so make sure it is mutable before appending:
								@buffer = +@buffer
								@buffer << chunk
							else
								return nil
							end
						end
						
						# Freeze the buffer, as this enables us to use byteslice without generating a hidden copy:
						@buffer.freeze
						matched = @buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
						@buffer = @buffer.byteslice(index+pattern.bytesize, @buffer.bytesize)
						
						# Don't keep an empty remainder, otherwise later reads would see an empty chunk:
						@buffer = nil if @buffer&.empty?
						
						return matched
					end
					
					# Read a single line from the stream.
					#
					# At EOF, any remaining data which is not terminated by the separator is returned as the final line.
					#
					# NOTE: The match position comes from `String#index` (a character offset) but is used for byte-based slicing, so this assumes binary (single byte per character) data.
					#
					# @parameter separator [String | Integer | Nil] The line separator, defaults to `\n`. If an integer is given, it is used as the limit. If `nil` is given, this behaves like {read_partial}.
					# @parameter limit [Integer | Nil] The maximum number of bytes to read.
					# @parameter chomp [Boolean] Whether to remove the separator from the returned line.
					# @returns [String | Nil] The line which was read, or `nil` if no more data is available.
					def gets(separator = NEWLINE, limit = nil, chomp: false)
						# If the separator is an integer, it is actually the limit:
						if separator.is_a?(Integer)
							limit = separator
							separator = NEWLINE
						end
						
						# If no separator is given, this is the same as a read operation:
						if separator.nil?
							# I tried using `read(limit)` here but it will block until the limit is reached, which is not usually desirable behaviour.
							return read_partial(limit)
						end
						
						# We don't want to split on the separator, so we subtract the size of the separator:
						split_offset = separator.bytesize - 1
						
						@buffer ||= read_next
						return nil if @buffer.nil?
						
						offset = 0
						until index = @buffer.index(separator, offset)
							offset = @buffer.bytesize - split_offset
							offset = 0 if offset < 0
							
							# If we have gone past the limit, we are done:
							if limit and offset >= limit
								@buffer.freeze
								matched = @buffer.byteslice(0, limit)
								@buffer = @buffer.byteslice(limit, @buffer.bytesize)
								@buffer = nil if @buffer&.empty?
								
								return matched
							end
							
							# Read more data:
							if chunk = read_next
								# The pending data may be a frozen chunk handed over by the input, so make sure it is mutable before appending:
								@buffer = +@buffer
								@buffer << chunk
							else
								# No more data could be read, return the remaining data (or nil if there is none):
								remaining = @buffer
								@buffer = nil
								
								return remaining.empty? ? nil : remaining
							end
						end
						
						# Freeze the buffer, as this enables us to use byteslice without generating a hidden copy:
						@buffer.freeze
						
						if limit and index > limit
							line = @buffer.byteslice(0, limit)
							@buffer = @buffer.byteslice(limit, @buffer.bytesize)
						else
							line = @buffer.byteslice(0, index+(chomp ? 0 : separator.bytesize))
							@buffer = @buffer.byteslice(index+separator.bytesize, @buffer.bytesize)
						end
						
						# Don't keep an empty remainder, otherwise later reads would see an empty chunk:
						@buffer = nil if @buffer&.empty?
						
						return line
					end
				end
				
				include Reader
				
				# Write data to the underlying stream.
				#
				# @parameter buffer [String] The data to write.
				# @raises [IOError] If the stream is not writable.
				# @returns [Integer] The number of bytes written.
				def write(buffer)
					if @output
						@output.write(buffer)
						return buffer.bytesize
					else
						raise IOError, "Stream is not writable, output has been closed!"
					end
				end
				
				# Write data to the stream using {write}.
				#
				# Provided for compatibility with IO-like objects.
				#
				# @parameter buffer [String] The data to write.
				# @parameter exception [Boolean] Whether to raise an exception if the write would block, currently ignored.
				# @returns [Integer] The number of bytes written.
				def write_nonblock(buffer, exception: nil)
					write(buffer)
				end
				
				# Write data to the stream using {write}.
				#
				# @parameter buffer [String] The data to write.
				# @returns [Integer] The number of bytes written.
				def <<(buffer)
					write(buffer)
				end
				
				# Write lines to the stream.
				#
				# The current implementation buffers the lines and writes them in a single operation.
				#
				# @parameter arguments [Array(String)] The lines to write.
				# @parameter separator [String] The line separator, defaults to `\n`.
				# @returns [Integer] The number of bytes written.
				def puts(*arguments, separator: NEWLINE)
					buffer = ::String.new
					
					arguments.each do |argument|
						buffer << argument << separator
					end
					
					write(buffer)
				end
				
				# Flush the output stream.
				#
				# This is currently a no-op.
				def flush
				end
				
				# Close the input body.
				#
				# If, while processing the data that was read from this stream, an error is encountered, it should be passed to this method.
				#
				# Any buffered data is discarded. This does nothing if there is no input, or it was already closed.
				#
				# @parameter error [Exception | Nil] The error that was encountered, if any.
				def close_read(error = nil)
					if input = @input
						@input = nil
						@closed_read = true
						@buffer = nil
						
						input.close(error)
					end
				end
				
				# Close the output body.
				#
				# If, while generating the data that is written to this stream, an error is encountered, it should be passed to this method.
				#
				# This does nothing if the output was already closed.
				#
				# @parameter error [Exception | Nil] The error that was encountered, if any.
				def close_write(error = nil)
					if output = @output
						@output = nil
						
						output.close_write(error)
					end
				end
				
				# Close the input and output bodies.
				#
				# The stream is marked as closed even if closing the input or output raises.
				#
				# @parameter error [Exception | Nil] The error that caused this stream to be closed, if any.
				# @returns [Nil]
				def close(error = nil)
					self.close_read(error)
					self.close_write(error)
					
					return nil
				ensure
					@closed = true
				end
				
				# @returns [Boolean] Whether the stream has been closed.
				def closed?
					@closed
				end
				
				# Delegates to the output. The output reference is cleared by {close_write}, so this must be called while the output is still attached.
				#
				# @returns [Boolean] Whether there are any output chunks remaining.
				def empty?
					@output.empty?
				end
				
				private
				
				# Read the next chunk of data from the input stream.
				#
				# @returns [String | Nil] The next chunk of data, or `nil` if there is no input or it has no more data.
				# @raises [IOError] If the input stream was explicitly closed.
				def read_next
					if @input
						return @input.read
					elsif @closed_read
						raise IOError, "Stream is not readable, input has been closed!"
					end
				end
			end
		end
	end
end