class Async::Pool::Controller
# Builds a new instance that uses the given block as its resource constructor.
#
# @param options [Hash] keyword options forwarded unchanged to `new`.
# @yield the block is captured and passed to `new` as the first positional argument.
# @return the object returned by `new`.
def self.wrap(**options, &block)
  new(block, **options)
end
# Obtains a resource via `wait_for_resource`.
#
# Without a block, the resource is returned and the caller is responsible
# for handing it back with `release`. With a block, the resource is yielded
# and always released afterwards, even if the block raises.
#
# @yield [resource] the acquired resource.
# @return the resource (no block) or the block's result (with block).
def acquire
  acquired = wait_for_resource

  if block_given?
    begin
      yield acquired
    ensure
      release(acquired)
    end
  else
    acquired
  end
end
# Whether the pool currently tracks at least one resource.
#
# @return [Boolean] true when `@resources` is non-empty.
def active?
  # Every hash entry is a (truthy) key/value pair, so `any?` is simply "not empty".
  @resources.any?
end
# Renders one "usage/concurrency[*]/count" entry per tracked resource,
# joined with ";". A "*" follows the concurrency when the resource
# reports it is not viable.
#
# @return [String] empty when no resources are tracked.
def availability_string
  entries = @resources.map do |resource, usage|
    marker = resource.viable? ? nil : '*'

    "#{usage}/#{resource.concurrency}#{marker}/#{resource.count}"
  end

  entries.join(";")
end
# Finds a resource with spare capacity, or creates a new one if permitted.
#
# Runs inside `@guard.acquire`. Candidates are examined from the end of
# `@available`:
# - a tracked, viable candidate whose usage is below its concurrency has
#   its usage incremented and is returned;
# - a tracked candidate with spare capacity that is no longer viable is
#   retired and dropped from `@available`;
# - anything else (untracked, or already at full concurrency) is dropped
#   from `@available`.
#
# When no candidate remains, a new resource is created if `@limit` is nil
# or not yet reached.
#
# @return the resource, the result of `create_resource` (which may be nil),
#   or nil when the pool is at its limit.
def available_resource
  @guard.acquire do
    while (candidate = @available.last)
      usage = @resources[candidate]

      if usage && usage < candidate.concurrency
        if candidate.viable?
          @resources[candidate] += 1
          return candidate
        end

        retire(candidate)
      end

      # Either exhausted, stale, or just retired: remove it from the list.
      @available.pop
    end

    if @limit.nil? || @resources.size < @limit
      Async.logger.debug(self) {"No available resources, allocating new one..."}
      return create_resource
    end
  end

  nil
end
# Whether any tracked resource is currently in use.
#
# @return [Boolean] true if at least one resource has a usage count above zero.
def busy?
  # `any?` short-circuits on the first match and does not build an
  # intermediate array.
  @resources.any? { |_, usage| usage > 0 }
end
# Closes every tracked resource and resets the pool's bookkeeping.
#
# Each key of `@resources` receives `close`; afterwards both `@resources`
# and `@available` are emptied so no reference to a closed resource
# lingers in the availability list. Finally the gardener task, if any,
# is stopped.
def close
  @resources.each_key(&:close)
  @resources.clear

  # Everything in the availability list has just been closed.
  @available.clear

  @gardener&.stop
end
# Creates a new resource using `@constructor` and registers it as in use.
#
# The gardener is started first. A successfully created resource is
# recorded with a usage of 1 (the caller is its first user) and, if it
# supports more than one concurrent user, added to `@available` so others
# can share it.
#
# @return the new resource, or nil if the constructor returned nil/false.
def create_resource
  start_gardener

  # This might return nil, which means creating the resource failed.
  resource = @constructor.call

  if resource
    @resources[resource] = 1
    @available.push(resource) if resource.concurrency > 1
  end

  resource
end
# Whether the pool tracks no resources at all.
#
# @return [Boolean] true when `@resources` has no entries.
def empty?
  @resources.length.zero?
