class Temporalio::Worker::ThreadPool

‘CachedThreadPool`.
Implementation of a thread pool. This implementation is a stripped down form of Concurrent Ruby’s

def self._monotonic_time

@!visibility private
def self._monotonic_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def self.default

Returns:
  • (ThreadPool) - Default/shared thread pool instance with unlimited max threads.
def self.default
  @default ||= new
end

def _ready_worker(worker, last_message)

@!visibility private
def _ready_worker(worker, last_message)
  @mutex.synchronize { locked_ready_worker(worker, last_message) }
end

def _remove_busy_worker(worker)

@!visibility private
def _remove_busy_worker(worker)
  @mutex.synchronize { locked_remove_busy_worker(worker) }
end

def _worker_died(worker)

@!visibility private
def _worker_died(worker)
  @mutex.synchronize { locked_worker_died(worker) }
end

def _worker_task_completed

@!visibility private
def _worker_task_completed
  @mutex.synchronize { @completed_task_count += 1 }
end

def active_count

Returns:
  • (Integer) - The number of threads that are actively executing tasks.
def active_count
  @mutex.synchronize { @pool.length - @ready.length }
end

def completed_task_count

Returns:
  • (Integer) - The number of tasks that have been completed by the pool since construction.
def completed_task_count
  @mutex.synchronize { @completed_task_count }
end

def execute(&block)

Other tags:
    Yield: - Block to execute.
def execute(&block)
  @mutex.synchronize do
    locked_assign_worker(&block) || locked_enqueue(&block)
    @scheduled_task_count += 1
    locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
  end
end

def initialize(max_threads: nil, idle_timeout: 20)

Parameters:
  • idle_timeout (Float) -- Number of seconds before a thread worker with no work should be stopped. Note,
  • max_threads (Integer, nil) -- Maximum number of thread workers to create, or nil for unlimited max.
def initialize(max_threads: nil, idle_timeout: 20)
  @max_threads = max_threads
  @idle_timeout = idle_timeout
  @mutex = Mutex.new
  @pool = []
  @ready = []
  @queue = []
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter = 0
  @prune_interval = @idle_timeout / 2
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end

def kill

need to be called at all on program exit (e.g. for the global default).
Kill each thread. This should not be called until all workers using this executor are complete. This does not
def kill
  @mutex.synchronize do
    # Kill all workers
    @pool.each(&:kill)
    @pool.clear
    @ready.clear
  end
end

def largest_length

Returns:
  • (Integer) - The largest number of threads that have been created in the pool since construction.
def largest_length
  @mutex.synchronize { @largest_length }
end

def length

Returns:
  • (Integer) - The number of threads currently in the pool.
def length
  @mutex.synchronize { @pool.length }
end

def locked_add_busy_worker

def locked_add_busy_worker
  return if @max_threads && @pool.size >= @max_threads
  @workers_counter += 1
  @pool << (worker = Worker.new(self, @workers_counter))
  @largest_length = @pool.length if @pool.length > @largest_length
  worker
end

def locked_assign_worker(&block) # rubocop:disable Naming/PredicateMethod

rubocop:disable Naming/PredicateMethod
def locked_assign_worker(&block) # rubocop:disable Naming/PredicateMethod
  # keep growing if the pool is not at the minimum yet
  worker, = @ready.pop || locked_add_busy_worker
  if worker
    worker << block
    true
  else
    false
  end
end

def locked_enqueue(&block)

def locked_enqueue(&block)
  @queue << block
end

def locked_prune_pool

def locked_prune_pool
  now = ThreadPool._monotonic_time
  stopped_workers = 0
  while !@ready.empty? && (@pool.size - stopped_workers).positive?
    worker, last_message = @ready.first
    break unless now - last_message > @idle_timeout
    stopped_workers += 1
    @ready.shift
    worker << :stop
  end
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end

def locked_ready_worker(worker, last_message)

def locked_ready_worker(worker, last_message)
  block = @queue.shift
  if block
    worker << block
  else
    @ready.push([worker, last_message])
  end
end

def locked_remove_busy_worker(worker)

def locked_remove_busy_worker(worker)
  @pool.delete(worker)
end

def locked_worker_died(worker)

def locked_worker_died(worker)
  locked_remove_busy_worker(worker)
  replacement_worker = locked_add_busy_worker
  locked_ready_worker(replacement_worker, ThreadPool._monotonic_time) if replacement_worker
end

def queue_length

Returns:
  • (Integer) - The number of tasks in the queue awaiting execution.
def queue_length
  @mutex.synchronize { @queue.length }
end

def scheduled_task_count

Returns:
  • (Integer) - The number of tasks that have been scheduled for execution on the pool since construction.
def scheduled_task_count
  @mutex.synchronize { @scheduled_task_count }
end

def shutdown

global default).
workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the
Gracefully shutdown each thread when it is done with its current task. This should not be called until all
def shutdown
  @mutex.synchronize do
    # Stop all workers
    @pool.each(&:stop)
  end
end