# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'buffer'
require_relative 'generic'

module Async
  module IO
    # A buffered wrapper around an underlying `io` object. Reads are staged in
    # a read buffer and writes are accumulated in a write buffer which is
    # flushed once it reaches the configured block size.
    class Stream
      # The default block size for IO buffers.
      # BLOCK_SIZE = ENV.fetch('BLOCK_SIZE', 1024*16).to_i
      BLOCK_SIZE = 1024 * 8

      # @param io the underlying object; this class calls `read(size, buffer)`,
      #   `write`, `sync=`, `close`, `closed?` and `connected?` on it.
      # @param block_size [Integer] default read size and write flush threshold, in bytes.
      # @param sync [Boolean] value assigned to `io.sync`.
      def initialize(io, block_size: BLOCK_SIZE, sync: true)
        @io = io
        @eof = false

        # We don't want Ruby to do any IO buffering.
        @io.sync = sync

        @block_size = block_size

        @read_buffer = Buffer.new
        @write_buffer = Buffer.new

        # Used as destination buffer for underlying reads.
        @input_buffer = Buffer.new
      end

      attr_reader :io
      attr_reader :block_size

      # Reads `size` bytes from the stream. If size is not specified, read until end of file.
      # @param size [Integer|nil] number of bytes wanted, or nil for everything.
      # @return [String|nil] the data read (possibly shorter than `size` at end
      #   of file), `''` when `size` is 0, or nil when nothing is left.
      def read(size = nil)
        return '' if size == 0

        if size
          until @eof || @read_buffer.bytesize >= size
            # Compute the amount of data we need to read from the underlying stream:
            read_size = size - @read_buffer.bytesize

            # Don't read less than @block_size to avoid lots of small reads:
            fill_read_buffer(read_size > @block_size ? read_size : @block_size)
          end
        else
          fill_read_buffer until @eof
        end

        return consume_read_buffer(size)
      end

      # Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
      # @param size [Integer|nil] upper bound on the bytes returned, or nil for the whole buffer.
      # @return [String|nil] buffered data, `''` when `size` is 0, or nil at end of file.
      def read_partial(size = nil)
        return '' if size == 0

        # Only touch the underlying stream when nothing is buffered yet.
        fill_read_buffer if @read_buffer.empty? && !@eof

        return consume_read_buffer(size)
      end

      # Efficiently read data from the stream until encountering pattern.
      # @param pattern [String] The pattern to match.
      # @param offset [Integer] Position in the read buffer at which the first search starts.
      # @return [String|nil] The contents of the stream up until the pattern, which is
      #   consumed but not returned. Returns nil if the stream ends before the pattern is found.
      def read_until(pattern, offset = 0)
        # The pattern may be split across two underlying reads, so when we
        # resume searching we must back up far enough to cover a pattern whose
        # leading bytes arrived at the end of the previous chunk.
        split_offset = pattern.bytesize - 1
        split_offset = 0 if split_offset < 0

        until (index = @read_buffer.index(pattern, offset))
          offset = @read_buffer.bytesize - split_offset
          offset = 0 if offset < 0

          return unless fill_read_buffer
        end

        # Freeze before slicing so both slices are taken from an immutable source.
        @read_buffer.freeze
        matched = @read_buffer.byteslice(0, index)
        @read_buffer = @read_buffer.byteslice(index + pattern.bytesize, @read_buffer.bytesize)

        return matched
      end

      # Yields the read buffer to the given block, reading more data from the
      # underlying stream each time the block returns a falsy value, until the
      # block returns a truthy value or end of file is reached.
      def peek
        until yield(@read_buffer) || @eof
          fill_read_buffer
        end
      end

      # Writes `string` to the stream. If nothing is buffered and `string` is at
      # least one block in size it is written straight to the underlying `io`;
      # otherwise it is appended to the write buffer, which is written out and
      # cleared once it holds at least one block.
      # @param string the string to write.
      # @return the number of bytes in `string`.
      def write(string)
        if @write_buffer.empty? && string.bytesize >= @block_size
          syswrite(string)
        else
          @write_buffer << string

          if @write_buffer.bytesize >= @block_size
            syswrite(@write_buffer)
            @write_buffer.clear
          end
        end

        return string.bytesize
      end

      # Writes `string` to the stream and returns self.
      def <<(string)
        write(string)

        return self
      end

      # Flushes buffered data to the stream.
      def flush
        unless @write_buffer.empty?
          syswrite(@write_buffer)
          @write_buffer.clear
        end
      end

      # Flushes pending writes, then reads up to (and consumes) `separator`.
      # @param separator [String] the line terminator, `$/` by default.
      # @return [String|nil] the data before the separator, or nil if the
      #   stream ends before the separator is found.
      def gets(separator = $/)
        flush

        read_until(separator)
      end

      # Appends each argument followed by `separator` to the write buffer, then flushes.
      def puts(*args, separator: $/)
        args.each do |arg|
          @write_buffer << arg << separator
        end

        flush
      end

      # Delegates to the underlying io.
      def connected?
        @io.connected?
      end

      # Delegates to the underlying io.
      def closed?
        @io.closed?
      end

      # Closes the stream and flushes any unwritten data.
      def close
        return if @io.closed?

        begin
          flush
        rescue => error
          # We really can't do anything here unless we want #close to raise exceptions.
          Async.logger.error(self) {error}
        ensure
          @io.close
        end
      end

      # Returns true if the stream is at end of file, which means there is no more data to be read.
      # May read from the underlying stream when nothing is buffered.
      def eof?
        fill_read_buffer if !@eof && @read_buffer.empty?

        return @eof && @read_buffer.empty?
      end

      alias eof eof?

      # Discards any buffered input, marks the stream as at end of file and raises.
      # @raise [EOFError] always.
      def eof!
        @read_buffer.clear
        @eof = true

        raise EOFError
      end

      private

      # Fills the buffer from the underlying stream.
      # @param size [Integer] the number of bytes to request from the underlying io.
      # @return [Boolean] true if data was read, false if the read returned nothing
      #   (in which case the stream is marked as at end of file).
      def fill_read_buffer(size = @block_size)
        if @read_buffer.empty?
          # Nothing buffered: read straight into the read buffer and skip a copy.
          return true if @io.read(size, @read_buffer)
        elsif (chunk = @io.read(size, @input_buffer))
          @read_buffer << chunk
          return true
        end

        # We didn't read anything, so we must be at eof:
        @eof = true
        return false
      end

      # Consumes at most `size` bytes from the buffer.
      # @param size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.
      # @return [String|nil] the consumed data, or nil if at eof with nothing buffered.
      def consume_read_buffer(size = nil)
        # If we are at eof, and the read buffer is empty, we can't consume anything.
        return nil if @eof && @read_buffer.empty?

        result = nil

        if size.nil? || size >= @read_buffer.bytesize
          # Consume the entire read buffer:
          result = @read_buffer
          @read_buffer = Buffer.new
        else
          # This approach uses more memory.
          # result = @read_buffer.slice!(0, size)

          # We know that we are not going to reuse the original buffer.
          # But byteslice will generate a hidden copy. So let's freeze it first:
          @read_buffer.freeze

          result = @read_buffer.byteslice(0, size)
          @read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
        end

        return result
      end

      # Write a buffer to the underlying stream, retrying until every byte has been accepted.
      # @param buffer [String] The string to write, any encoding is okay.
      # @return [Integer] the total number of bytes written.
      def syswrite(buffer)
        remaining = buffer.bytesize

        # Fast path:
        written = @io.write(buffer)
        return written if written == remaining

        # Slow path:
        remaining -= written

        while remaining > 0
          wrote = @io.write(buffer.byteslice(written, remaining))

          remaining -= wrote
          written += wrote
        end

        return written
      end
    end
  end
end