end
# Sets up an empty pool.
#
# @param constructor the object stored in `@constructor`; elsewhere in this
#   class it is invoked with `call` to produce a new resource.
# @param limit [Integer, nil] stored in `@limit`; nil is rendered as '∞' by
#   `usage_string` and treated as "no limit" when allocating.
def initialize(constructor, limit: nil)
  @constructor = constructor
  @limit = limit

  # resource => current usage count.
  @resources = {}
  # Resources that may still have spare capacity.
  @available = []

  @notification = Async::Notification.new
  @guard = Async::Semaphore.new(1)

  # Set by `start_gardener` once the background task is running.
  @gardener = nil
end
# Retires unused resources until at most `retain` resources remain.
#
# The set of unused resources (usage of zero) is snapshotted first. Each
# one is then handed to the block if given, or passed to `retire`
# otherwise. The retain threshold is checked *before* each resource is
# processed, so a pool that is already at or below `retain` is left
# untouched.
#
# @param retain [Integer] the minimum number of resources to retain.
# @yield [resource] optional; called instead of `retire` for each unused
#   resource being pruned.
# @return [Integer] the number of unused resources found (which may exceed
#   the number actually processed when `retain` stops pruning early).
def prune(retain = 0)
  unused = @resources.select { |_, usage| usage.zero? }.keys

  unused.each do |resource|
    break if @resources.size <= retain

    if block_given?
      yield resource
    else
      retire(resource)
    end
  end

  unused.size
end
# Hands a previously acquired resource back to the pool.
#
# @param resource the resource to give back; it must respond to `reusable?`.
def release(resource)
  # A resource that is not good should also not be reusable.
  if resource.reusable?
    reuse(resource)
  else
    retire(resource)
  end
end
# Permanently removes a resource from the pool.
#
# The resource is dropped from `@resources`, closed, and then
# `@notification` is signalled. The resource is not removed from
# `@available` here.
#
# @param resource the resource to retire; it must respond to `close`.
def retire(resource)
  Async.logger.debug(self) do
    "Retire #{resource}"
  end

  # Stop tracking it before closing it.
  @resources.delete(resource)
  resource.close

  @notification.signal
end
# Returns a resource to the pool so it can be handed out again.
#
# Its usage count is decremented, it is pushed onto `@available`, and
# `@notification` is signalled.
#
# @param resource a resource currently tracked in `@resources`.
def reuse(resource)
  Async.logger.debug(self) do
    "Reuse #{resource}"
  end

  @resources[resource] = @resources[resource] - 1
  @available << resource

  @notification.signal
end
# Number of resources currently tracked by the pool.
#
# @return [Integer]
def size
  @resources.length
end
# Starts the background "gardener" task, unless one is already recorded.
#
# The task stores itself in `@gardener` and then calls `Task.yield`. When
# the task's block exits for any reason, `@gardener` is reset to nil and
# the pool is closed.
# NOTE(review): presumably the transient task is only resumed when the
# reactor shuts down, making this a cleanup hook; confirm against the
# async gem's documentation.
def start_gardener
  unless @gardener
    Async(transient: true) do |task|
      @gardener = task

      Task.yield
    ensure
      @gardener = nil
      close
    end
  end
end
# Human-readable summary of the pool: class name and usage, plus
# per-resource availability when any resources are tracked.
#
# @return [String]
def to_s
  return "\#<#{self.class}(#{usage_string})>" if @resources.empty?

  "\#<#{self.class}(#{usage_string}) #{availability_string}>"
end
# Renders "tracked/limit", using '∞' when no limit is configured.
#
# @return [String]
def usage_string
  ceiling = @limit || '∞'

  "#{@resources.size}/#{ceiling}"
end
# Waits on the pool's notification, which `retire` and `reuse` signal.
#
# @return whatever `@notification.wait` returns.
def wait
  notification = @notification
  notification.wait
end
# Blocks until a resource can be obtained from `available_resource`.
#
# @return the acquired resource (never nil).
def wait_for_resource
  # If we fail to obtain or create a resource, wait on the notification
  # (signalled by `retire` and `reuse`) until one becomes available, then
  # try again.
  until (resource = available_resource)
    @notification.wait
  end

  Async.logger.debug(self) {"Wait for resource -> #{resource}"}

  # Previously disabled, kept for reference:
  # if resource.concurrency > 1
  #   @notification.signal
  # end

  resource
end