class Async::Pool::Controller
# Build a pool that uses the given block as its resource constructor.
#
# The block is handed to `new` as the first positional argument; every
# keyword option is forwarded untouched.
#
# @param options [Hash] keyword options passed through to `new`.
# @return whatever `new` returns.
def self.wrap(**options, &block)
  new(block, **options)
end
# Obtain a resource from the pool.
#
# Without a block the resource is returned and the caller is responsible
# for handing it back. With a block the resource is yielded and passed to
# `release` afterwards, even when the block raises.
#
# @yield [resource] the acquired resource.
# @return the resource (no block) or the block's value (with a block).
def acquire
  resource = wait_for_resource

  if block_given?
    begin
      yield resource
    ensure
      release(resource)
    end
  else
    resource
  end
end
# Whether the pool currently tracks at least one resource.
#
# @return [Boolean] true when @resources has any entry.
def active?
  @resources.any?
end
# Describe every tracked resource as "usage/concurrency[*]/count", joined
# with ";". The "*" marker is appended to the concurrency when the
# resource reports that it is not viable.
#
# @return [String] empty when no resources are tracked.
def availability_string
  entries = @resources.map do |resource, usage|
    marker = resource.viable? ? nil : '*'
    "#{usage}/#{resource.concurrency}#{marker}/#{resource.count}"
  end

  entries.join(";")
end
# Whether the availability list holds at least one (truthy) entry.
#
# @return [Boolean]
def available?
  !@available.none?
end
# Fetch a resource via `get_resource` while holding @guard.
#
# If anything is raised after a resource has already been obtained, the
# resource is handed back through `reuse` and the error is re-raised.
# `Exception` is rescued (rather than only StandardError) and always
# re-raised, so no failure is swallowed here.
#
# @return the resource, or a falsy value when `get_resource` yields none.
def available_resource
  claimed = nil

  begin
    @guard.acquire { claimed = get_resource }
  rescue Exception
    reuse(claimed) if claimed
    raise
  end

  claimed
end
# Whether any tracked resource is currently in use.
#
# Uses `any?` so the scan stops at the first resource with a usage above
# zero and no intermediate array is built.
#
# @return [Boolean] true when at least one usage counter is positive.
def busy?
  @resources.each_value.any? { |usage| usage > 0 }
end
# Shut the pool down: forget the availability list, close every tracked
# resource, drop them from the pool and stop the gardener task if one is
# present.
#
# @return the result of stopping the gardener, or nil when there is none.
def close
  @available.clear

  @resources.each_key { |resource| resource.close }
  @resources.clear

  @gardener.stop unless @gardener.nil?
end
# Ask the constructor for a new resource and register it as in use once.
#
# The gardener is started first. The constructor may return a falsy value,
# meaning creation failed; in that case nothing is registered and that
# value is returned as-is.
#
# @return the new resource, or the constructor's falsy result.
def create_resource
  start_gardener

  resource = @constructor.call
  return resource unless resource

  # The caller of this method holds the first use.
  @resources[resource] = 1

  # A resource that can be shared is immediately offered to others too.
  @available.push(resource) if resource.concurrency > 1

  resource
end
# Whether the pool tracks no resources at all.
#
# @return [Boolean]
def empty?
  @resources.size.zero?
end
# Number of tracked resources whose usage counter is zero.
#
# @return [Integer]
def free
  @resources.each_value.count(0)
end
# NOTE(review): the body of this method is truncated in this listing (the
# leading characters of each original line appear to be missing), so it is
# left untouched here. The visible fragments read @available.last, compare
# a usage from @resources against resource.concurrency, pop from the
# availability list, and end with create_resource guarded by a comparison
# of @resources.size with @limit. Restore the complete body before
# modifying this method.
def get_resource
def get_resource esource = @available.last ge = @resources[resource] and usage < resource.concurrency source.viable? e = (@resources[resource] += 1) sage == resource.concurrency he resource is used up to it's limit: ailable.pop rn resource re(resource) ilable.pop resource has been removed already, so skip it and remove it from the availability list. lable.pop it.nil? or @resources.size < @limit e.logger.debug(self) {"No available resources, allocating new one..."} create_resource
# Set up an empty pool.
#
# @param constructor [#call] invoked to build a new resource.
# @param limit [Integer, nil] stored as the pool's resource limit; nil is
#   displayed as unlimited by `usage_string`.
def initialize(constructor, limit: nil)
  @constructor = constructor
  @limit = limit

  # Every resource owned by the pool, mapped to its current usage count.
  @resources = {}

  # Candidates for acquisition. Entries may be stale: a resource can be
  # listed here even though it has since become unusable.
  @available = []

  # Signalled when a resource is reused or retired; waited on by `wait`.
  @notification = Async::Notification.new

  # Semaphore of size one held around `get_resource`.
  @guard = Async::Semaphore.new(1)

  # Background task created by `start_gardener`, if any.
  @gardener = nil
