class Async::Pool::Controller
A resource pool controller.
def self.wrap(**options, &block)
# Build a pool whose resources are produced by the given block.
#
# @parameter options [Hash] Keyword options passed through unchanged to `new`.
# @parameter block [Proc] Passed to `new` as the resource constructor.
# @returns Whatever `new` returns.
def self.wrap(**options, &block)
  new(block, **options)
end
def acquire
# Obtain a resource from the pool, waiting until one can be provided.
#
# Without a block the resource is returned and the caller must hand it back
# via {release}. With a block the resource is yielded and is always released
# afterwards, even if the block raises; the block's value is returned.
def acquire
  acquired = wait_for_resource

  if block_given?
    begin
      yield acquired
    ensure
      release(acquired)
    end
  else
    acquired
  end
end
def acquire_existing_resource
Acquire an existing resource with zero usage.
# Find a resource that is currently tracked with zero usage.
#
# While the pool still tracks resources but none is idle, this waits on the
# notification and scans again.
#
# @returns The first idle resource, or nil once the pool tracks no resources.
def acquire_existing_resource
  until @resources.empty?
    @resources.each do |candidate, usage|
      return candidate if usage == 0
    end

    # Nothing idle yet; sleep until something is released or retired.
    @notification.wait
  end

  # Only a completely drained pool gets here.
  nil
end
def acquire_or_create_resource
# Take a slot on an available resource, or create a new resource if allowed.
#
# Candidates are examined from the end of the availability list. Stale
# entries (no longer tracked, or already at their concurrency) are dropped,
# and non-viable resources are retired.
#
# @returns The acquired or newly created resource, or nil when nothing is
#   available and the pool has reached its limit.
def acquire_or_create_resource
  while (candidate = @available.last)
    usage = @resources[candidate]

    unless usage && usage < candidate.concurrency
      # The resource has been removed already (or is full), so drop it from the availability list.
      @available.pop
      next
    end

    unless candidate.viable?
      retire(candidate)
      @available.pop
      next
    end

    usage = (@resources[candidate] += 1)

    # The resource is used up to its limit:
    @available.pop if usage == candidate.concurrency

    return candidate
  end

  return unless @limit.nil? || @resources.size < @limit

  Console.debug(self) {"No available resources, allocating new one..."}
  create_resource
end
def active?
# Whether the pool is currently tracking at least one resource.
#
# @returns [Boolean]
def active?
  @resources.any?
end
def as_json(...)
# Describe the pool as a plain Hash suitable for JSON serialisation.
# Any arguments are accepted and ignored.
#
# @returns [Hash] With keys :limit, :concurrency, :usage and :availability_summary.
def as_json(...)
  snapshot = {limit: @limit, concurrency: @guard.limit}
  snapshot[:usage] = @resources.size
  snapshot[:availability_summary] = availability_summary
  snapshot
end
def availability_summary
# Summarise every tracked resource as "usage/concurrency/count".
# A "*" follows the concurrency when the resource reports it is not viable.
#
# @returns [Array(String)] One entry per tracked resource.
def availability_summary
  @resources.map do |resource, usage|
    capacity = resource.concurrency
    marker = resource.viable? ? "" : "*"

    "#{usage}/#{capacity}#{marker}/#{resource.count}"
  end
end
def available?
# Whether the availability list currently holds any resource.
# The list may contain stale entries, so this is only a hint.
#
# @returns [Boolean]
def available?
  @available.any?
end
def available_resource
# Acquire or create a resource while holding the guard semaphore.
#
# @returns The resource, or nil if none could be acquired or created.
# @raises Re-raises any exception; a resource that was already claimed is
#   given back via {reuse} first.
def available_resource
  claimed = nil

  @guard.acquire { claimed = acquire_or_create_resource }

  claimed
rescue Exception
  # Deliberately broad: whatever interrupts us, a claimed slot must be returned.
  reuse(claimed) if claimed
  raise
end
def busy?
# Whether any tracked resource is currently in use (usage above zero).
#
# @returns [Boolean]
def busy?
  # `any?` stops at the first match and, unlike `collect`, builds no throwaway array.
  @resources.any? { |_, usage| usage > 0 }
end
def close
# Shut the pool down: drain all resources, empty the availability list and
# stop the gardener task if one is present.
def close
  drain
  @available.clear
  @gardener&.stop
end
def concurrency
# The current limit of the guard semaphore.
#
# @returns The value of `@guard.limit`.
def concurrency
  @guard.limit
end
def concurrency= value
# Change the limit of the guard semaphore.
#
# @parameter value The new limit, assigned to `@guard.limit`.
def concurrency=(value)
  @guard.limit = value
end
def create_resource
# Construct a new resource and register it with a usage of 1.
#
# The gardener is started first. A resource that allows more than one
# concurrent use is also added to the availability list.
#
# @returns The new resource, or the falsy value the constructor returned
#   when creation failed.
def create_resource
  start_gardener

  created = @constructor.call

  # The constructor may return nil, which means creating the resource failed.
  return created unless created

  @resources[created] = 1

  # Make the resource available if it can be used multiple times:
  @available << created if created.concurrency > 1

  created
end
def drain
# Retire every resource in the pool, taking each one as it becomes idle,
# until the pool tracks none.
def drain
  Console.debug(self, "Draining pool...", size: @resources.size)

  # Enumerate all existing resources and retire them:
  while (idle = acquire_existing_resource)
    retire(idle)
  end
end
def empty?
# Whether the pool is tracking no resources at all.
#
# @returns [Boolean]
def empty?
  @resources.size.zero?
