# lib/chef-utils/parallel_map.rb



# frozen_string_literal: true
#
# Copyright:: Copyright (c) Chef Software Inc.
# License:: Apache License, Version 2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require "concurrent/executors"
require "concurrent/future"
require "singleton" unless defined?(Singleton)

module ChefUtils
  #
  # This module contains Ruby refinements that add several methods to the Enumerable
  # module which are useful for parallel processing.
  #
  module ParallelMap
    refine Enumerable do

      # Enumerates through the collection in parallel using the thread pool provided
      # or the default thread pool.  By using the default thread pool this supports
      # recursively calling the method without deadlocking while using a globally
      # fixed number of workers.  This method supports lazy collections.  It returns
      # synchronously, waiting until all the work is done.  Failures are only reported
      # after the collection has executed and only the first exception is raised.
      #
      # (0..).lazy.parallel_map { |i| i*i }.first(5)
      #
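      # @return [Array, Enumerator::Lazy] output results (lazy when the receiver is lazy),
      #   or the receiver itself when no block is given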
      #
      def parallel_map(pool: nil)
        if block_given?
          # fall back to the shared default pool only when the caller did not supply one
          executor = pool || ChefUtils::DefaultThreadPool.instance.pool

          # first pass: wrap every element in a future scheduled on the executor
          pending = map { |element| Concurrent::Future.execute(executor: executor) { yield element } }

          # second pass: wait on each future in order; value! re-raises a failure from the block
          pending.map { |future| future.value! }
        else
          # without a block there is no work to schedule, hand back the receiver
          self
        end
      end

      # This has the same behavior as parallel_map but returns the enumerator instead of
      # the return values.
      #
      # @return [Enumerable] the enumerable for method chaining
      #
      def parallel_each(pool: nil, &block)
        # run the block through parallel_map for its side effects only; the results are discarded
        parallel_map(pool: pool, &block) if block

        # always hand back the receiver so calls can be chained
        self
      end

      # The flat_each method is tightly coupled to the usage of parallel_map within the
      # ChefFS implementation.  It is not itself a parallel method, but it is used to
      # iterate through the 2nd level of nested structure, which is tied to the nested
      # structures that ChefFS returns.
      #
      # This is different from Enumerable#flat_map because that behaves like map.flatten(1) while
      # this behaves more like flatten(1).each.  We need this on an Enumerable, so we have no
      # Enumerable#flatten method to call.
      #
      # [ [ 1, 2 ], [ 3, 4 ] ].flat_each(&block) calls block four times with 1, 2, 3, 4
      #
      # [ [ 1, 2 ], [ 3, 4 ] ].flat_map(&block) calls block twice with [1, 2] and [3,4]
      #
      def flat_each(&block)
        # Nothing to do without a block.  Return the receiver, as parallel_map and
        # parallel_each do, instead of raising LocalJumpError from the yield below or
        # handing back an array of unconsumed enumerators.
        return self unless block_given?

        # map (not each) is retained so the return value is unchanged: one entry per
        # top-level element, and a lazy receiver stays lazy.
        map do |value|
          if value.is_a?(Enumerable)
            # nested collection: hand each of its members to the block
            value.each(&block)
          else
            # plain element: pass it straight through
            yield value
          end
        end
      end
    end
  end

  # The DefaultThreadPool has a fixed thread size and has no
  # queue of work and the behavior on failure to find a thread is for the
  # caller to run the work.  This contract means that the thread pool can
  # be called recursively without deadlocking and while keeping the fixed
  # number of threads (and not exponentially growing the thread pool with
  # the depth of recursion).
  #
  class DefaultThreadPool
    include Singleton

    DEFAULT_THREAD_SIZE = 10

    # Number of worker threads for the shared pool.  The pool is built once, on the
    # first call to #pool, so assign this before then (and before any use of
    # parallel_map/parallel_each); later changes are not picked up.  Use zero, not
    # one, for fully synchronous behavior, since the caller also runs work whenever
    # the workers are busy.
    #
    # @return [Integer, nil] configured thread count, nil until assigned
    attr_accessor :threads

    # Builds the shared executor on first use and returns the same object afterwards.
    #
    # @return [Concurrent::ThreadPoolExecutor] the thread pool
    def pool
      @pool ||= build_pool
    end

    private

    # Constructs the fixed-size executor from the configured (or default) thread count.
    #
    # @return [Concurrent::ThreadPoolExecutor] a new executor
    def build_pool
      size = threads || DEFAULT_THREAD_SIZE

      Concurrent::ThreadPoolExecutor.new(
        # min == max keeps the pool at a fixed size
        min_threads: size,
        max_threads: size,
        # with synchronous: true the 0 here means 'no queue' rather than 'infinite queue';
        # work is still offloaded to threads, it is just never queued
        max_queue: 0,
        synchronous: true,
        # when no thread is free the caller runs the work itself, which prevents
        # deadlocks on recursive parallel usage
        fallback_policy: :caller_runs
      )
    end
  end
end