lib/mutant/parallel.rb



# frozen_string_literal: true

module Mutant
  # Parallel execution engine of arbitrary payloads
  module Parallel
    # Start an asynchronous parallel run and return its driver
    #
    # Builds the shared state, triggers the world's process warmup, starts one
    # worker per configured job and spawns one thread per worker, in that order.
    #
    # @param [Config] config
    # @param [World] world
    #
    # @return [Driver]
    def self.async(config:, world:)
      state = shared_state(world, config)

      world.process_warmup

      pool         = workers(world, config, state)
      pool_threads = threads(world, config, pool)

      Driver.new(workers: pool, threads: pool_threads, **state)
    end

    # rubocop:disable Metrics/MethodLength
    # Start one worker per configured job
    #
    # @param [World] world
    # @param [Config] config
    # @param [Hash{Symbol => Object}] shared
    #   shared state entries forwarded to every worker
    #
    # @return [Array]
    #   one `Worker.start` result per job index, in index order
    def self.workers(world, config, shared)
      Array.new(config.jobs) do |index|
        # Each worker gets a process name made unique by its index.
        process_name = "#{config.process_name}-#{index}"

        attributes = {
          block:            config.block,
          index:,
          on_process_start: config.on_process_start,
          process_name:,
          timeout:          config.timeout,
          world:,
          **shared
        }

        Worker.start(**attributes)
      end
    end
    private_class_method :workers
    # rubocop:enable Metrics/MethodLength

    # Build the variables shared between the driver and all workers
    #
    # @param [World] world
    # @param [Config] config
    #
    # @return [Hash{Symbol => Object}]
    #   variables keyed by the attribute name they are passed under
    def self.shared_state(world, config)
      ivar = ->(**attributes) { shared(Variable::IVar, world, **attributes) }

      {
        var_active_jobs: ivar.call(value: Set.new),
        # The only variable created without an initial value.
        var_final:       ivar.call,
        var_running:     shared(Variable::MVar, world, value: config.jobs),
        var_sink:        ivar.call(value: config.sink),
        var_source:      ivar.call(value: config.source)
      }
    end
    private_class_method :shared_state

    # Spawn one named thread per worker, each running that worker
    #
    # @param [World] world
    # @param [Config] config
    # @param [Array] workers
    #
    # @return [Array]
    #   the objects returned by `world.thread.new`, in worker order
    def self.threads(world, config, workers)
      thread_class = world.thread

      # Executed inside the new thread: name it after the worker, then run it.
      run_named = lambda do |worker|
        thread_class.current.name = "#{config.thread_name}-#{worker.index}"
        worker.call
      end

      workers.map { |worker| thread_class.new { run_named.call(worker) } }
    end
    private_class_method :threads

    # Instantiate a shared variable wired to the world's synchronisation
    #
    # @param [Class] klass
    #   variable class to instantiate
    # @param [World] world
    # @param [Hash{Symbol => Object}] attributes
    #   extra attributes passed through to `klass.new`
    #
    # @return [Object]
    #   the new `klass` instance
    def self.shared(klass, world, **attributes)
      synchronization = {
        condition_variable: world.condition_variable,
        mutex:              world.mutex
      }

      klass.new(**synchronization, **attributes)
    end
    private_class_method :shared

    # Job result sink signature
    #
    # Declares the interface of the object passed as `Config#sink`. Every
    # method below is declared through `abstract_method`, so concrete sinks
    # are expected to provide their own implementation.
    module Sink
      include AbstractType

      # Process job result
      #
      # @param [Response]
      #
      # @return [self]
      abstract_method :response

      # The sink status
      #
      # @return [Object]
      abstract_method :status

      # Test if processing should stop
      #
      # @return [Boolean]
      abstract_method :stop?
    end # Sink

    # Parallel run configuration
    #
    # Attributes, as consumed by the Parallel module methods:
    #
    # * block            - forwarded to every worker
    # * jobs             - number of workers to start, also the initial value
    #                      of the shared `var_running` variable
    # * on_process_start - forwarded to every worker
    # * process_name     - prefix of each worker's process name
    #                      ("<process_name>-<index>")
    # * sink             - initial value of the shared `var_sink` variable
    # * source           - initial value of the shared `var_source` variable
    # * thread_name      - prefix of each worker thread's name
    #                      ("<thread_name>-<index>")
    # * timeout          - forwarded to every worker
    class Config
      include Adamantium, Anima.new(
        *%i[
          block
          jobs
          on_process_start
          process_name
          sink
          source
          thread_name
          timeout
        ]
      )
    end # Config

    # Response to a job, the type documented as the argument of Sink#response
    #
    # NOTE(review): attribute meanings are inferred from their names only;
    # `job` is presumably the job answered, `result` its outcome, `log` the
    # captured output and `error` a failure, if any. Confirm against the
    # worker implementation.
    class Response
      include Anima.new(:error, :job, :log, :result)
    end # Response

    # Parallel execution status
    #
    # Value object with the attributes `active_jobs`, `done` and `payload`;
    # `done?` is a predicate-style alias for the `done` reader.
    #
    # NOTE(review): presumably `active_jobs` holds the jobs currently being
    # processed and `payload` the sink's status; confirm against the driver.
    class Status
      include Adamantium, Anima.new(*%i[active_jobs done payload])

      alias_method :done?, :done
    end # Status

  end # Parallel
end # Mutant