class Async::Pool::Controller
def self.wrap(**options, &block)
def self.wrap(**options, &block) self.new(block, **options) end
def acquire
def acquire resource = wait_for_resource return resource unless block_given? begin yield resource ensure release(resource) end end
def active?
def active? !@resources.empty? end
def as_json(...)
def as_json(...) { limit: @limit, concurrency: @guard.limit, usage: @resources.size, availability_summary: self.availability_summary, } end
def availability_summary
def availability_summary @resources.collect do |resource, usage| "#{usage}/#{resource.concurrency}#{resource.viable? ? nil : '*'}/#{resource.count}" end end
def available?
def available? @available.any? end
def available_resource
def available_resource resource = nil @guard.acquire do resource = get_resource end return resource rescue Exception reuse(resource) if resource raise end
def busy?
def busy? @resources.collect do |_, usage| return true if usage > 0 end return false end
def close
def close @available.clear while pair = @resources.shift resource, usage = pair if usage > 0 Console.logger.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"} end resource.close end @gardener&.stop end
def concurrency
def concurrency @guard.limit end
def concurrency= value
def concurrency= value @guard.limit = value end
def create_resource
def create_resource self.start_gardener # This might return nil, which means creating the resource failed. if resource = @constructor.call @resources[resource] = 1 # Make the resource available if it can be used multiple times: if resource.concurrency > 1 @available.push(resource) end end # @policy.created(self, resource) return resource end
def empty?
def empty? @resources.empty? end
def get_resource
def get_resource while resource = @available.last if usage = @resources[resource] and usage < resource.concurrency if resource.viable? usage = (@resources[resource] += 1) if usage == resource.concurrency # The resource is used up to it's limit: @available.pop end return resource else retire(resource) @available.pop end else # The resource has been removed already, so skip it and remove it from the availability list. @available.pop end end if @limit.nil? or @resources.size < @limit Console.logger.debug(self) {"No available resources, allocating new one..."} return create_resource end end
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil) @constructor = constructor @limit = limit # This semaphore is used to limit the number of concurrent tasks which are creating new resources. @guard = Async::Semaphore.new(concurrency) @policy = policy @gardener = nil # All available resources: @resources = {} # Resources which may be available to be acquired: # This list may contain false positives, or resources which were okay but have since entered a state which is unusuable. @available = [] # Used to signal when a resource has been released: @notification = Async::Notification.new end
def prune(retain = 0)
-
retain(Integer) -- the minimum number of resources to retain.
def prune(retain = 0) unused = [] # This code must not context switch: @resources.each do |resource, usage| if usage.zero? unused << resource end end # It's okay for this to context switch: unused.each do |resource| if block_given? yield resource else retire(resource) end break if @resources.size <= retain end # Update availability list: @available.clear @resources.each do |resource, usage| if usage < resource.concurrency and resource.reusable? @available << resource end end return unused.size end
def release(resource)
def release(resource) processed = false # A resource that is not good should also not be reusable. if resource.reusable? processed = reuse(resource) end # @policy.released(self, resource) ensure retire(resource) unless processed end
def retire(resource)
def retire(resource) Console.logger.debug(self) {"Retire #{resource}"} @resources.delete(resource) resource.close @notification.signal return true end
def reuse(resource)
def reuse(resource) Console.logger.debug(self) {"Reuse #{resource}"} usage = @resources[resource] if usage.zero? raise "Trying to reuse unacquired resource: #{resource}!" end # If the resource was fully utilized, it now becomes available: if usage == resource.concurrency @available.push(resource) end @resources[resource] = usage - 1 @notification.signal return true end
def size
def size @resources.size end
def start_gardener
def start_gardener return if @gardener Async(transient: true, annotation: "#{self.class} Gardener") do |task| @gardener = task while true if @policy @policy.call(self) else Task.yield end self.wait end ensure @gardener = nil self.close end end
def to_json(...)
def to_json(...) as_json.to_json(...) end
def to_s
def to_s if @resources.empty? "\#<#{self.class}(#{usage_string})>" else "\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>" end end
def usage_string
def usage_string "#{@resources.size}/#{@limit || '∞'}" end
def wait
def wait @notification.wait end
def wait_for_resource
def wait_for_resource # If we fail to create a resource (below), we will end up waiting for one to become resources. until resource = available_resource @notification.wait end Console.logger.debug(self) {"Wait for resource -> #{resource}"} # if resource.concurrency > 1 # @notification.signal # end return resource end