class IO::Stream::Generic
# Appends `string` to the write buffer (via {write}) and returns self, so calls can be chained.
# @parameter string [String] The data to write.
# @returns [self]
def <<(string)
	write(string)
	
	return self
end
# Best-effort close: flush any buffered writes, then close the underlying stream.
# Returns early if the stream is already closed. Errors during flush are
# deliberately swallowed so that #close never raises.
def close
	return if closed?
	
	begin
		flush
	rescue
		# We really can't do anything here unless we want #close to raise exceptions.
	ensure
		self.sysclose
	end
end
# Close the read end of the stream. The generic implementation does nothing;
# subclasses may override it.
def close_read
end
# Close the write end of the stream, flushing any buffered data first.
def close_write
	flush
end
# Whether the underlying stream is closed. The generic implementation always
# reports open; subclasses should override this.
# @returns [Boolean]
def closed?
	false
end
# Consumes at most `size` bytes from the buffer.
# @parameter size [Integer | Nil] The amount of data to consume. If nil, the entire buffer is consumed.
# @returns [String | Nil] The consumed data, or nil if at eof with an empty buffer.
def consume_read_buffer(size = nil)
	# If we are at eof, and the read buffer is empty, we can't consume anything.
	return nil if @eof && @read_buffer.empty?
	
	result = nil
	
	if size.nil? or size >= @read_buffer.bytesize
		# Consume the entire read buffer:
		result = @read_buffer
		@read_buffer = StringBuffer.new
	else
		# This approach uses more memory.
		# result = @read_buffer.slice!(0, size)
		
		# We know that we are not going to reuse the original buffer.
		# But byteslice will generate a hidden copy. So let's freeze it first:
		@read_buffer.freeze
		
		result = @read_buffer.byteslice(0, size)
		@read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
	end
	
	return result
end
# Write the contents of the given buffer to the underlying stream, clearing the
# buffer afterwards even on failure.
# @parameter buffer [String] The buffer to write and clear.
def drain(buffer)
	begin
		syswrite(buffer)
	ensure
		# If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
		buffer.clear
	end
end
# Mark the stream as at end-of-file: discard any buffered data, set the eof
# flag, and raise `EOFError`.
# @raises [EOFError] Always.
def eof!
	@read_buffer.clear
	@eof = true
	
	raise EOFError
end
# Determines if the stream has consumed all available data. May block if the
# stream is not readable.
# See {readable?} for a non-blocking alternative.
# @returns [Boolean] Whether the stream is at end-of-file.
def eof?
	if !@read_buffer.empty?
		return false
	elsif @eof
		return true
	else
		return !self.fill_read_buffer
	end
end
# Fill the read buffer with up to `size` bytes from the underlying stream.
# @parameter size [Integer] The desired read size, clamped to @maximum_read_size.
# @returns [Boolean] True if any data was read; false means eof was reached.
def fill_read_buffer(size = @block_size)
	# We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
	if size > @maximum_read_size
		size = @maximum_read_size
	end
	
	# This effectively ties the input and output stream together.
	flush
	
	if @read_buffer.empty?
		# Read directly into the (empty) read buffer:
		if sysread(size, @read_buffer)
			# Console.info(self, name: "read") {@read_buffer.inspect}
			return true
		end
	else
		# Read into the scratch input buffer and append:
		if chunk = sysread(size, @input_buffer)
			@read_buffer << chunk
			# Console.info(self, name: "read") {@read_buffer.inspect}
			return true
		end
	end
	
	# else for both cases above:
	@eof = true
	return false
end
# Flush the write buffer, draining all buffered data to the underlying stream.
# Returns immediately if there is nothing buffered.
def flush
	return if @write_buffer.empty?
	
	@writing.synchronize do
		self.drain(@write_buffer)
	end
