lib/async/worker_pool.rb
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "etc"

module Async
  # A simple work pool that offloads work to a background thread.
  #
  # @private
  class WorkerPool
    # Used to augment the scheduler to add support for blocking operations.
    #
    # NOTE(review): relies on the including object providing `@worker_pool`; confirm against the includer.
    module BlockingOperationWait
      # Wait for the given work to be executed.
      #
      # @public Since *Async v2.19* and *Ruby v3.4*.
      # @asynchronous May be non-blocking.
      #
      # @parameter work [Proc] The work to execute on a background thread.
      # @returns [Object] The result of the work.
      def blocking_operation_wait(work)
        @worker_pool.call(work)
      end
    end

    # Execute the given work in a background thread.
    #
    # A promise moves from `:pending` to exactly one of `:resolved`, `:failed` or `:cancelled`. All state is guarded by a mutex, and waiters are woken through a condition variable.
    class Promise
      # Create a new promise.
      #
      # @parameter work [Proc] The work to be done.
      def initialize(work)
        @work = work
        @state = :pending
        @value = nil
        @guard = ::Mutex.new
        @condition = ::ConditionVariable.new
        @thread = nil
      end

      # Execute the work and resolve the promise.
      #
      # Does nothing if the work was already cleared (e.g. by {cancel}) before this is invoked.
      def call
        work = nil

        @guard.synchronize do
          # Record the executing thread so that {cancel} can interrupt it.
          @thread = ::Thread.current

          return unless (work = @work)
        end

        # The work itself runs outside the lock so that {cancel} can acquire it.
        resolve(work.call)
      rescue Exception => error
        # Deliberately broad: every failure (including the Interrupt raised by {cancel}) is recorded on the promise rather than escaping into the worker loop.
        reject(error)
      end

      # Record a successful result and wake all waiters.
      #
      # @parameter value [Object] The result of the work.
      private def resolve(value)
        @guard.synchronize do
          @work = nil
          @thread = nil
          @value = value
          @state = :resolved
          @condition.broadcast
        end
      end

      # Record a failure and wake all waiters.
      #
      # @parameter error [Exception] The exception raised by the work.
      private def reject(error)
        @guard.synchronize do
          @work = nil
          @thread = nil
          @value = error
          @state = :failed
          @condition.broadcast
        end
      end

      # Cancel the work and raise an exception in the background thread.
      #
      # Does nothing if the work has already been cleared (completed, failed or cancelled).
      def cancel
        return unless @work

        @guard.synchronize do
          @work = nil
          @state = :cancelled
          # `@thread` is only set once a worker has started executing this promise.
          @thread&.raise(Interrupt)
        end
      end

      # Wait for the work to be done.
      #
      # @returns [Object] The result of the work.
      # @raises [Exception] The exception raised by the work, if it failed.
      def wait
        @guard.synchronize do
          @condition.wait(@guard) while @state == :pending

          raise @value if @state == :failed

          return @value
        end
      end
    end

    # A background worker thread.
    class Worker
      # Create a new worker, starting its background thread immediately.
      def initialize
        @work = ::Thread::Queue.new
        @thread = ::Thread.new(&method(:run))
      end

      # Execute work for as long as the queue yields items.
      def run
        while (work = @work.pop)
          work.call
        end
      end

      # Close the worker thread.
      #
      # The thread is killed; calling this more than once is harmless.
      def close
        if (thread = @thread)
          @thread = nil
          thread.kill
        end
      end

      # Call the work and notify the scheduler when it is done.
      #
      # @parameter work [Proc] The work to execute on this worker's thread.
      # @returns [Object] The result of the work.
      def call(work)
        promise = Promise.new(work)

        @work.push(promise)

        begin
          return promise.wait
        ensure
          # No-op once the promise has completed; otherwise (the wait was interrupted) stop the background work.
          promise.cancel
        end
      end
    end

    # Create a new work pool.
    #
    # @parameter size [Integer] The number of threads to use.
    def initialize(size: Etc.nprocessors)
      @ready = ::Thread::Queue.new

      size.times do
        @ready.push(Worker.new)
      end
    end

    # Close the work pool. Kills all idle workers.
    #
    # Workers that are busy at this point are closed by {call} once their current work completes.
    def close
      if (ready = @ready)
        @ready = nil
        ready.close

        # A closed queue still yields its remaining items, then returns nil.
        while (worker = ready.pop)
          worker.close
        end
      end
    end

    # Offload work to a thread.
    #
    # @parameter work [Proc] The work to be done.
    # @returns [Object] The result of the work.
    # @raises [RuntimeError] If the pool has been closed and no worker can be obtained.
    def call(work)
      ready = @ready
      raise RuntimeError, "No worker available!" unless ready

      # `pop` returns nil if the pool is closed while we are waiting for a worker.
      worker = ready.pop
      raise RuntimeError, "No worker available!" unless worker

      begin
        worker.call(work)
      ensure
        begin
          ready.push(worker)
        rescue ClosedQueueError
          # The pool was closed while this work was in flight: retire the worker instead of masking the result with a queue error.
          worker.close
        end
      end
    end
  end
end