lib/bundler/parallel_workers/worker.rb



module Bundler
  module ParallelWorkers
    class Worker
      POISON = Object.new

      class WrappedException < StandardError
        attr_reader :exception
        def initialize(exn)
          @exception = exn
        end
      end

      # Creates a worker pool of specified size
      #
      # @param size [Integer] Size of pool
      # @param func [Proc] job to run in inside the worker pool
      def initialize(size, func)
        @request_queue = Queue.new
        @response_queue = Queue.new
        prepare_workers size, func
        prepare_threads size
        trap("INT") { @threads.each {|i| i.exit }; stop_workers; exit 1 }
      end

      # Enqueue a request to be executed in the worker pool
      #
      # @param obj [String] mostly it is name of spec that should be downloaded
      def enq(obj)
        @request_queue.enq obj
      end

      # Retrieves results of job function being executed in worker pool
      def deq
        result = @response_queue.deq
        if result.is_a?(WrappedException)
          raise result.exception
        end
        result
      end

      # Stop the forked workers and started threads
      def stop
        stop_threads
        stop_workers
      end

      private
      # Stop the worker threads by sending a poison object down the request queue
      # so as worker threads after retrieving it, shut themselves down
      def stop_threads
        @threads.each do
          @request_queue.enq POISON
        end
        @threads.each do |thread|
          thread.join
        end
      end

      # To be overridden by child classes
      def prepare_threads(size)
      end

      # To be overridden by child classes
      def stop_workers
      end

    end
  end
end