class Servolux::Prefork


== Synopsis

The Prefork class provides a pre-forking worker pool for executing tasks in
parallel using multiple processes.

== Details

A pre-forking worker pool is a technique for executing code in parallel in a
UNIX environment. Each worker in the pool forks a child process and then
executes user supplied code in that child process. The child process can
pull jobs from a queue (beanstalkd for example) or listen on a socket for
network requests.

The code to execute in the child processes is passed as a block to the
Prefork initialize method. The child processes execute this code in a loop;
that is, your code block should not worry about keeping itself alive. This
is handled by the library.

If your code raises an exception, it will be captured by the library code
and marshalled back to the parent process. This will halt the child process.
The Prefork worker pool does not restart dead workers. A method is provided
to iterate over workers that have errors, and it is up to the user to handle
errors as they please.

Instead of passing a block to the initialize method, you can provide a Ruby
module that defines an "execute" method. This method will be executed in the
child process' run loop. When using a module, you also have the option of
defining a "before_executing" method and an "after_executing" method. These
methods will be called before the child starts the execute loop and after
the execute loop finishes. Each method will be called exactly once. Both
methods are optional.

Sending a SIGHUP to a child process will cause that child to stop and
restart. The child will send a signal to the parent asking to be shutdown.
The parent will gracefully halt the child and then start a new child process
to replace it. If you define a "hup" method in your worker module, it will
be executed when SIGHUP is received by the child. Your "hup" method will be
the last method executed in the signal handler.

This has the advantage of calling your before/after_executing methods again
and reloading any code or resources your worker code will use. The SIGHUP
will call Thread#wakeup on the main child process thread; please write your
code to respond accordingly to this wakeup call (a thread waiting on a
Queue#pop will not return when wakeup is called on the thread).

== Examples

A pre-forking echo server: github.com/TwP/servolux/blob/master/examples/echo.rb

Pulling jobs from a beanstalkd work queue: github.com/TwP/servolux/blob/master/examples/beanstalk.rb

==== Before / After Executing

In this example, we are creating 42 worker processes that will log the
process ID and the current time to a file. Each worker will do this every 2
seconds. The before/after_executing methods are used to open the file before
the run loop starts and to close the file after the run loop completes. The
execute method uses the stored file descriptor when logging the message.

   module RunMe
     def before_executing
       @fd = File.open("#{Process.pid}.txt", 'w')
     end

     def after_executing
       @fd.close
     end

     def execute
       @fd.puts "Process #{Process.pid} @ #{Time.now}"
       sleep 2
     end
   end

   pool = Servolux::Prefork.new(:module => RunMe)
   pool.start 42

==== Heartbeat

When a :timeout is supplied to the constructor, a "heartbeat" is set up
between the parent and the child worker. Each loop through the child's
execute code must return before :timeout seconds have elapsed. If one
iteration through the loop takes longer than :timeout seconds, then the
parent process will halt the child worker. An error will be raised in the
parent process.

   pool = Servolux::Prefork.new(:timeout => 2) {
     puts "Process #{Process.pid} is running."
     sleep(rand * 5)
   }
   pool.start 42

Eventually all 42 child processes will be killed by their parents. The
random number generator will eventually cause the child to sleep longer than
two seconds.

What is happening here is that each time the child process executes the
block of code, the Servolux library code will send a "heartbeat" message to
the parent. The parent is using a Kernel#select call on the communications
pipe to wait for this message. The timeout is passed to the select call; if
no heartbeat arrives in time the call returns nil, and this is the error
condition that a timely heartbeat prevents.

Use the heartbeat with caution -- allow margins for timing issues and
processor load spikes.

==== Signals

Forked child processes are configured to respond to two signals: SIGHUP and
SIGTERM. The SIGHUP signal when sent to a child process is used to restart
just that one child. The SIGTERM signal when sent to a child process is used
to forcibly kill the child; it will not be restarted. The parent process
uses SIGTERM to halt all the children when it is stopping.

SIGHUP

Child processes are restarted by sending a SIGHUP signal to the child. This
will shutdown the child worker and then start up a new one to replace it.
For the child to shutdown gracefully, it needs to return from the "execute"
method when it receives the signal. Define a "hup" method that will wake the
execute thread from any pending operations -- listening on a socket, reading
a file, polling a queue, etc. When the execute method returns, the child
will exit.

SIGTERM

Child processes are stopped by the prefork parent by sending a SIGTERM
signal to the child. For the child to shutdown gracefully, it needs to
return from the "execute" method when it receives the signal. Define a
"term" method that will wake the execute thread from any pending operations
-- listening on a socket, reading a file, polling a queue, etc. When the
execute method returns, the child will exit.
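
As an illustration of the signal handling described above, here is a minimal
sketch of a queue-based worker. The QueueWorker module, its @jobs queue, and
the handle method are hypothetical names. Because a thread blocked in
Queue#pop is not woken by Thread#wakeup, the "hup" and "term" methods push a
sentinel value so the execute method can return and the child can restart or
exit.

   module QueueWorker
     def before_executing
       @jobs = Queue.new
       # ... hypothetical code that feeds jobs into @jobs ...
     end

     def execute
       job = @jobs.pop                 # blocks until a job or the sentinel arrives
       handle(job) unless job == :shutdown
     end

     # Called from the child's signal handler on SIGHUP; aliased for SIGTERM.
     # Pushing the sentinel unblocks the execute thread so it can return.
     def hup
       @jobs.push :shutdown
     end
     alias_method :term, :hup

     def handle( job )
       # ... hypothetical job processing ...
     end
   end

   pool = Servolux::Prefork.new(:module => QueueWorker)
   pool.start 4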

def add_workers( number = 1 )


call-seq:
   add_workers( number = 1 )

Adds additional workers to the pool. It will not add more workers than
the number set in :max_workers.

def add_workers( number = 1 )
  number.times do
    break if at_max_workers?
    worker = Worker.new(self, @config)
    worker.extend @module
    worker.start
    @workers << worker
    pause
  end
end
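
For example, a pool created with a :max_workers cap could be grown
incrementally like this (a hypothetical sketch; the trivial worker block is
only a stand-in for real work):

   pool = Servolux::Prefork.new(:max_workers => 8) { sleep 1 }
   pool.start 4          # four workers running
   pool.add_workers 2    # six workers running
   pool.add_workers 10   # stops at eight, the :max_workers limit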

def at_max_workers?


call-seq:
   at_max_workers?

Return true or false if we are currently at or above the maximum number of
workers allowed.

def at_max_workers?
  return false unless @max_workers
  return @workers.size >= @max_workers
end

def below_minimum_workers?


call-seq:
   below_minimum_workers?

Report if the number of workers is below the minimum threshold.

def below_minimum_workers?
  return false unless @min_workers
  return @workers.size < @min_workers
end

def dead_worker_count

call-seq:
   dead_worker_count -> Integer

Returns the number of dead workers in the pool.

def dead_worker_count
  worker_counts[:dead]
end

def each_worker( &block )


call-seq:
   each_worker { |worker| block }

Iterates over all the workers and yields each, in turn, to the given
_block_.

def each_worker( &block )
  @workers.each(&block)
  self
end
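
As a small sketch, assuming +pool+ is a running Prefork instance, the
iterator can be used to tally the workers that are still alive:

   alive = 0
   pool.each_worker { |worker| alive += 1 if worker.alive? }
   puts "#{alive} workers are still alive"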

def ensure_worker_pool_size


call-seq:
   ensure_worker_pool_size()

Make sure that the worker pool has >= the minimum number of workers and less
than the maximum number of workers.

Generally, this means prune the number of workers and then spawn workers up
to the :min_workers level. If :min_workers is not set, then we only prune.

def ensure_worker_pool_size
  prune_workers
  while below_minimum_workers? do
    add_workers
  end
end
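
A parent process might invoke this periodically as a simple supervision loop;
a hypothetical sketch using the :min_workers and :max_workers options:

   pool = Servolux::Prefork.new(:min_workers => 2, :max_workers => 8) { sleep 1 }
   pool.start 4

   loop do
     pool.ensure_worker_pool_size   # prune dead workers, top back up to :min_workers
     sleep 30
   end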

def errors


call-seq:
   errors { |worker| block }

Iterates over all the workers and yields the worker to the given _block_
only if the worker has an error condition.

def errors
  @workers.each { |worker| yield worker unless worker.error.nil? }
  self
end
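
For example, the parent could report each captured error and then drop the
failed workers from the pool (a hypothetical sketch; the exact error object
depends on what the child raised):

   pool.errors do |worker|
     warn "worker failed: #{worker.error.inspect}"
   end
   pool.prune_workers   # workers that died with an error are no longer alive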

def initialize( opts = {}, &block )


call-seq:
   Prefork.new { block }
   Prefork.new( :module => Module )

Create a new pre-forking worker pool. You must provide a block of code for
the workers to execute in their child processes. This code block can be
passed either as a block to this method or as a module via the :module
option.

If a :timeout is given, then each worker will setup a "heartbeat" between
the parent process and the child process. If the child does not respond to
the parent within :timeout seconds, then the child process will be halted.
If you do not want to use the heartbeat then leave the :timeout unset or
manually set it to +nil+.

Additionally, :min_workers and :max_workers options are available. If
:min_workers is given, the method +ensure_worker_pool_size+ will guarantee
that at least :min_workers are up and running. If :max_workers is given,
then +add_workers+ will NOT allow you to spawn more workers than
:max_workers.

The pre-forking worker pool makes no effort to restart dead workers. It is
left to the user to implement this functionality.

def initialize( opts = {}, &block )
  @timeout = opts.fetch(:timeout, nil)
  @module = opts.fetch(:module, nil)
  @max_workers = opts.fetch(:max_workers, nil)
  @min_workers = opts.fetch(:min_workers, nil)
  @config = opts.fetch(:config, {})
  @module = Module.new { define_method :execute, &block } if block
  @workers = []
  raise ArgumentError, 'No code was given to execute by the workers.' unless @module
end
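
The two call-seq forms are equivalent ways of supplying the worker code; a
brief hypothetical sketch, where poll_queue stands in for real work:

   # block form with a heartbeat
   pool = Servolux::Prefork.new(:timeout => 10) { poll_queue }

   # module form with pool size bounds
   module MyWorker
     def execute
       poll_queue
     end
   end
   pool = Servolux::Prefork.new(:module => MyWorker, :min_workers => 2, :max_workers => 8)

   # providing neither a block nor the :module option raises an ArgumentError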

def live_worker_count

call-seq:
   live_worker_count -> Integer

Returns the number of live workers in the pool.

def live_worker_count
  worker_counts[:alive]
end

def pause


Pause script execution for a random time interval between 0.1 and 0.4
seconds. This method is used to slow down the starting and stopping of
child processes in order to avoid the "thundering herd" problem.

http://en.wikipedia.org/wiki/Thundering_herd_problem

def pause
  sleep(0.1 + 0.3*rand)
end

def prune_workers


call-seq:
   prune_workers()

Remove workers that are no longer alive from the worker pool.

def prune_workers
  new_workers = @workers.find_all { |w| w.reap.alive? }
  @workers = new_workers
end

def reap

Returns:
  • (Prefork) - self
def reap
  @workers.each { |worker| worker.reap }
  self
end

def signal( signal = 'TERM' )

Returns:
  • (Prefork) - self

Parameters:
  • signal (String, Integer) -- The signal to send to child processes.
def signal( signal = 'TERM' )
  @workers.each { |worker| worker.signal(signal); pause }
  self
end
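
For instance, SIGHUP can be broadcast to every child so each one is shut down
and replaced, as described in the Signals section; signals may be given by
name or number:

   pool.signal 'HUP'   # gracefully restart every child worker
   pool.signal 15      # signals may also be given as an Integer (15 == SIGTERM)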

def start( number )

Returns:
  • (Prefork) - self

Parameters:
  • number (Integer) -- The number of workers to prefork
def start( number )
  @workers.clear
  add_workers( number )
  self
end

def stop


Stop all workers. The current process will wait for each child process to
exit before this method will return. The worker instances are not
destroyed by this method; this means that the +each_worker+ and the
+errors+ methods will still function correctly after stopping the workers.

def stop
  @workers.each { |worker| worker.stop; pause }
  reap
  self
end

def worker_counts

call-seq:
   worker_counts -> { :alive => 2, :dead => 1 }

Returns a hash containing the counts of alive and dead workers.

def worker_counts
  counts = { :alive => 0, :dead => 0 }
  each_worker do |worker|
    state = worker.alive? ? :alive : :dead
    counts[state] += 1
  end
  return counts
end
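
Taken together with +stop+ and +prune_workers+, the bookkeeping methods behave
roughly like this (a hypothetical sketch; the counts shown assume all children
have already exited when queried):

   pool = Servolux::Prefork.new { sleep 1 }
   pool.start 3
   pool.worker_counts        #=> { :alive => 3, :dead => 0 }
   pool.live_worker_count    #=> 3

   pool.stop                 # wait for every child process to exit
   pool.worker_counts        #=> { :alive => 0, :dead => 3 }
   pool.dead_worker_count    #=> 3

   pool.prune_workers        # remove the dead workers from the pool
   pool.worker_counts        #=> { :alive => 0, :dead => 0 }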