lib/servolux/piper.rb



# == Synopsis
# A Piper is used to fork a child proces and then establish a communication
# pipe between the parent and child. This communication pipe is used to pass
# Ruby objects between the two.
#
# == Details
# When a new piper instance is created, the Ruby process is forked into two
# porcesses - the parent and the child. Each continues execution from the
# point of the fork. The piper establishes a pipe for communication between
# the parent and the child. This communication pipe can be opened as read /
# write / read-write (from the perspective of the parent).
#
# Communication over the pipe is handled by marshalling Ruby objects through
# the pipe. This means that nearly any Ruby object can be passed between the
# two processes. For example, exceptions from the child process can be
# marshalled back to the parent and raised there.
#
# Object passing is handled by use of the +puts+ and +gets+ methods defined
# on the Piper. These methods use a +timeout+ and the Kernel#select method
# to ensure a timely return.
#
# == Examples
#
#    piper = Servolux::Piper.new('r', :timeout => 5)
#
#    piper.parent {
#      $stdout.puts "parent pid #{Process.pid}"
#      $stdout.puts "child pid #{piper.pid} [from fork]"
#
#      child_pid = piper.gets
#      $stdout.puts "child pid #{child_pid} [from child]"
#
#      msg = piper.gets
#      $stdout.puts "message from child #{msg.inspect}"
#    }
#
#    piper.child {
#      sleep 2
#      piper.puts Process.pid
#      sleep 3
#      piper.puts "The time is #{Time.now}"
#    }
#
#    piper.close
#
class Servolux::Piper

  # :stopdoc:
  SEPERATOR = [0xDEAD, 0xBEEF].pack('n*').freeze
  # :startdoc:

  # call-seq:
  #    Piper.daemon( nochdir = false, noclose = false )
  #
  # Creates a new Piper with the child process configured as a daemon. The
  # +pid+ method of the piper returns the PID of the daemon process.
  #
  # Be default a daemon process will release its current working directory
  # and the stdout/stderr/stdin file descriptors. This allows the parent
  # process to exit cleanly. This behavior can be overridden by setting the
  # _nochdir_ and _noclose_ flags to true. The first will keep the current
  # working directory; the second will keep stdout/stderr/stdin open.
  #
  def self.daemon( nochdir = false, noclose = false )
    piper = self.new(:timeout => 1)
    piper.parent {
      pid = piper.gets
      raise ::Servolux::Error, 'Could not get the child PID.' if pid.nil?
      piper.instance_variable_set(:@child_pid, pid)
    }
    piper.child {
      Process.setsid                     # Become session leader.
      exit!(0) if fork                   # Zap session leader.

      Dir.chdir '/' unless nochdir       # Release old working directory.
      File.umask 0000                    # Ensure sensible umask.

      unless noclose
        STDIN.reopen  '/dev/null'        # Free file descriptors and
        STDOUT.reopen '/dev/null', 'a'   # point them somewhere sensible.
        STDERR.reopen '/dev/null', 'a'
      end

      piper.puts Process.pid
    }
    return piper
  end

  # The timeout in seconds to wait for puts / gets commands.
  attr_accessor :timeout

  # The read end of the pipe.
  attr_reader :read_io

  # The write end of the pipe.
  attr_reader :write_io

  # call-seq:
  #    Piper.new( mode = 'r', opts = {} )
  #
  # Creates a new Piper instance with the communication pipe configured
  # using the provided _mode_. The default mode is read-only (from the
  # parent, and write-only from the child). The supported modes are as
  # follows:
  #
  #    Mode | Parent View | Child View
  #    -----+-------------+-----------
  #    r      read-only     write-only
  #    w      write-only    read-only
  #    rw     read-write    read-write
  #
  # The communication timeout can be provided as an option. This is the
  # number of seconds to wait for a +puts+ or +gets+ to succeed.
  #
  def initialize( *args )
    opts = args.last.is_a?(Hash) ? args.pop : {}
    mode = args.first || 'r'

    unless %w[r w rw].include? mode
      raise ArgumentError, "Unsupported mode #{mode.inspect}"
    end

    @timeout = opts.getopt(:timeout, 0)
    if defined? ::Encoding
      @read_io, @write_io = IO.pipe('ASCII-8BIT')    # encoding for Ruby 1.9
    else
      @read_io, @write_io = IO.pipe
    end
    @child_pid = Kernel.fork

    if child?
      case mode
      when 'r'; close_read
      when 'w'; close_write
      end
    else
      case mode
      when 'r'; close_write
      when 'w'; close_read
      end
    end
  end

  # Close both the read and write ends of the communications pipe. This only
  # affects the process from which it was called -- the parent or the child.
  #
  def close
    @read_io.close rescue nil
    @write_io.close rescue nil
  end

  # Close the read end of the communications pipe. This only affects the
  # process from which it was called -- the parent or the child.
  #
  def close_read
    @read_io.close rescue nil
  end

  # Close the write end of the communications pipe. This only affects the
  # process from which it was called -- the parent or the child.
  #
  def close_write
    @write_io.close rescue nil
  end

  # Returns +true+ if the communications pipe is readable from the process
  # and there is data waiting to be read.
  #
  def readable?
    return false if @read_io.closed?
    r,w,e = Kernel.select([@read_io], nil, nil, @timeout)
    return !(r.nil? or r.empty?)
  end

  # Returns +true+ if the communications pipe is writeable from the process
  # and the write buffer can accept more data.
  #
  def writeable?
    return false if @write_io.closed?
    r,w,e = Kernel.select(nil, [@write_io], nil, @timeout)
    return !(w.nil? or w.empty?)
  end

  # call-seq:
  #    child { block }
  #    child {|piper| block }
  #
  # Execute the _block_ only in the child process. This method returns
  # immediately when called from the parent process.
  #
  def child( &block )
    return unless child?
    raise ArgumentError, "A block must be supplied" if block.nil?

    if block.arity > 0
      block.call(self)
    else
      block.call
    end
  end

  # Returns +true+ if this is the child prcoess and +false+ otherwise.
  #
  def child?
    @child_pid.nil?
  end

  # call-seq:
  #    parent { block }
  #    parent {|piper| block }
  #
  # Execute the _block_ only in the parent process. This method returns
  # immediately when called from the child process.
  #
  def parent( &block )
    return unless parent?
    raise ArgumentError, "A block must be supplied" if block.nil?

    if block.arity > 0
      block.call(self)
    else
      block.call
    end
  end

  # Returns +true+ if this is the parent prcoess and +false+ otherwise.
  #
  def parent?
    !@child_pid.nil?
  end

  # Returns the PID of the child process when called from the parent.
  # Returns +nil+ when called from the child.
  #
  def pid
    @child_pid
  end

  # Read an object from the communication pipe. Returns +nil+ if the pipe is
  # closed for reading or if no data is available before the timeout
  # expires. If data is available then it is un-marshalled and returned as a
  # Ruby object.
  #
  # This method will block until the +timeout+ is reached or data can be
  # read from the pipe.
  #
  def gets
    return unless readable?

    data = @read_io.gets SEPERATOR
    return if data.nil?

    data.chomp! SEPERATOR
    Marshal.load(data) rescue data
  end

  # Write an object to the communication pipe. Returns +nil+ if the pipe is
  # closed for writing or if the write buffer is full. The _obj_ is
  # marshalled and written to the pipe (therefore, procs and other
  # un-marshallable Ruby objects cannot be passed through the pipe).
  #
  # If the write is successful, then the number of bytes written to the pipe
  # is returned. If this number is zero it means that the _obj_ was
  # unsuccessfully communicated (sorry).
  #
  def puts( obj )
    return unless writeable?

    bytes = @write_io.write Marshal.dump(obj)
    @write_io.write SEPERATOR if bytes > 0
    @write_io.flush

    bytes
  end

  # Send the given signal to the child process. The signal may be an integer
  # signal number or a POSIX signal name (either with or without a +SIG+
  # prefix). 
  #
  # This method does nothing when called from the child process.
  #
  def signal( sig )
    return if @child_pid.nil?
    sig = Signal.list.invert[sig] if sig.is_a?(Integer)
    Process.kill(sig, @child_pid)
  end

end  # class Servolux::Piper

# EOF