class PhusionPassenger::MessageChannel
This class allows reading and writing structured messages over
I/O channels. This is the Ruby implementation of ext/common/Utils/MessageIO.h;
see that file for more information.
def check_argument(arg)
def check_argument(arg)
  if arg.to_s.index(DELIMITER)
    raise ArgumentError, "Message name and arguments may not contain #{DELIMITER_NAME}."
  end
end
def close
Close the underlying IO stream. Might raise SystemCallError or
def close
  @io.close
end
def closed?
def closed?
  return @io.closed?
end
def fileno
def fileno
  return @io.fileno
end
def initialize(io = nil)
def initialize(io = nil)
  @io = io
  # Make it binary just in case.
  @io.binmode if @io
end
def new_buffer
def new_buffer
  return ByteString.new
end
def new_buffer
def new_buffer
  return ""
end
def read
Read an array message from the underlying file descriptor.
Returns the array message as an array, or nil when end-of-stream has
been reached.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def read
  buffer = new_buffer
  if !@io.read(HEADER_SIZE, buffer)
    return nil
  end
  while buffer.size < HEADER_SIZE
    tmp = @io.read(HEADER_SIZE - buffer.size)
    if tmp.empty?
      return nil
    else
      buffer << tmp
    end
  end

  chunk_size = buffer.unpack(UINT16_PACK_FORMAT)[0]
  if !@io.read(chunk_size, buffer)
    return nil
  end
  while buffer.size < chunk_size
    tmp = @io.read(chunk_size - buffer.size)
    if tmp.empty?
      return nil
    else
      buffer << tmp
    end
  end

  message = []
  offset = 0
  delimiter_pos = buffer.index(DELIMITER, offset)
  while !delimiter_pos.nil?
    if delimiter_pos == 0
      message << ""
    else
      message << buffer[offset .. delimiter_pos - 1]
    end
    offset = delimiter_pos + 1
    delimiter_pos = buffer.index(DELIMITER, offset)
  end
  return message
rescue Errno::ECONNRESET
  return nil
end
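The framing that #read expects can be illustrated with a small standalone decoder. This is only a sketch under stated assumptions: it assumes DELIMITER is a NUL byte and HEADER_SIZE is 2 (the source names the constants but does not show their values), and `ArrayMessageReadSketch` and `read_message` are hypothetical names that are not part of PhusionPassenger.

```ruby
require 'stringio'

module ArrayMessageReadSketch
  DELIMITER   = "\0"  # assumption: the real DELIMITER value is not shown in the source
  HEADER_SIZE = 2     # assumption: size of a 16-bit big-endian length header

  module_function

  # Hypothetical helper: decode one array message from +io+,
  # or return nil when the stream ends early.
  def read_message(io)
    header = io.read(HEADER_SIZE)
    return nil if header.nil? || header.bytesize < HEADER_SIZE

    size = header.unpack('n')[0]
    body = io.read(size)
    return nil if body.nil? || body.bytesize < size

    # Every element is terminated by the delimiter, so the final
    # (empty) piece produced by split is dropped.
    body.split(DELIMITER, -1)[0...-1]
  end
end

# Build a frame by hand: length header followed by delimiter-terminated elements.
body    = ["spawn", "app", "3"].map { |s| s + "\0" }.join
io      = StringIO.new([body.bytesize].pack('n') + body)
first   = ArrayMessageReadSketch.read_message(io)
at_eof  = ArrayMessageReadSketch.read_message(io)
```

Unlike the method above, the sketch reads into fresh strings instead of reusing a buffer, which keeps it short at the cost of extra allocations.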
def read_hash
Read an array message from the underlying file descriptor and return the
result as a hash instead of an array. This assumes that the array message
has an even number of elements.
Returns nil when end-of-stream has been reached.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def read_hash
  buffer = new_buffer
  if !@io.read(HEADER_SIZE, buffer)
    return nil
  end
  while buffer.size < HEADER_SIZE
    tmp = @io.read(HEADER_SIZE - buffer.size)
    if tmp.empty?
      return nil
    else
      buffer << tmp
    end
  end

  chunk_size = buffer.unpack(UINT16_PACK_FORMAT)[0]
  if !@io.read(chunk_size, buffer)
    return nil
  end
  while buffer.size < chunk_size
    tmp = @io.read(chunk_size - buffer.size)
    if tmp.empty?
      return nil
    else
      buffer << tmp
    end
  end

  result = {}
  offset = 0
  delimiter_pos = buffer.index(DELIMITER, offset)
  while !delimiter_pos.nil?
    if delimiter_pos == 0
      name = ""
    else
      name = buffer[offset .. delimiter_pos - 1]
    end

    offset = delimiter_pos + 1
    delimiter_pos = buffer.index(DELIMITER, offset)
    if delimiter_pos.nil?
      raise InvalidHashError
    elsif delimiter_pos == 0
      value = ""
    else
      value = buffer[offset .. delimiter_pos - 1]
    end

    result[name] = value
    offset = delimiter_pos + 1
    delimiter_pos = buffer.index(DELIMITER, offset)
  end
  return result
rescue Errno::ECONNRESET
  return nil
end
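The name/value pairing that #read_hash performs can be sketched on an already decoded array. The example below is an illustration only: `HashMessageSketch` and `pairs_to_hash` are hypothetical names, and the local InvalidHashError class is a stand-in for the library's own error class, whose definition is not shown here.

```ruby
module HashMessageSketch
  # Stand-in for the library's InvalidHashError (assumption: its real
  # definition and superclass are not shown in the source).
  class InvalidHashError < StandardError; end

  module_function

  # Hypothetical helper: turn ["k1", "v1", "k2", "v2"] into a hash.
  # An odd number of elements means a name without a value.
  def pairs_to_hash(elements)
    raise InvalidHashError, "odd number of elements" if elements.size.odd?

    result = {}
    elements.each_slice(2) { |name, value| result[name] = value }
    result
  end
end

options = HashMessageSketch.pairs_to_hash(["app_root", "/srv/app", "env", "production"])
```

As in #read_hash, a name that appears twice keeps the value that was sent last.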
def read_scalar(buffer = new_buffer, max_size = nil)
Read a scalar message from the underlying IO object. Returns the
read message, or nil on end-of-stream.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
The +buffer+ argument specifies a buffer in which #read_scalar
stores the read data. It is good practice to reuse existing buffers
in order to minimize stress on the garbage collector.
The +max_size+ argument allows one to specify the maximum allowed
size for the scalar message. If the received scalar message's size
exceeds +max_size+, a SecurityError is raised.
def read_scalar(buffer = new_buffer, max_size = nil)
  if !@io.read(4, buffer)
    return nil
  end
  while buffer.size < 4
    tmp = @io.read(4 - buffer.size)
    if tmp.empty?
      return nil
    else
      buffer << tmp
    end
  end

  size = buffer.unpack(UINT32_PACK_FORMAT)[0]
  if size == 0
    buffer.replace('')
    return buffer
  else
    if !max_size.nil? && size > max_size
      raise SecurityError, "Scalar message size (#{size}) " <<
        "exceeds maximum allowed size (#{max_size})."
    end
    if !@io.read(size, buffer)
      return nil
    end
    if buffer.size < size
      tmp = ''
      while buffer.size < size
        if !@io.read(size - buffer.size, tmp)
          return nil
        else
          buffer << tmp
        end
      end
    end
    return buffer
  end
rescue Errno::ECONNRESET
  return nil
end
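A scalar message is a 32-bit length header followed by raw bytes. The following sketch shows that framing and the +max_size+ check in isolation. It assumes the header is big-endian, as the 'N' pack format in #write_scalar suggests; `ScalarMessageSketch`, `encode` and `decode` are hypothetical names used for illustration only.

```ruby
require 'stringio'

module ScalarMessageSketch
  module_function

  # Hypothetical helper: prefix +data+ with its size as a
  # 32-bit big-endian integer.
  def encode(data)
    [data.bytesize].pack('N') << data.b
  end

  # Hypothetical helper: decode one scalar message from +io+.
  # Returns nil when the stream ends before a full message arrives.
  def decode(io, max_size = nil)
    header = io.read(4)
    return nil if header.nil? || header.bytesize < 4

    size = header.unpack('N')[0]
    return "".b if size == 0

    # Reject oversized messages before reading the payload.
    if max_size && size > max_size
      raise SecurityError,
        "Scalar message size (#{size}) exceeds maximum allowed size (#{max_size})."
    end

    body = io.read(size)
    return nil if body.nil? || body.bytesize < size
    body
  end
end

payload = ScalarMessageSketch.decode(StringIO.new(ScalarMessageSketch.encode("hello world")))
```

Checking the size before reading the payload means a peer cannot make the reader allocate an arbitrarily large buffer just by sending a large header value.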
def recv_io(klass = IO, negotiate = true)
Receive an IO object (a file descriptor) from the channel. The other
side must have sent an IO object by calling send_io(). Note that
this only works on Unix sockets.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def recv_io(klass = IO, negotiate = true)
  write("pass IO") if negotiate
  io = @io.recv_io(klass)
  write("got IO") if negotiate
  return io
end
def send_io(io)
Send an IO object (a file descriptor) over the channel. The other
side must receive the IO object by calling recv_io(). Note that
this only works on Unix sockets.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def send_io(io)
  # We read a message before actually calling #send_io
  # in order to prevent the other side from accidentally
  # read()ing past the normal data and reading our file
  # descriptor too.
  #
  # For example suppose that side A looks like this:
  #
  #   read(fd, buf, 1024)
  #   read_io(fd)
  #
  # and side B:
  #
  #   write(fd, buf, 100)
  #   send_io(fd_to_pass)
  #
  # If B completes both write() and send_io(), then A's read() call
  # reads past the 100 bytes that B sent. On some platforms, like
  # Linux, this will cause read_io() to fail. And it just so happens
  # that Ruby's IO#read method slurps more than just the given amount
  # of bytes.
  result = read
  if !result
    raise EOFError, "End of stream"
  elsif result != ["pass IO"]
    raise IOError, "IO passing pre-negotiation header expected"
  else
    @io.send_io(io)
    # Once you've sent the IO you expect to be able to close it on the
    # sender's side, even if the other side hasn't read the IO yet.
    # Not so: on some operating systems (I'm looking at you OS X) this
    # can cause the receiving side to receive a bad file descriptor.
    # The post negotiation protocol ensures that we block until the
    # other side has really received the IO.
    result = read
    if !result
      raise EOFError, "End of stream"
    elsif result != ["got IO"]
      raise IOError, "IO passing post-negotiation header expected"
    end
  end
end
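The "pass IO" / "got IO" negotiation between #send_io and #recv_io can be tried out on a Unix socket pair. The sketch below is a simplified illustration, not the library's protocol: it uses newline-terminated strings instead of array messages, and `IoPassingSketch`, `send_with_handshake` and `recv_with_handshake` are hypothetical names. It assumes a platform with Unix domain sockets.

```ruby
require 'socket'

module IoPassingSketch
  module_function

  # Sender side: wait until the peer asks for the descriptor, send it,
  # then block until the peer confirms that it arrived.
  def send_with_handshake(sock, io)
    unless sock.gets == "pass IO\n"
      raise IOError, "IO passing pre-negotiation header expected"
    end
    sock.send_io(io)
    unless sock.gets == "got IO\n"
      raise IOError, "IO passing post-negotiation header expected"
    end
  end

  # Receiver side: announce readiness, receive the descriptor,
  # then acknowledge it so the sender may safely close its copy.
  def recv_with_handshake(sock)
    sock.write("pass IO\n")
    io = sock.recv_io
    sock.write("got IO\n")
    io
  end
end

a, b = UNIXSocket.pair
reader, writer = IO.pipe

# The sender runs in a thread because both sides block on each other.
sender   = Thread.new { IoPassingSketch.send_with_handshake(a, writer) }
received = IoPassingSketch.recv_with_handshake(b)
sender.join

# The sender's copy can be closed once the handshake has completed.
writer.close
received.write("hello")
received.close
text = reader.read
reader.close
a.close
b.close
```

Because the sender transmits the descriptor only after the receiver has announced that it is about to call recv_io, no ordinary data is in flight at that moment, which is the situation the comments in #send_io are guarding against.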
def write(name, *args)
Send an array message, which consists of the given elements, over the underlying
file descriptor. _name_ is the first element in the message, and _args_ are the
other elements. These arguments will internally be converted to strings by calling
to_s().
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def write(name, *args)
  check_argument(name)
  args.each do |arg|
    check_argument(arg)
  end

  message = "#{name}#{DELIMITER}"
  args.each do |arg|
    message << arg.to_s << DELIMITER
  end
  @io.write([message.size].pack('n') << message)
  @io.flush
end
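To make the bytes that #write puts on the wire concrete, the sketch below builds the same kind of frame by hand. It assumes DELIMITER is a NUL byte (the source only names the constant); `ArrayMessageWriteSketch` and `encode` are hypothetical names, not part of PhusionPassenger.

```ruby
module ArrayMessageWriteSketch
  DELIMITER = "\0"  # assumption: the real DELIMITER value is not shown in the source

  module_function

  # Hypothetical helper: build the frame for one array message.
  # Each element is converted with to_s and terminated by the delimiter;
  # the body is prefixed with its length as a 16-bit big-endian integer.
  def encode(name, *args)
    ([name] + args).each do |element|
      if element.to_s.index(DELIMITER)
        raise ArgumentError, "Message name and arguments may not contain the delimiter."
      end
    end

    body = "#{name}#{DELIMITER}".b
    args.each { |arg| body << arg.to_s.b << DELIMITER }
    # bytesize is used here so that multibyte strings are counted in bytes.
    [body.bytesize].pack('n') << body
  end
end

frame = ArrayMessageWriteSketch.encode("spawn", "app", 3)
```

A 16-bit header limits the body of one array message to 65535 bytes, so larger payloads are better suited to scalar messages, which use a 32-bit header.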
def write_scalar(data)
Send a scalar message over the underlying IO object.
Might raise SystemCallError, IOError or SocketError when something
goes wrong.
def write_scalar(data)
  @io.write([data.size].pack('N') << data)
  @io.flush
end