# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2020, by Simon Perepelitsa.
# Copyright, 2024, by Thomas Morgan.
require "console/logger"
require "async"
require "async/notification"
require "async/semaphore"
require "traces"
module Async
module Pool
# A resource pool controller.
class Controller
# Create a new resource pool, using the given block to create new resources.
def self.wrap(**options, &block)
self.new(block, **options)
end
# Create a new resource pool.
#
# @parameter constructor [Proc] A block which creates a new resource.
# @parameter limit [Integer | Nil] The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.
# @parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource.
# @parameter policy [Policy] The pool policy.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
@constructor = constructor
@limit = limit
# This semaphore is used to limit the number of concurrent tasks which are creating new resources.
@guard = Async::Semaphore.new(concurrency)
@policy = policy
@gardener = nil
@tags = tags
# All available resources:
@resources = {}
# Resources which may be available to be acquired:
# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.
@available = []
# Used to signal when a resource has been released:
@notification = Async::Notification.new
end
# @attribute [Proc] The constructor used to create new resources.
attr :constructor
# @attribute [Integer] The maximum number of resources that this pool can have at any given time.
attr_accessor :limit
# Generate a human-readable representation of the pool.
def to_s
if @resources.empty?
"\#<#{self.class}(#{usage_string})>"
else
"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
end
end
# Generate a JSON representation of the pool.
def as_json(...)
{
limit: @limit,
concurrency: @guard.limit,
usage: @resources.size,
availability_summary: self.availability_summary,
}
end
# Generate a JSON representation of the pool.
def to_json(...)
as_json.to_json(...)
end
# @attribute [Integer] The maximum number of concurrent tasks that can be creating a new resource.
def concurrency
@guard.limit
end
# Set the maximum number of concurrent tasks that can be creating a new resource.
def concurrency= value
@guard.limit = value
end
# @attribute [Policy] The pool policy.
attr_accessor :policy
# @attribute [Hash(Resource, Integer)] all allocated resources, and their associated usage.
attr :resources
# @attribute [Array(String)] The name of the pool.
attr_accessor :tags
# The number of resources in the pool.
def size
@resources.size
end
# Whether the pool has any active resources.
def active?
!@resources.empty?
end
# Whether there are resources which are currently in use.
def busy?
@resources.collect do |_, usage|
return true if usage > 0
end
return false
end
# Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
def available?
@available.any?
end
# Wait until a pool resource has been freed.
def wait
@notification.wait
end
# Whether the pool is empty.
def empty?
@resources.empty?
end
# Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
def acquire
resource = wait_for_resource
return resource unless block_given?
begin
yield resource
ensure
release(resource)
end
end
# Make the resource resources and let waiting tasks know that there is something resources.
def release(resource)
processed = false
# A resource that is not good should also not be reusable.
if resource.reusable?
processed = reuse(resource)
end
# @policy.released(self, resource)
ensure
retire(resource) unless processed
end
def drain
Console.debug(self, "Draining pool...", size: @resources.size)
# Enumerate all existing resources and retire them:
while resource = acquire_existing_resource
retire(resource)
end
end
# Close all resources in the pool.
def close
self.drain
@available.clear
@gardener&.stop
end
# Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
# @parameter retain [Integer] the minimum number of resources to retain.
# @yields {|resource| ...} Any unused resource.
def prune(retain = 0)
unused = []
# This code must not context switch:
@resources.each do |resource, usage|
if usage.zero?
unused << resource
end
end
# It's okay for this to context switch:
unused.each do |resource|
if block_given?
yield resource
else
retire(resource)
end
break if @resources.size <= retain
end
# Update availability list:
@available.clear
@resources.each do |resource, usage|
if usage < resource.concurrency and resource.reusable?
@available << resource
end
end
return unused.size
end
# Retire a specific resource.
def retire(resource)
Console.debug(self) {"Retire #{resource}"}
@resources.delete(resource)
resource.close
@notification.signal
return true
end
protected
def start_gardener
return if @gardener
@gardener = true
Async(transient: true, annotation: "#{self.class} Gardener") do |task|
@gardener = task
while true
if @policy
@policy.call(self)
else
Task.yield
end
self.wait
end
ensure
@gardener = nil
self.close
end
end
def usage_string
"#{@resources.size}/#{@limit || '∞'}"
end
def availability_summary
@resources.collect do |resource, usage|
"#{usage}/#{resource.concurrency}#{resource.viable? ? nil : '*'}/#{resource.count}"
end
end
# def usage
# @resources.count{|resource, usage| usage > 0}
# end
#
# def free
# @resources.count{|resource, usage| usage == 0}
# end
def reuse(resource)
Console.debug(self) {"Reuse #{resource}"}
usage = @resources[resource]
if usage.nil? || usage.zero?
raise "Trying to reuse unacquired resource: #{resource}!"
end
# If the resource was fully utilized, it now becomes available:
if usage == resource.concurrency
@available.push(resource)
end
@resources[resource] = usage - 1
@notification.signal
return true
end
def wait_for_resource
# If we fail to create a resource (below), we will end up waiting for one to become resources.
until resource = available_resource
@notification.wait
end
# Be careful not to context switch or fail here.
return resource
end
# @returns [Object] A new resource in a "used" state.
def create_resource
self.start_gardener
# This might return nil, which means creating the resource failed.
if resource = @constructor.call
@resources[resource] = 1
# Make the resource available if it can be used multiple times:
if resource.concurrency > 1
@available.push(resource)
end
end
# @policy.created(self, resource)
return resource
end
# @returns [Object] An existing resource in a "used" state.
def available_resource
resource = nil
@guard.acquire do
resource = acquire_or_create_resource
end
return resource
rescue Exception => error
reuse(resource) if resource
raise
end
private
# Acquire an existing resource with zero usage.
# If there are resources that are in use, wait until they are released.
def acquire_existing_resource
while @resources.any?
@resources.each do |resource, usage|
if usage == 0
return resource
end
end
@notification.wait
end
# Only when the pool has been completely drained, return nil:
return nil
end
def acquire_or_create_resource
while resource = @available.last
if usage = @resources[resource] and usage < resource.concurrency
if resource.viable?
usage = (@resources[resource] += 1)
if usage == resource.concurrency
# The resource is used up to it's limit:
@available.pop
end
return resource
else
retire(resource)
@available.pop
end
else
# The resource has been removed already, so skip it and remove it from the availability list.
@available.pop
end
end
if @limit.nil? or @resources.size < @limit
Console.debug(self) {"No available resources, allocating new one..."}
return create_resource
end
end
Traces::Provider(self) do
def create_resource(...)
attributes = {
concurrency: @guard.limit,
}
attributes.merge!(@tags) if @tags
Traces.trace('async.pool.create', attributes: attributes) {super}
end
def drain(...)
attributes = {
size: @resources.size,
}
attributes.merge!(@tags) if @tags
Traces.trace('async.pool.drain', attributes: attributes) {super}
end
def acquire(...)
attributes = {
size: @resources.size,
limit: @limit,
}
attributes.merge!(@tags) if @tags
Traces.trace('async.pool.acquire', attributes: attributes) {super}
end
def release(...)
attributes = {
size: @resources.size,
}
attributes.merge!(@tags) if @tags
Traces.trace('async.pool.release', attributes: attributes) {super}
end
def retire(...)
attributes = {
size: @resources.size,
}
attributes.merge!(@tags) if @tags
Traces.trace('async.pool.retire', attributes: attributes) {super}
end
end
end
end
end