# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.
require_relative "string_buffer"
module IO::Stream
# The default block size for IO buffers. Defaults to 256KB (optimized for modern SSDs and networks).
BLOCK_SIZE = ENV.fetch("IO_STREAM_BLOCK_SIZE", 1024*256).to_i
# The minimum read size for efficient I/O operations. Defaults to the same as BLOCK_SIZE.
MINIMUM_READ_SIZE = ENV.fetch("IO_STREAM_MINIMUM_READ_SIZE", BLOCK_SIZE).to_i
# The maximum read size for a single read operation. This limit exists because:
# 1. System calls like read() cannot handle requests larger than SSIZE_MAX
# 2. Very large reads can cause memory pressure and poor interactive performance
# 3. Most socket buffers and pipe capacities are much smaller anyway
# On 64-bit systems SSIZE_MAX is ~8.8 million MB, on 32-bit it's ~2GB.
# Our default of 16MB provides a good balance of throughput and responsiveness, and is page aligned.
# It is also a multiple of the minimum read size, so that we can read in chunks without exceeding the maximum.
MAXIMUM_READ_SIZE = ENV.fetch("IO_STREAM_MAXIMUM_READ_SIZE", MINIMUM_READ_SIZE * 64).to_i
# A module providing readable stream functionality.
#
# You must implement the `sysread` method to read data from the underlying IO.
module Readable
# Initialize readable stream functionality.
# @parameter minimum_read_size [Integer] The minimum size for read operations.
# @parameter maximum_read_size [Integer] The maximum size for read operations.
# @parameter block_size [Integer] Legacy parameter, use minimum_read_size instead.
def initialize(minimum_read_size: MINIMUM_READ_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, block_size: nil, **, &block)
@finished = false
@read_buffer = StringBuffer.new
# Used as destination buffer for underlying reads.
@input_buffer = StringBuffer.new
# Support legacy block_size parameter for backwards compatibility
@minimum_read_size = block_size || minimum_read_size
@maximum_read_size = maximum_read_size
super(**, &block) if defined?(super)
end
attr_accessor :minimum_read_size
# Legacy accessor for backwards compatibility
# @returns [Integer] The minimum read size.
def block_size
@minimum_read_size
end
# Legacy setter for backwards compatibility
# @parameter value [Integer] The minimum read size.
def block_size=(value)
@minimum_read_size = value
end
# Read data from the stream.
# @parameter size [Integer | Nil] The number of bytes to read. If nil, read until end of stream.
# @parameter buffer [String | Nil] An optional buffer to fill with data instead of allocating a new string.
# @returns [String] The data read from the stream, or the provided buffer filled with data.
def read(size = nil, buffer = nil)
if size == 0
if buffer
buffer.clear
buffer.force_encoding(Encoding::BINARY)
return buffer
else
return String.new(encoding: Encoding::BINARY)
end
end
if size
until @finished or @read_buffer.bytesize >= size
# Compute the amount of data we need to read from the underlying stream:
read_size = size - @read_buffer.bytesize
# Don't read less than @minimum_read_size to avoid lots of small reads:
fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)
end
else
until @finished
fill_read_buffer
end
if buffer
buffer.replace(@read_buffer)
@read_buffer.clear
else
buffer = @read_buffer
@read_buffer = StringBuffer.new
end
# Read without size always returns a non-nil value, even if it is an empty string.
return buffer
end
return consume_read_buffer(size, buffer)
end
# Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
# @parameter size [Integer | Nil] The number of bytes to read. If nil, read all available data.
# @parameter buffer [String | Nil] An optional buffer to fill with data instead of allocating a new string.
# @returns [String] The data read from the stream, or the provided buffer filled with data.
def read_partial(size = nil, buffer = nil)
if size == 0
if buffer
buffer.clear
buffer.force_encoding(Encoding::BINARY)
return buffer
else
return String.new(encoding: Encoding::BINARY)
end
end
if !@finished and @read_buffer.empty?
fill_read_buffer
end
return consume_read_buffer(size, buffer)
end
# Read exactly the specified number of bytes.
# @parameter size [Integer] The number of bytes to read.
# @parameter exception [Class] The exception to raise if not enough data is available.
# @returns [String] The data read from the stream.
def read_exactly(size, buffer = nil, exception: EOFError)
if buffer = read(size, buffer)
if buffer.bytesize != size
raise exception, "Could not read enough data!"
end
return buffer
end
raise exception, "Stream finished before reading enough data!"
end
# This is a compatibility shim for existing code that uses `readpartial`.
# @parameter size [Integer | Nil] The number of bytes to read.
# @parameter buffer [String | Nil] An optional buffer to fill with data instead of allocating a new string.
# @returns [String] The data read from the stream.
def readpartial(size = nil, buffer = nil)
read_partial(size, buffer) or raise EOFError, "Stream finished before reading enough data!"
end
# Find the index of a pattern in the read buffer, reading more data if needed.
# @parameter pattern [String] The pattern to search for.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer | Nil] The maximum number of bytes to read while searching.
# @returns [Integer | Nil] The index of the pattern, or nil if not found.
private def index_of(pattern, offset, limit, discard = false)
# We don't want to split on the pattern, so we subtract the size of the pattern.
split_offset = pattern.bytesize - 1
until index = @read_buffer.index(pattern, offset)
offset = @read_buffer.bytesize - split_offset
offset = 0 if offset < 0
if limit and offset >= limit
return nil
end
unless fill_read_buffer
return nil
end
if discard
# If we are discarding, we should consume the read buffer up to the offset:
consume_read_buffer(offset)
offset = 0
end
end
return index
end
# Efficiently read data from the stream until encountering pattern.
# @parameter pattern [String] The pattern to match.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped).
# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
# @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found.
def read_until(pattern, offset = 0, limit: nil, chomp: true)
if index = index_of(pattern, offset, limit)
return nil if limit and index >= limit
@read_buffer.freeze
matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
return matched
end
end
# Efficiently discard data from the stream until encountering pattern.
# @parameter pattern [String] The pattern to match.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern.
# @returns [String | Nil] The contents of the stream up until the pattern, or nil if the pattern was not found.
def discard_until(pattern, offset = 0, limit: nil)
if index = index_of(pattern, offset, limit, true)
@read_buffer.freeze
if limit and index >= limit
@read_buffer = @read_buffer.byteslice(limit, @read_buffer.bytesize)
return nil
end
matched = @read_buffer.byteslice(0, index+pattern.bytesize)
@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
return matched
end
end
# Peek at data in the buffer without consuming it.
# @parameter size [Integer | Nil] The number of bytes to peek at. If nil, peek at all available data.
# @returns [String] The data in the buffer without consuming it.
def peek(size = nil)
if size
until @finished or @read_buffer.bytesize >= size
# Compute the amount of data we need to read from the underlying stream:
read_size = size - @read_buffer.bytesize
# Don't read less than @minimum_read_size to avoid lots of small reads:
fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)
end
return @read_buffer[..([size, @read_buffer.size].min - 1)]
end
until (block_given? && yield(@read_buffer)) or @finished
fill_read_buffer
end
return @read_buffer
end
# Read a line from the stream, similar to IO#gets.
# @parameter separator [String] The line separator to search for.
# @parameter limit [Integer | Nil] The maximum number of bytes to read.
# @parameter chomp [Boolean] Whether to remove the separator from the returned line.
# @returns [String | Nil] The line read from the stream, or nil if at end of stream.
def gets(separator = $/, limit = nil, chomp: false)
# Compatibility with IO#gets:
if separator.is_a?(Integer)
limit = separator
separator = $/
end
# We don't want to split in the middle of the separator, so we subtract the size of the separator from the start of the search:
split_offset = separator.bytesize - 1
offset = 0
until index = @read_buffer.index(separator, offset)
offset = @read_buffer.bytesize - split_offset
offset = 0 if offset < 0
# If a limit was given, and the offset is beyond the limit, we should return up to the limit:
if limit and offset >= limit
# As we didn't find the separator, there is nothing to chomp either.
return consume_read_buffer(limit)
end
# If we can't read any more data, we should return what we have:
return consume_read_buffer unless fill_read_buffer
end
# If the index of the separator was beyond the limit:
if limit and index >= limit
# Return up to the limit:
return consume_read_buffer(limit)
end
# Freeze the read buffer, as this enables us to use byteslice without generating a hidden copy:
@read_buffer.freeze
line = @read_buffer.byteslice(0, index+(chomp ? 0 : separator.bytesize))
@read_buffer = @read_buffer.byteslice(index+separator.bytesize, @read_buffer.bytesize)
return line
end
# Determins if the stream has consumed all available data. May block if the stream is not readable.
# See {readable?} for a non-blocking alternative.
#
# @returns [Boolean] If the stream is at file which means there is no more data to be read.
def finished?
if !@read_buffer.empty?
return false
elsif @finished
return true
else
return !self.fill_read_buffer
end
end
alias eof? finished?
# Mark the stream as finished and raise `EOFError`.
def finish!
@read_buffer.clear
@finished = true
raise EOFError
end
alias eof! finish!
# Whether there is a chance that a read operation will succeed or not.
# @returns [Boolean] If the stream is readable, i.e. a `read` operation has a chance of success.
def readable?
# If we are at the end of the file, we can't read any more data:
if @finished
return false
end
# If the read buffer is not empty, we can read more data:
if !@read_buffer.empty?
return true
end
# If the underlying stream is readable, we can read more data:
return !closed?
end
# Close the read end of the stream.
def close_read
end
private
# Fills the buffer from the underlying stream.
def fill_read_buffer(size = @minimum_read_size)
# Limit the read size to avoid exceeding SSIZE_MAX and to manage memory usage.
# Very large reads can also hurt interactive performance by blocking for too long.
if size > @maximum_read_size
size = @maximum_read_size
end
# This effectively ties the input and output stream together.
flush
if @read_buffer.empty?
if sysread(size, @read_buffer)
# Console.info(self, name: "read") {@read_buffer.inspect}
return true
end
else
if chunk = sysread(size, @input_buffer)
@read_buffer << chunk
# Console.info(self, name: "read") {@read_buffer.inspect}
return true
end
end
# else for both cases above:
@finished = true
return false
end
# Consumes at most `size` bytes from the buffer.
# @parameter size [Integer | Nil] The amount of data to consume. If nil, consume entire buffer.
# @parameter buffer [String | Nil] An optional buffer to fill with data instead of allocating a new string.
# @returns [String | Nil] The consumed data, or nil if no data available.
def consume_read_buffer(size = nil, buffer = nil)
# If we are at finished, and the read buffer is empty, we can't consume anything.
if @finished && @read_buffer.empty?
# Clear the buffer even when returning nil
if buffer
buffer.clear
buffer.force_encoding(Encoding::BINARY)
end
return nil
end
result = nil
if size.nil? or size >= @read_buffer.bytesize
# Consume the entire read buffer:
if buffer
buffer.clear
buffer << @read_buffer
result = buffer
else
result = @read_buffer
end
@read_buffer = StringBuffer.new
else
# We know that we are not going to reuse the original buffer.
# But byteslice will generate a hidden copy. So let's freeze it first:
@read_buffer.freeze
if buffer
# Use replace instead of clear + << for better performance
buffer.replace(@read_buffer.byteslice(0, size))
result = buffer
else
result = @read_buffer.byteslice(0, size)
end
@read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
end
return result
end
end
end