# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com># # Permission is hereby granted, free of charge, to any person obtaining a copy# of this software and associated documentation files (the "Software"), to deal# in the Software without restriction, including without limitation the rights# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell# copies of the Software, and to permit persons to whom the Software is# furnished to do so, subject to the following conditions:# # The above copyright notice and this permission notice shall be included in# all copies or substantial portions of the Software.# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN# THE SOFTWARE.require'console/logger'require'async'require'async/notification'require'async/semaphore'moduleAsyncmodulePoolclassControllerdefself.wrap(**options,&block)self.new(block,**options)enddefinitialize(constructor,limit: nil)# All available resources:@resources={}# Resources which may be available to be acquired:# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.@available=[]@notification=Async::Notification.new@limit=limit@constructor=constructor@guard=Async::Semaphore.new(1)@gardener=nilend# @attribute [Hash(Resource, Integer)] all allocated resources, and their associated usage.attr:resourcesdefsize@resources.sizeend# Whether the pool has any active resources.defactive?!@resources.empty?end# Whether there are resources which are currently in use.defbusy?@resources.collectdo|_,usage|returntrueifusage>0endreturnfalseend# Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.defavailable?@available.any?end# Wait until a pool resource has been freed.defwait@notification.waitenddefempty?@resources.empty?enddefacquireresource=wait_for_resourcereturnresourceunlessblock_given?beginyieldresourceensurerelease(resource)endend# Make the resource resources and let waiting tasks know that there is something resources.defrelease(resource)processed=false# A resource that is not good should also not be reusable.ifresource.reusable?processed=reuse(resource)endensureretire(resource)unlessprocessedenddefclose@available.clear@resources.each_key(&:close)@resources.clear@gardener&.stopenddefto_sif@resources.empty?"\#<#{self.class}(#{usage_string})>"else"\#<#{self.class}(#{usage_string}) #{availability_string}>"endend# Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.# @param retain [Integer] the minimum number of resources to retain.# @yield resource [Resource] unused resources.defprune(retain=0)unused=[]@resources.eachdo|resource,usage|ifusage.zero?unused<<resourceendendunused.eachdo|resource|ifblock_given?yieldresourceelseretire(resource)endbreakif@resources.size<=retainend# Update availability list:@available.clear@resources.eachdo|resource,usage|ifusage<resource.concurrencyandresource.reusable?@available<<resourceendendreturnunused.sizeenddefretire(resource)Console.logger.debug(self){"Retire #{resource}"}@resources.delete(resource)resource.close@notification.signalreturntrueendprotecteddefstart_gardenerreturnif@gardenerAsync(transient: true,annotation: "#{self.class} Gardener")do|task|@gardener=taskTask.yieldensure@gardener=nilself.closeendenddefusage_string"#{@resources.size}/#{@limit||'∞'}"enddefavailability_string@resources.collectdo|resource,usage|"#{usage}/#{resource.concurrency}#{resource.viable??nil:'*'}/#{resource.count}"end.join(";")enddefusage@resources.count{|resource,usage|usage>0}enddeffree@resources.count{|resource,usage|usage==0}end# @returns [Boolean] Whether the number of available resources is excessive and we should retire some.defoverflowing?if@resources.any?(self.free.to_f/@resources.size)>0.5endenddefreuse(resource)Console.logger.debug(self){"Reuse #{resource}"}usage=@resources[resource]ifusage.zero?raise"Trying to reuse unacquired resource: #{resource}!"end# We retire resources when adding to the @available list would overflow our pool:ifusage==1ifoverflowing?returnretire(resource)endend# If the resource was fully utilized, it now becomes available:ifusage==resource.concurrency@available.push(resource)end@resources[resource]=usage-1@notification.signalreturntrueenddefwait_for_resource# If we fail to create a resource (below), we will end up waiting for one to become resources.untilresource=available_resource@notification.waitendConsole.logger.debug(self){"Wait for resource -> #{resource}"}# if resource.concurrency > 1# @notification.signal# endreturnresourceend# @returns [Object] A new resource in a "used" state.defcreate_resourceself.start_gardener# This might return nil, which means creating the resource failed.ifresource=@constructor.call@resources[resource]=1# Make the resource available if it can be used multiple times:ifresource.concurrency>1@available.push(resource)endendreturnresourceend# @returns [Object] An existing resource in a "used" state.defavailable_resourceresource=nil@guard.acquiredoresource=get_resourceendreturnresourcerescueExceptionreuse(resource)ifresourceraiseendprivatedefget_resourcewhileresource=@available.lastifusage=@resources[resource]andusage<resource.concurrencyifresource.viable?usage=(@resources[resource]+=1)ifusage==resource.concurrency# The resource is used up to it's limit:@available.popendreturnresourceelseretire(resource)@available.popendelse# The resource has been removed already, so skip it and remove it from the availability list.@available.popendendif@limit.nil?or@resources.size<@limitConsole.logger.debug(self){"No available resources, allocating new one..."}returncreate_resourceendendendendend