class PhusionPassenger::MessageChannel
happen.
receiving side does things in the wrong order then bad things will
receive a message, then an IO object, then a scalar. If the
IO object, then a scalar, then the receiving side must first
in the exact some order. So suppose you first send a message, then an
of these in a specific order, then the receiving side must receive them
scalar messages and IO objects. If you send a collection of any
Be careful with mixing the sending/receiving of array messages,
Note:
if you want to close the underlying IO object.
the underlying IO object is not automatically closed. Call close()
wrapped IO object. If a MessageChannel object is destroyed,
The life time of a MessageChannel is independent from that of the
channel1.read_scalar
channel2.write_scalar(“some long string which can contain arbitrary binary data”)
# Send a scalar message.
channel1.read # => [“hello”, “world !!”]
channel2.write(“hello”, “world !!”)
# Send an array message.
channel2 = MessageChannel.new(b)
channel1 = MessageChannel.new(a)
a, b = IO.pipe
MessageChannel is to be wrapped around an IO object. For example:
easy to parse.
The protocol is designed to be low overhead, easy to implement and
binary data. Scalar messages also have a specific length.
These are byte strings which may contain arbitrary
[ Scalar messages ]
must have at least one element.
contain NUL characters ('\0'
). Note that an array message
itself has a specific length. The contained strings may not
These are just a list of strings, and the message
[ Array messages ]
There are two kinds of messages:
All of these methods use exceptions for error reporting.
- file descriptor (IO object) passing over a Unix socket.
- sending and receiving messages over an IO channel.
- sending and receiving raw data over an IO channel.
This class provides convenience methods for:
def check_argument(arg)
def check_argument(arg) if arg.to_s.index(DELIMITER) raise ArgumentError, "Message name and arguments may not contain #{DELIMITER_NAME}." end end
def close
Close the underlying IO stream. Might raise SystemCallError or
def close @io.close end
def closed?
def closed? return @io.closed? end
def fileno
def fileno return @io.fileno end
def initialize(io = nil)
def initialize(io = nil) @io = io # Make it binary just in case. @io.binmode if @io end
def new_buffer
def new_buffer return ByteString.new end
def new_buffer
def new_buffer return "" end
def read
Might raise SystemCallError, IOError or SocketError when something
been reached.
Returns the array message as an array, or nil when end-of-stream has
Read an array message from the underlying file descriptor.
def read buffer = new_buffer if !@io.read(HEADER_SIZE, buffer) return nil end while buffer.size < HEADER_SIZE tmp = @io.read(HEADER_SIZE - buffer.size) if tmp.empty? return nil else buffer << tmp end end chunk_size = buffer.unpack(UINT16_PACK_FORMAT)[0] if !@io.read(chunk_size, buffer) return nil end while buffer.size < chunk_size tmp = @io.read(chunk_size - buffer.size) if tmp.empty? return nil else buffer << tmp end end message = [] offset = 0 delimiter_pos = buffer.index(DELIMITER, offset) while !delimiter_pos.nil? if delimiter_pos == 0 message << "" else message << buffer[offset .. delimiter_pos - 1] end offset = delimiter_pos + 1 delimiter_pos = buffer.index(DELIMITER, offset) end return message rescue Errno::ECONNRESET return nil end
def read_hash
Might raise SystemCallError, IOError or SocketError when something
Returns nil when end-of-stream has been reached.
has an even number of elements.
result as a hash instead of an array. This assumes that the array message
Read an array message from the underlying file descriptor and return the
def read_hash buffer = new_buffer if !@io.read(HEADER_SIZE, buffer) return nil end while buffer.size < HEADER_SIZE tmp = @io.read(HEADER_SIZE - buffer.size) if tmp.empty? return nil else buffer << tmp end end chunk_size = buffer.unpack(UINT16_PACK_FORMAT)[0] if !@io.read(chunk_size, buffer) return nil end while buffer.size < chunk_size tmp = @io.read(chunk_size - buffer.size) if tmp.empty? return nil else buffer << tmp end end result = {} offset = 0 delimiter_pos = buffer.index(DELIMITER, offset) while !delimiter_pos.nil? if delimiter_pos == 0 name = "" else name = buffer[offset .. delimiter_pos - 1] end offset = delimiter_pos + 1 delimiter_pos = buffer.index(DELIMITER, offset) if delimiter_pos.nil? raise InvalidHashError elsif delimiter_pos == 0 value = "" else value = buffer[offset .. delimiter_pos - 1] end result[name] = value offset = delimiter_pos + 1 delimiter_pos = buffer.index(DELIMITER, offset) end return result rescue Errno::ECONNRESET return nil end
def read_scalar(buffer = new_buffer, max_size = nil)
size for the scalar message. If the received scalar message's size
The +max_size+ argument allows one to specify the maximum allowed
in order to minimize stress on the garbage collector.
stores the read data. It is good practice to reuse existing buffers
The +buffer+ argument specifies a buffer in which #read_scalar
goes wrong.
Might raise SystemCallError, IOError or SocketError when something
read message, or nil on end-of-stream.
Read a scalar message from the underlying IO object. Returns the
def read_scalar(buffer = new_buffer, max_size = nil) if !@io.read(4, buffer) return nil end while buffer.size < 4 tmp = @io.read(4 - buffer.size) if tmp.empty? return nil else buffer << tmp end end size = buffer.unpack(UINT32_PACK_FORMAT)[0] if size == 0 buffer.replace('') return buffer else if !max_size.nil? && size > max_size raise SecurityError, "Scalar message size (#{size}) " << "exceeds maximum allowed size (#{max_size})." end if !@io.read(size, buffer) return nil end if buffer.size < size tmp = '' while buffer.size < size if !@io.read(size - buffer.size, tmp) return nil else buffer << tmp end end end return buffer end rescue Errno::ECONNRESET return nil end
def recv_io(klass = IO, negotiate = true)
Might raise SystemCallError, IOError or SocketError when something
this only works on Unix sockets.
side must have sent an IO object by calling send_io(). Note that
Receive an IO object (a file descriptor) from the channel. The other
def recv_io(klass = IO, negotiate = true) write("pass IO") if negotiate io = @io.recv_io(klass) write("got IO") if negotiate return io end
def send_io(io)
Might raise SystemCallError, IOError or SocketError when something
this only works on Unix sockets.
side must receive the IO object by calling recv_io(). Note that
Send an IO object (a file descriptor) over the channel. The other
def send_io(io) # We read a message before actually calling #send_io # in order to prevent the other side from accidentally # read()ing past the normal data and reading our file # descriptor too. # # For example suppose that side A looks like this: # # read(fd, buf, 1024) # read_io(fd) # # and side B: # # write(fd, buf, 100) # send_io(fd_to_pass) # # If B completes both write() and send_io(), then A's read() call # reads past the 100 bytes that B sent. On some platforms, like # Linux, this will cause read_io() to fail. And it just so happens # that Ruby's IO#read method slurps more than just the given amount # of bytes. result = read if !result raise EOFError, "End of stream" elsif result != ["pass IO"] raise IOError, "IO passing pre-negotiation header expected" else @io.send_io(io) # Once you've sent the IO you expect to be able to close it on the # sender's side, even if the other side hasn't read the IO yet. # Not so: on some operating systems (I'm looking at you OS X) this # can cause the receiving side to receive a bad file descriptor. # The post negotiation protocol ensures that we block until the # other side has really received the IO. result = read if !result raise EOFError, "End of stream" elsif result != ["got IO"] raise IOError, "IO passing post-negotiation header expected" end end end
def write(name, *args)
Might raise SystemCallError, IOError or SocketError when something
to_s().
other elements. These arguments will internally be converted to strings by calling
file descriptor. _name_ is the first element in the message, and _args_ are the
Send an array message, which consists of the given elements, over the underlying
def write(name, *args) check_argument(name) args.each do |arg| check_argument(arg) end message = "#{name}#{DELIMITER}" args.each do |arg| message << arg.to_s << DELIMITER end @io.write([message.size].pack('n') << message) @io.flush end
def write_scalar(data)
Might raise SystemCallError, IOError or SocketError when something
Send a scalar message over the underlying IO object.
def write_scalar(data) @io.write([data.size].pack('N') << data) @io.flush end