class Temporalio::Worker::ThreadPool
‘CachedThreadPool`.
Implementation of a thread pool. This implementation is a stripped down form of Concurrent Ruby’s
def self._monotonic_time
def self._monotonic_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end
def self.default
-
(ThreadPool)- Default/shared thread pool instance with unlimited max threads.
def self.default @default ||= new end
def _ready_worker(worker, last_message)
def _ready_worker(worker, last_message) @mutex.synchronize { locked_ready_worker(worker, last_message) } end
def _remove_busy_worker(worker)
def _remove_busy_worker(worker) @mutex.synchronize { locked_remove_busy_worker(worker) } end
def _worker_died(worker)
def _worker_died(worker) @mutex.synchronize { locked_worker_died(worker) } end
def _worker_task_completed
def _worker_task_completed @mutex.synchronize { @completed_task_count += 1 } end
def active_count
-
(Integer)- The number of threads that are actively executing tasks.
def active_count @mutex.synchronize { @pool.length - @ready.length } end
def completed_task_count
-
(Integer)- The number of tasks that have been completed by the pool since construction.
def completed_task_count @mutex.synchronize { @completed_task_count } end
def execute(&block)
- Yield: - Block to execute.
def execute(&block) @mutex.synchronize do locked_assign_worker(&block) || locked_enqueue(&block) @scheduled_task_count += 1 locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time end end
def initialize(max_threads: nil, idle_timeout: 20)
-
idle_timeout(Float) -- Number of seconds before a thread worker with no work should be stopped. Note, -
max_threads(Integer, nil) -- Maximum number of thread workers to create, or nil for unlimited max.
def initialize(max_threads: nil, idle_timeout: 20) @max_threads = max_threads @idle_timeout = idle_timeout @mutex = Mutex.new @pool = [] @ready = [] @queue = [] @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 @workers_counter = 0 @prune_interval = @idle_timeout / 2 @next_prune_time = ThreadPool._monotonic_time + @prune_interval end
def kill
Kill each thread. This should not be called until all workers using this executor are complete. This does not
def kill @mutex.synchronize do # Kill all workers @pool.each(&:kill) @pool.clear @ready.clear end end
def largest_length
-
(Integer)- The largest number of threads that have been created in the pool since construction.
def largest_length @mutex.synchronize { @largest_length } end
def length
-
(Integer)- The number of threads currently in the pool.
def length @mutex.synchronize { @pool.length } end
def locked_add_busy_worker
def locked_add_busy_worker return if @max_threads && @pool.size >= @max_threads @workers_counter += 1 @pool << (worker = Worker.new(self, @workers_counter)) @largest_length = @pool.length if @pool.length > @largest_length worker end
def locked_assign_worker(&block) # rubocop:disable Naming/PredicateMethod
def locked_assign_worker(&block) # rubocop:disable Naming/PredicateMethod # keep growing if the pool is not at the minimum yet worker, = @ready.pop || locked_add_busy_worker if worker worker << block true else false end end
def locked_enqueue(&block)
def locked_enqueue(&block) @queue << block end
def locked_prune_pool
def locked_prune_pool now = ThreadPool._monotonic_time stopped_workers = 0 while !@ready.empty? && (@pool.size - stopped_workers).positive? worker, last_message = @ready.first break unless now - last_message > @idle_timeout stopped_workers += 1 @ready.shift worker << :stop end @next_prune_time = ThreadPool._monotonic_time + @prune_interval end
def locked_ready_worker(worker, last_message)
def locked_ready_worker(worker, last_message) block = @queue.shift if block worker << block else @ready.push([worker, last_message]) end end
def locked_remove_busy_worker(worker)
def locked_remove_busy_worker(worker) @pool.delete(worker) end
def locked_worker_died(worker)
def locked_worker_died(worker) locked_remove_busy_worker(worker) replacement_worker = locked_add_busy_worker locked_ready_worker(replacement_worker, ThreadPool._monotonic_time) if replacement_worker end
def queue_length
-
(Integer)- The number of tasks in the queue awaiting execution.
def queue_length @mutex.synchronize { @queue.length } end
def scheduled_task_count
-
(Integer)- The number of tasks that have been scheduled for execution on the pool since construction.
def scheduled_task_count @mutex.synchronize { @scheduled_task_count } end
def shutdown
workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the
Gracefully shutdown each thread when it is done with its current task. This should not be called until all
def shutdown @mutex.synchronize do # Stop all workers @pool.each(&:stop) end end