lib/mutant/parallel/driver.rb



# frozen_string_literal: true

module Mutant
  module Parallel
    # Driver for parallelized execution
    class Driver
      include Anima.new(
        :threads,
        :var_active_jobs,
        :var_final,
        :var_running,
        :var_sink,
        :var_source,
        :workers
      )

      private(*anima.attribute_names)

      def initialize(**attributes)
        @alive = true
        super
      end

      # Wait for computation to finish, with timeout
      #
      # @param [Float] timeout
      #
      # @return [Variable::Result<Sink#status>]
      #   current status
      def wait_timeout(timeout)
        var_final.take_timeout(timeout) if @alive

        finalize(status)
      end

      # Stop parallel computation
      #
      # This will cause all work to be immediately stopped.
      #
      # @return [self]
      def stop
        @alive = false
        threads.each(&:kill)
        self
      end

    private

      def finalize(status)
        status.tap do
          if status.done?
            workers.each(&:signal)
            workers.each(&:join)
            threads.each(&:join)
          end
        end
      end

      def status
        var_active_jobs.with do |active_jobs|
          var_sink.with do |sink|
            Status.new(
              active_jobs: active_jobs.dup.freeze,
              done:        threads.all? { |worker| !worker.alive? },
              payload:     sink.status
            )
          end
        end
      end
    end # Driver
  end # Parallel
end # Mutant