class Bundler::ParallelWorkers::UnixWorker

UnixWorker is used only on platforms where fork is available. The way
this code works is: it forks a preconfigured number of workers and then
starts a preconfigured number of threads that write to the connected pipes.

def initialize(size, job)

# Shuts down the main thread's persistent HTTP connections, then hands
# +size+ and +job+ unchanged to the superclass initializer.
def initialize(size, job)
  # Persistent connections held by the main thread must be closed before
  # any forking takes place.
  main_thread_connection = Net::HTTP::Persistent.new('bundler', :ENV)
  main_thread_connection.shutdown

  # Bare +super+ forwards the original arguments (size, job).
  super
end

def prepare_threads(size)

Parameters:
  • size (Integer) -- Number of threads to be started
# Starts +size+ threads and stores them in @threads. Thread number i feeds
# @workers[i]: it takes objects off @request_queue, passes each one to the
# worker's #work method and pushes the result onto @response_queue. A thread
# finishes as soon as it dequeues the POISON object.
#
# @param size [Integer] Number of threads to be started
def prepare_threads(size)
  @threads = size.times.map do |index|
    Thread.start(index) do |slot|
      assigned_worker = @workers[slot]
      # POISON is matched by identity, not equality.
      until (job = @request_queue.deq).equal?(POISON)
        @response_queue.enq(assigned_worker.work(job))
      end
    end
  end
end

def prepare_workers(size, func)

Parameters:
  • func (Proc) -- Job that should be executed in the worker
  • size (Integer) -- Size of worker pool
# Forks +size+ child processes, each connected to the parent by two pipes,
# and stores one JobHandler per child in @workers.
#
# Every child reads marshalled objects from its inbound pipe until EOF, calls
# +func+ with the object and the child's index, and marshals the return value
# onto its outbound pipe.
#
# @param size [Integer] Size of worker pool
# @param func [Proc] Job that should be executed in the worker
def prepare_workers(size, func)
  @workers = size.times.map do |index|
    job_reader, job_writer = IO.pipe        # parent -> child
    result_reader, result_writer = IO.pipe  # child -> parent

    child_pid = Process.fork do
      begin
        # The child has no use for the parent's ends of the pipes.
        result_reader.close
        job_writer.close

        until job_reader.eof?
          job = Marshal.load(job_reader)
          Marshal.dump(func.call(job, index), result_writer)
        end
      rescue Exception => error
        # Exception (not only StandardError) is rescued so that whatever
        # ended the loop is wrapped and written back to the parent.
        begin
          Marshal.dump(WrappedException.new(error), result_writer)
        rescue Errno::EPIPE
          # The reading end is already closed; nothing can be reported.
          nil
        end
      ensure
        job_reader.close
        result_writer.close
      end
    end

    # The parent keeps only its own ends of the two pipes.
    job_reader.close
    result_writer.close
    JobHandler.new(child_pid, result_reader, job_writer)
  end
end

def stop_workers

Kill the forked workers by sending SIGINT to them
# Shuts down every forked worker in @workers in two passes: first both pipe
# ends held for the worker are closed and the worker is sent SIGINT, then
# each worker pid is waited on.
def stop_workers
  # Pass 1: close the pipes and interrupt all children before waiting on any.
  @workers.each do |handler|
    handler.io_r.close
    handler.io_w.close
    Process.kill(:INT, handler.pid)
  end

  # Pass 2: wait for each child to exit.
  @workers.each { |handler| Process.waitpid(handler.pid) }
end