end
# Read a line from the stream, up to (and normally including) `separator`.
# @parameter separator [String | Integer] The line separator; an Integer is treated as a limit for IO#gets compatibility.
# @parameter limit [Integer | Nil] The maximum number of bytes to return.
# @parameter chomp [Boolean] Whether to remove the separator from the returned line.
# @returns [String | Nil] The line, or whatever remains if the separator is never found.
def gets(separator = $/, limit = nil, chomp: false)
	# Compatibility with IO#gets:
	if separator.is_a?(Integer)
		limit = separator
		separator = $/
	end
	
	# We don't want to split in the middle of the separator, so we subtract the size of the separator from the start of the search:
	split_offset = separator.bytesize - 1
	
	offset = 0
	
	until index = @read_buffer.index(separator, offset)
		offset = @read_buffer.bytesize - split_offset
		offset = 0 if offset < 0
		
		# If a limit was given, and the offset is beyond the limit, we should return up to the limit:
		if limit and offset >= limit
			# As we didn't find the separator, there is nothing to chomp either.
			return consume_read_buffer(limit)
		end
		
		# If we can't read any more data, we should return what we have:
		return consume_read_buffer unless fill_read_buffer
	end
	
	# If the index of the separator was beyond the limit:
	if limit and index >= limit
		# Return up to the limit:
		return consume_read_buffer(limit)
	end
	
	# Freeze the read buffer, as this enables us to use byteslice without generating a hidden copy:
	@read_buffer.freeze
	
	line = @read_buffer.byteslice(0, index+(chomp ? 0 : separator.bytesize))
	@read_buffer = @read_buffer.byteslice(index+separator.bytesize, @read_buffer.bytesize)
	
	return line
end
# Find the byte index of `pattern` in the read buffer, filling the buffer from
# the underlying stream as required.
# @parameter pattern [String] The pattern to search for.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer | Nil] Stop searching once this many bytes have been scanned.
# @returns [Integer | Nil] The index of the pattern, or nil if it cannot be found.
def index_of(pattern, offset, limit)
	# We don't want to split on the pattern, so we subtract the size of the pattern.
	split_offset = pattern.bytesize - 1
	
	until index = @read_buffer.index(pattern, offset)
		offset = @read_buffer.bytesize - split_offset
		offset = 0 if offset < 0
		
		return nil if limit and offset >= limit
		return nil unless fill_read_buffer
	end
	
	return index
end
# Initialize the stream buffers and configuration.
# @parameter block_size [Integer] The target size for buffered reads/writes.
# @parameter maximum_read_size [Integer] The maximum size of a single underlying read.
def initialize(block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE)
	@eof = false
	
	# Serializes access to the write buffer:
	@writing = ::Thread::Mutex.new
	
	@block_size = block_size
	@maximum_read_size = maximum_read_size
	
	@read_buffer = StringBuffer.new
	@write_buffer = StringBuffer.new
	
	# Used as destination buffer for underlying reads.
	@input_buffer = StringBuffer.new
end
# Return buffered data without consuming it.
# With a `size`, fills the buffer until it holds at least `size` bytes (or eof)
# and returns up to `size` bytes. Without a size, fills until the optional
# block returns true (or eof) and returns the whole read buffer.
# @parameter size [Integer | Nil] The number of bytes to peek at.
def peek(size = nil)
	if size
		until @eof or @read_buffer.bytesize >= size
			# Compute the amount of data we need to read from the underlying stream:
			read_size = size - @read_buffer.bytesize
			
			# Don't read less than @block_size to avoid lots of small reads:
			fill_read_buffer(read_size > @block_size ? read_size : @block_size)
		end
		
		return @read_buffer[..([size, @read_buffer.size].min - 1)]
	end
	
	until (block_given? && yield(@read_buffer)) or @eof
		fill_read_buffer
	end
	
	return @read_buffer
end
# Write each argument followed by `separator` to the stream, then drain the
# write buffer. Does nothing when called with no arguments.
# @parameter arguments [Array(String)] The values to write.
# @parameter separator [String] Appended after each argument; defaults to `$/`.
def puts(*arguments, separator: $/)
	return if arguments.empty?
	
	@writing.synchronize do
		arguments.each do |argument|
			@write_buffer << argument << separator
		end
		
		self.drain(@write_buffer)
	end
