# frozen_string_literal: true# Released under the MIT License.# Copyright, 2019-2024, by Samuel Williams.# Copyright, 2020, by Simon Perepelitsa.# Copyright, 2024, by Thomas Morgan.require'console/logger'require'async'require'async/notification'require'async/semaphore'moduleAsyncmodulePool# A resource pool controller.classController# Create a new resource pool, using the given block to create new resources.defself.wrap(**options,&block)self.new(block,**options)end# Create a new resource pool.## @parameter constructor [Proc] A block which creates a new resource.# @parameter limit [Integer | Nil] The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.# @parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource.# @parameter policy [Policy] The pool policy.definitialize(constructor,limit: nil,concurrency: (limit||1),policy: nil)@constructor=constructor@limit=limit# This semaphore is used to limit the number of concurrent tasks which are creating new resources.@guard=Async::Semaphore.new(concurrency)@policy=policy@gardener=nil# All available resources:@resources={}# Resources which may be available to be acquired:# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.@available=[]# Used to signal when a resource has been released:@notification=Async::Notification.newend# @attribute [Proc] The constructor used to create new resources.attr:constructor# @attribute [Integer] The maximum number of resources that this pool can have at any given time.attr_accessor:limit# Generate a human-readable representation of the pool.defto_sif@resources.empty?"\#<#{self.class}(#{usage_string})>"else"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"endend# Generate a JSON representation of the pool.defas_json(...){limit: @limit,concurrency: @guard.limit,usage: @resources.size,availability_summary: self.availability_summary,}end# Generate a JSON representation of the pool.defto_json(...)as_json.to_json(...)end# @attribute [Integer] The maximum number of concurrent tasks that can be creating a new resource.defconcurrency@guard.limitend# Set the maximum number of concurrent tasks that can be creating a new resource.defconcurrency=value@guard.limit=valueend# @attribute [Policy] The pool policy.attr_accessor:policy# @attribute [Hash(Resource, Integer)] all allocated resources, and their associated usage.attr:resources# The number of resources in the pool.defsize@resources.sizeend# Whether the pool has any active resources.defactive?!@resources.empty?end# Whether there are resources which are currently in use.defbusy?@resources.collectdo|_,usage|returntrueifusage>0endreturnfalseend# Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.defavailable?@available.any?end# Wait until a pool resource has been freed.defwait@notification.waitend# Whether the pool is empty.defempty?@resources.empty?end# Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.defacquireresource=wait_for_resourcereturnresourceunlessblock_given?beginyieldresourceensurerelease(resource)endend# Make the resource resources and let waiting tasks know that there is something resources.defrelease(resource)processed=false# A resource that is not good should also not be reusable.ifresource.reusable?processed=reuse(resource)end# @policy.released(self, resource)ensureretire(resource)unlessprocessedend# Close all resources in the pool.defclose@available.clearwhilepair=@resources.shiftresource,usage=pairifusage>0Console.warn(self,resource: resource,usage: usage){"Closing resource while still in use!"}endresource.closeend@gardener&.stopend# Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.# @parameter retain [Integer] the minimum number of resources to retain.# @yields {|resource| ...} Any unused resource.defprune(retain=0)unused=[]# This code must not context switch:@resources.eachdo|resource,usage|ifusage.zero?unused<<resourceendend# It's okay for this to context switch:unused.eachdo|resource|ifblock_given?yieldresourceelseretire(resource)endbreakif@resources.size<=retainend# Update availability list:@available.clear@resources.eachdo|resource,usage|ifusage<resource.concurrencyandresource.reusable?@available<<resourceendendreturnunused.sizeend# Retire a specific resource.defretire(resource)Console.debug(self){"Retire #{resource}"}@resources.delete(resource)resource.close@notification.signalreturntrueendprotecteddefstart_gardenerreturnif@gardenerAsync(transient: true,annotation: "#{self.class} Gardener")do|task|@gardener=taskwhiletrueif@policy@policy.call(self)elseTask.yieldendself.waitendensure@gardener=nilself.closeendenddefusage_string"#{@resources.size}/#{@limit||'∞'}"enddefavailability_summary@resources.collectdo|resource,usage|"#{usage}/#{resource.concurrency}#{resource.viable??nil:'*'}/#{resource.count}"endend# def usage# @resources.count{|resource, usage| usage > 0}# end## def free# @resources.count{|resource, usage| usage == 0}# enddefreuse(resource)Console.debug(self){"Reuse #{resource}"}usage=@resources[resource]ifusage.nil?||usage.zero?raise"Trying to reuse unacquired resource: #{resource}!"end# If the resource was fully utilized, it now becomes available:ifusage==resource.concurrency@available.push(resource)end@resources[resource]=usage-1@notification.signalreturntrueenddefwait_for_resource# If we fail to create a resource (below), we will end up waiting for one to become resources.untilresource=available_resource@notification.waitendConsole.debug(self){"Wait for resource -> #{resource}"}# if resource.concurrency > 1# @notification.signal# endreturnresourceend# @returns [Object] A new resource in a "used" state.defcreate_resourceself.start_gardener# This might return nil, which means creating the resource failed.ifresource=@constructor.call@resources[resource]=1# Make the resource available if it can be used multiple times:ifresource.concurrency>1@available.push(resource)endend# @policy.created(self, resource)returnresourceend# @returns [Object] An existing resource in a "used" state.defavailable_resourceresource=nil@guard.acquiredoresource=get_resourceendreturnresourcerescueExceptionreuse(resource)ifresourceraiseendprivatedefget_resourcewhileresource=@available.lastifusage=@resources[resource]andusage<resource.concurrencyifresource.viable?usage=(@resources[resource]+=1)ifusage==resource.concurrency# The resource is used up to it's limit:@available.popendreturnresourceelseretire(resource)@available.popendelse# The resource has been removed already, so skip it and remove it from the availability list.@available.popendendif@limit.nil?or@resources.size<@limitConsole.debug(self){"No available resources, allocating new one..."}returncreate_resourceendendendendend