end
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
@parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource.
@parameter limit [Integer | Nil] The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.
@parameter constructor [Proc] A block which creates a new resource.
Create a new resource pool.
# Create a new resource pool.
#
# @parameter constructor [Proc] Called with no arguments to create a new resource.
# @parameter limit [Integer | Nil] Maximum number of resources; nil means unlimited.
# @parameter concurrency [Integer] Limit for the semaphore guarding resource
#   creation. Defaults to `limit`, or 1 when `limit` is nil.
# @parameter policy Stored as `@policy`; the gardener task invokes it with the pool.
# @parameter tags Stored as `@tags`.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
  # Configuration supplied by the caller:
  @constructor = constructor
  @limit = limit
  @policy = policy
  @tags = tags

  # Limits the number of concurrent tasks which are creating new resources.
  @guard = Async::Semaphore.new(concurrency)

  # The background maintenance task, started lazily.
  @gardener = nil

  # All resources, mapped to their current usage:
  @resources = {}

  # Resources which may be available to be acquired. This list may contain
  # false positives, or resources which were okay but have since entered a
  # state which is unusable.
  @available = []

  # Used to signal when a resource has been released:
  @notification = Async::Notification.new
end
def prune(retain = 0)
@parameter retain [Integer] The minimum number of resources to retain.
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
# Retire (and close) unused resources until at most `retain` resources remain.
#
# If a block is given, each unused resource is yielded to it instead of being
# retired, and the block is responsible for disposing of it.
#
# @parameter retain [Integer] The minimum number of resources to retain.
# @returns [Integer] The number of unused resources found when pruning began
#   (not necessarily the number actually retired).
def prune(retain = 0)
  # This code must not context switch: snapshot the idle resources.
  unused = @resources.select { |_, usage| usage.zero? }.keys

  # It's okay for this to context switch:
  unused.each do |resource|
    # Check before acting, so a pool already at or below `retain` is left
    # untouched instead of losing one resource first.
    break if @resources.size <= retain

    if block_given?
      yield resource
    else
      retire(resource)
    end
  end

  # Update availability list:
  @available.clear
  @resources.each do |resource, usage|
    @available << resource if usage < resource.concurrency && resource.reusable?
  end

  unused.size
end
def release(resource)
# Hand a resource back to the pool.
#
# A reusable resource has its usage decremented via {reuse}; anything else,
# including a resource whose reuse failed, is retired.
#
# @parameter resource The resource being returned.
def release(resource)
  handled = false

  # A resource that is not good should also not be reusable.
  handled = reuse(resource) if resource.reusable?
ensure
  # Runs on every exit path, so a failed reuse still retires the resource.
  retire(resource) unless handled
end
def retire(resource)
# Remove a resource from the pool and close it, then signal waiters.
#
# @parameter resource The resource to retire.
# @returns [Boolean] Always true.
def retire(resource)
  Console.debug(self) {"Retire #{resource}"}

  @resources.delete(resource)
  resource.close

  # Wake anything waiting for the pool's state to change.
  @notification.signal

  true
end
def reuse(resource)
# Give one usage slot of a resource back to the pool, then signal waiters.
#
# @parameter resource The resource whose usage is decremented.
# @returns [Boolean] Always true.
# @raises [RuntimeError] If the resource is not tracked or has zero usage.
def reuse(resource)
  Console.debug(self) {"Reuse #{resource}"}

  current = @resources[resource]

  raise "Trying to reuse unacquired resource: #{resource}!" if current.nil? || current.zero?

  # If the resource was fully utilized, it now becomes available:
  @available.push(resource) if current == resource.concurrency

  @resources[resource] = current - 1
  @notification.signal

  true
end
def size
# The number of resources the pool is currently tracking.
#
# @returns [Integer]
def size
  @resources.length
end
def start_gardener
# Start the background gardener task unless one is already present.
#
# The task loops forever: it calls the policy with this pool when a policy
# is set (otherwise it calls `Task.yield`), then waits for the next pool
# notification. When the task ends, the gardener reference is cleared and
# the pool is closed.
def start_gardener
  return if @gardener

  # Placeholder so nothing starts a second gardener before the task below
  # has stored itself.
  @gardener = true

  Async(transient: true, annotation: "#{self.class} Gardener") do |gardener|
    @gardener = gardener

    while true
      @policy ? @policy.call(self) : Task.yield

      self.wait
    end
  ensure
    @gardener = nil
    self.close
  end
end
def to_json(...)
# Serialise the pool by converting {as_json}'s Hash to JSON.
# All arguments are forwarded to that Hash's `to_json`.
#
# @returns [String]
def to_json(...)
  representation = as_json
  representation.to_json(...)
end
def to_s
# A short description of the pool: class name, usage string and, when the
# pool tracks any resources, the availability summary joined with ";".
#
# @returns [String]
def to_s
  prefix = "\#<#{self.class}(#{usage_string})"

  return "#{prefix}>" if @resources.empty?

  "#{prefix} #{availability_summary.join(';')}>"
end
def usage_string
# Render "tracked/limit", showing "∞" when the pool has no limit.
#
# @returns [String]
def usage_string
  ceiling = @limit || '∞'

  "#{@resources.size}/#{ceiling}"
end
def wait
# Block until the pool's notification is signalled ({retire} and {reuse} both signal it).
#
# @returns Whatever `@notification.wait` returns.
def wait
  @notification.wait
end
def wait_for_resource
# Keep trying to obtain a resource, sleeping on the notification between
# attempts, until one is obtained.
#
# @returns The acquired resource (never nil).
def wait_for_resource
  resource = available_resource

  # If we fail to get or create a resource, wait for one to become available.
  until resource
    @notification.wait
    resource = available_resource
  end

  # Be careful not to context switch or fail here.
  resource
end