end
# Whether more than half of the tracked resources are idle.
#
# @return [Boolean, nil] nil when the pool tracks no resources.
def overflowing?
  return unless @resources.any?

  free.fdiv(@resources.size) > 0.5
end
# Retire idle resources until at most `retain` resources remain, then
# rebuild the availability list.
#
# The `retain` floor is checked before each resource is processed, so a
# pool that is already at or below the floor is left untouched.
#
# @param retain [Integer] the minimum number of resources to retain.
# @yield [resource] when a block is given, each idle resource is yielded
#   instead of being passed to `retire`.
# @return [Integer] the number of resources that were idle when pruning
#   started (not the number actually retired).
def prune(retain = 0)
  # Snapshot the idle resources first; retiring removes entries from
  # @resources, which must not happen while iterating it.
  unused = @resources.select { |_, usage| usage.zero? }.keys

  unused.each do |resource|
    break if @resources.size <= retain

    if block_given?
      yield resource
    else
      retire(resource)
    end
  end

  # Rebuild the availability list from what is left.
  @available.clear
  @resources.each do |resource, usage|
    if usage < resource.concurrency && resource.reusable?
      @available << resource
    end
  end

  unused.size
end
# Hand a resource back to the pool.
#
# A resource that reports itself reusable goes through `reuse`; anything
# else is retired.
#
# @param resource the resource being returned.
def release(resource)
  resource.reusable? ? reuse(resource) : retire(resource)
end
# Remove a resource from the pool for good.
#
# The resource is dropped from @resources, closed, and then the
# notification is signalled.
#
# @param resource the resource to remove and close.
def retire(resource)
  Console.logger.debug(self) do
    "Retire #{resource}"
  end

  @resources.delete(resource)
  resource.close

  @notification.signal
end
# Return one use of a resource to the pool.
#
# @param resource the resource being handed back.
# @raise [RuntimeError] when the resource is not tracked by the pool or
#   its usage counter is already zero.
# @return the result of `retire` when the resource is retired instead,
#   otherwise the result of signalling the notification.
def reuse(resource)
  Console.logger.debug(self) { "Reuse #{resource}" }

  usage = @resources[resource]

  # An untracked resource has no usage entry at all; treat it the same as
  # one that was never acquired instead of failing on nil.
  if usage.nil? || usage.zero?
    raise "Trying to reuse unacquired resource: #{resource}!"
  end

  # Giving back the last use would leave this resource idle; retire it
  # instead when the pool already has too many idle resources.
  if usage == 1 && overflowing?
    return retire(resource)
  end

  # A fully utilized resource becomes available again.
  @available.push(resource) if usage == resource.concurrency

  @resources[resource] = usage - 1
  @notification.signal
end
# Number of resources currently tracked by the pool.
#
# @return [Integer]
def size
  @resources.length
end
# Start the background "gardener" task unless one is already recorded.
#
# The task stores itself in @gardener and yields. When the task body
# finishes, for any reason, @gardener is reset and the pool is closed.
def start_gardener
  return if @gardener

  label = "#{self.class} Gardener"

  Async(transient: true, annotation: label) do |task|
    begin
      @gardener = task
      Task.yield
    ensure
      @gardener = nil
      close
    end
  end
end
def to_s
def to_s if @resources.empty? "\#<#{self.class}(#{usage_string})>" else "\#<#{self.class}(#{usage_string}) #{availability_string}>" end end
# Number of tracked resources that are in use at least once.
#
# @return [Integer]
def usage
  @resources.each_value.count { |in_use| in_use > 0 }
end
# Render "tracked/limit", showing "∞" when no limit is set.
#
# @return [String]
def usage_string
  ceiling = @limit || '∞'

  "#{@resources.size}/#{ceiling}"
end
# Block on the pool's notification, which is signalled whenever a
# resource is reused or retired.
#
# @return whatever the notification's `wait` returns.
def wait
  @notification.wait
end
# Keep asking for a resource until one is obtained.
#
# If no resource can be provided (including when creating one fails), wait
# on the notification for one to become available and try again.
#
# @return the acquired resource.
def wait_for_resource
  @notification.wait until (resource = available_resource)

  Console.logger.debug(self) { "Wait for resource -> #{resource}" }

  # Previously considered, currently disabled:
  # if resource.concurrency > 1
  #   @notification.signal
  # end

  resource
end