end
# Read `size` bytes from the stream; with no size, read until end of file.
# @parameter size [Integer | Nil] The number of bytes to read, or nil for everything.
# @returns [String | Nil] The data read (an empty binary string for size == 0).
def read(size = nil)
	return String.new(encoding: Encoding::BINARY) if size == 0
	
	if size
		until @eof or @read_buffer.bytesize >= size
			# Compute the amount of data we need to read from the underlying stream:
			read_size = size - @read_buffer.bytesize
			
			# Don't read less than @block_size to avoid lots of small reads:
			fill_read_buffer(read_size > @block_size ? read_size : @block_size)
		end
	else
		until @eof
			fill_read_buffer
		end
	end
	
	return consume_read_buffer(size)
end
# Read exactly `size` bytes, raising if that much data is not available.
# @parameter size [Integer] The number of bytes to read.
# @parameter exception [Class] The exception class to raise on short or eof reads.
# @raises [EOFError] (or `exception`) When the stream cannot supply `size` bytes.
def read_exactly(size, exception: EOFError)
	if buffer = read(size)
		if buffer.bytesize != size
			raise exception, "could not read enough data"
		end
		
		return buffer
	end
	
	raise exception, "encountered eof while reading data"
end
# Read at most `size` bytes, performing at most one underlying read if the
# buffer is empty.
# @parameter size [Integer | Nil] The maximum number of bytes to return.
# @returns [String | Nil] The data read, or nil at end of file.
def read_partial(size = nil)
	return String.new(encoding: Encoding::BINARY) if size == 0
	
	if !@eof and @read_buffer.empty?
		fill_read_buffer
	end
	
	return consume_read_buffer(size)
end
# Efficiently read data from the stream until encountering pattern.
# @parameter pattern [String] The pattern to match.
# @parameter offset [Integer] The offset to start searching from.
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped).
# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
# @returns [String | Nil] The matched data, or nil if the pattern was not found (or was beyond the limit).
def read_until(pattern, offset = 0, limit: nil, chomp: true)
	if index = index_of(pattern, offset, limit)
		return nil if limit and index >= limit
		
		# Freeze the read buffer, as this enables us to use byteslice without generating a hidden copy:
		@read_buffer.freeze
		
		matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
		@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
		
		return matched
	end
end
# Whether there is a chance that a read operation will succeed or not.
# Non-blocking, unlike {eof?}.
# @returns [Boolean]
def readable?
	# If we are at the end of the file, we can't read any more data:
	if @eof
		return false
	end
	
	# If the read buffer is not empty, we can read more data:
	if !@read_buffer.empty?
		return true
	end
	
	# If the underlying stream is readable, we can read more data:
	return !closed?
end
# Compatible with IO#readpartial: like {read_partial}, but raises `EOFError`
# when no data is available.
# @raises [EOFError] If the stream is at end-of-file.
def readpartial(size = nil)
	read_partial(size) or raise EOFError, "Encountered eof while reading data!"
end
# Close the underlying stream. Must be implemented by subclasses.
# @raises [NotImplementedError] Always, in the generic implementation.
def sysclose
	raise NotImplementedError
end
# Read up to `size` bytes from the underlying stream into `buffer`.
# Must be implemented by subclasses.
# @raises [NotImplementedError] Always, in the generic implementation.
def sysread(size, buffer)
	raise NotImplementedError
end
# Write the contents of `buffer` to the underlying stream.
# Must be implemented by subclasses.
# @raises [NotImplementedError] Always, in the generic implementation.
def syswrite(buffer)
	raise NotImplementedError
end
# Writes `string` to the buffer. When the buffer is full or #sync is true the
# buffer is flushed to the underlying `io`.
# @parameter string [String] the string to write to the buffer.
# @parameter flush [Boolean] Force a drain of the buffer after writing.
# @returns [Integer] The number of bytes appended.
def write(string, flush: false)
	@writing.synchronize do
		@write_buffer << string
		
		# Drain whenever the buffer has reached the block size:
		flush |= (@write_buffer.bytesize >= @block_size)
		
		if flush
			self.drain(@write_buffer)
		end
	end
	
	return string.bytesize
end