lib/temporalio/worker/thread_pool.rb



# frozen_string_literal: true

# Much of this logic is taken from
# https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
# see MIT license at
# https://github.com/ruby-concurrency/concurrent-ruby/blob/044020f44b36930b863b930f3ee8fa1e9f750469/LICENSE.txt

module Temporalio
  class Worker
    # Implementation of a thread pool. This implementation is a stripped down form of Concurrent Ruby's
    # `CachedThreadPool`.
    #
    # Thread workers are created on demand (bounded by `max_threads` when given) and reused while idle. Blocks that
    # cannot be handed to a thread worker right away are queued and picked up by the next worker that finishes. All
    # pool state is guarded by a single mutex; private methods prefixed with `locked_` expect that mutex to be held.
    class ThreadPool
      # @return [ThreadPool] Default/shared thread pool instance with unlimited max threads.
      def self.default
        @default ||= new
      end

      # @!visibility private
      def self._monotonic_time
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end

      # Create a new thread pool that creates threads as needed.
      #
      # @param max_threads [Integer, nil] Maximum number of thread workers to create, or nil for unlimited max.
      # @param idle_timeout [Float] Number of seconds before a thread worker with no work should be stopped. Note,
      #   the check of whether a thread worker is idle is only done on each new {execute} call.
      def initialize(max_threads: nil, idle_timeout: 20)
        @max_threads = max_threads
        @idle_timeout = idle_timeout

        @mutex = Mutex.new
        # Every live thread worker, busy or idle
        @pool = []
        # Idle thread workers as [worker, monotonic time it became idle] pairs, oldest first
        @ready = []
        # Blocks waiting for a thread worker to become available
        @queue = []
        @scheduled_task_count = 0
        @completed_task_count = 0
        @largest_length = 0
        @workers_counter = 0
        # Float division so an integer timeout (e.g. 1) does not truncate the interval to 0
        @prune_interval = @idle_timeout / 2.0
        @next_prune_time = ThreadPool._monotonic_time + @prune_interval
      end

      # Execute the given block in a thread. The block should be built to never raise and need no arguments.
      #
      # The block is handed to an idle thread worker if there is one, otherwise to a newly created one, otherwise (when
      # `max_threads` is reached) it is queued until a thread worker becomes available.
      #
      # @yield Block to execute.
      def execute(&block)
        @mutex.synchronize do
          locked_assign_worker(&block) || locked_enqueue(&block)
          @scheduled_task_count += 1
          locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
        end
      end

      # @return [Integer] The largest number of threads that have been created in the pool since construction.
      def largest_length
        @mutex.synchronize { @largest_length }
      end

      # @return [Integer] The number of tasks that have been scheduled for execution on the pool since construction.
      def scheduled_task_count
        @mutex.synchronize { @scheduled_task_count }
      end

      # @return [Integer] The number of tasks that have been completed by the pool since construction.
      def completed_task_count
        @mutex.synchronize { @completed_task_count }
      end

      # @return [Integer] The number of threads that are actively executing tasks.
      def active_count
        @mutex.synchronize { @pool.length - @ready.length }
      end

      # @return [Integer] The number of threads currently in the pool.
      def length
        @mutex.synchronize { @pool.length }
      end

      # @return [Integer] The number of tasks in the queue awaiting execution.
      def queue_length
        @mutex.synchronize { @queue.length }
      end

      # Gracefully shutdown each thread when it is done with its current task. This should not be called until all
      # workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the
      # global default).
      def shutdown
        @mutex.synchronize do
          # Stop all workers
          @pool.each(&:stop)
          # Idle workers are now stopping, so they must no longer be offered to new execute calls
          @ready.clear
        end
      end

      # Kill each thread. This should not be called until all workers using this executor are complete. This does not
      # need to be called at all on program exit (e.g. for the global default).
      def kill
        @mutex.synchronize do
          # Kill all workers
          @pool.each(&:kill)
          @pool.clear
          @ready.clear
        end
      end

      # @!visibility private
      def _remove_busy_worker(worker)
        @mutex.synchronize { locked_remove_busy_worker(worker) }
      end

      # @!visibility private
      def _ready_worker(worker, last_message)
        @mutex.synchronize { locked_ready_worker(worker, last_message) }
      end

      # @!visibility private
      def _worker_died(worker)
        @mutex.synchronize { locked_worker_died(worker) }
      end

      # @!visibility private
      def _worker_task_completed
        @mutex.synchronize { @completed_task_count += 1 }
      end

      private

      # Hand the block to an idle worker or, failing that, to a freshly created one.
      #
      # @return [Boolean] True if a worker took the block, false if the pool is at `max_threads` with no idle worker.
      def locked_assign_worker(&block)
        # Reuse the most recently idle worker, otherwise grow the pool if still allowed. Entries in @ready are
        # [worker, time] pairs while a new worker is returned bare; the destructuring handles both.
        worker, = @ready.pop || locked_add_busy_worker
        return false unless worker

        worker << block
        true
      end

      # Queue the block until a worker becomes available.
      def locked_enqueue(&block)
        @queue << block
      end

      # Create a new worker and add it to the pool without marking it ready.
      #
      # @return [Worker, nil] The new worker, or nil if `max_threads` has been reached.
      def locked_add_busy_worker
        return if @max_threads && @pool.size >= @max_threads

        @workers_counter += 1
        @pool << (worker = Worker.new(self, @workers_counter))
        @largest_length = @pool.length if @pool.length > @largest_length
        worker
      end

      # Ask every worker that has been idle for longer than the idle timeout to stop.
      def locked_prune_pool
        now = ThreadPool._monotonic_time
        stopped_workers = 0
        while !@ready.empty? && (@pool.size - stopped_workers).positive?
          # @ready is ordered oldest-idle first, so the first worker that is not stale ends the scan
          worker, last_message = @ready.first
          break unless now - last_message > @idle_timeout

          stopped_workers += 1
          @ready.shift
          # The worker removes itself from @pool once it processes the stop message
          worker << :stop
        end

        @next_prune_time = ThreadPool._monotonic_time + @prune_interval
      end

      # Remove a stopping worker from the pool.
      #
      # @return [Worker, nil] The removed worker, or nil if it was not in the pool.
      def locked_remove_busy_worker(worker)
        removed = @pool.delete(worker)
        # A worker that was busy when shutdown was requested marks itself ready before it sees the stop message, so
        # drop any such entry to keep @ready a subset of @pool
        @ready.reject! { |ready_worker, _| ready_worker.equal?(worker) }
        removed
      end

      # Give the now-free worker the next queued block, or record it as idle since `last_message` if none is waiting.
      def locked_ready_worker(worker, last_message)
        block = @queue.shift
        if block
          worker << block
        else
          @ready.push([worker, last_message])
        end
      end

      # Replace a worker whose thread is exiting because of a non-standard exception.
      def locked_worker_died(worker)
        locked_remove_busy_worker(worker)
        replacement_worker = locked_add_busy_worker
        locked_ready_worker(replacement_worker, ThreadPool._monotonic_time) if replacement_worker
      end

      # @!visibility private
      class Worker
        def initialize(pool, id)
          @queue = Queue.new
          @thread = Thread.new(@queue, pool) do |my_queue, my_pool|
            catch(:stop) do
              loop do
                case block = my_queue.pop
                when :stop
                  my_pool._remove_busy_worker(self)
                  throw :stop
                else
                  run_block(my_pool, block)
                  # Always report ready, even when the block raised a StandardError, otherwise this worker would
                  # never be reused and queued blocks could wait forever
                  my_pool._ready_worker(self, ThreadPool._monotonic_time)
                end
              end
            end
          end
          @thread.name = "temporal-thread-#{id}"
        end

        # @!visibility private
        def <<(block)
          @queue << block
        end

        # @!visibility private
        def stop
          @queue << :stop
        end

        # @!visibility private
        def kill
          @thread.kill
        end

        private

        # Run a single block. A StandardError is logged and swallowed so the worker keeps running. Any other exception
        # is logged, reported to the pool as a dead worker, and ends this worker's loop via `throw :stop`.
        def run_block(pool, block)
          block.call
          pool._worker_task_completed
        rescue StandardError => e
          # Ignore
          warn("Unexpected execute block error: #{e.full_message}")
        rescue Exception => e # rubocop:disable Lint/RescueException
          warn("Unexpected execute block exception: #{e.full_message}")
          pool._worker_died(self)
          throw :stop
        end
      end

      private_constant :Worker
    end
  end
end