class Async::Pool::Controller

def self.wrap(**options, &block)

# Build a controller whose constructor is the given block.
#
# @param options [Hash] Keyword options forwarded unchanged to {#initialize}.
# @yield Invoked by the pool whenever a new resource must be created.
# @return [Controller] The newly built controller.
def self.wrap(**options, &block)
	new(block, **options)
end

def acquire

# Acquire a resource from the pool, waiting until one can be obtained.
#
# Without a block the resource is returned and the caller is responsible for
# handing it back via {#release}. With a block the resource is yielded and is
# always released afterwards, even if the block raises.
#
# @yield [resource] The acquired resource.
# @return [Object] The resource (no block), or the block's result.
def acquire
	resource = wait_for_resource
	
	if block_given?
		begin
			yield resource
		ensure
			release(resource)
		end
	else
		resource
	end
end

def active?

Whether the pool has any active resources.
# Whether the pool currently holds at least one resource.
#
# @return [Boolean] true when the resource table is non-empty.
def active?
	@resources.any?
end

def availability_string

# Summarise every resource as "usage/concurrency/count", separated by ";".
# A "*" is appended to the concurrency when the resource is not viable.
#
# @return [String] The summary, or an empty string when there are no resources.
def availability_string
	entries = @resources.map do |resource, usage|
		capacity = resource.concurrency
		marker = resource.viable? ? "" : "*"
		
		"#{usage}/#{capacity}#{marker}/#{resource.count}"
	end
	
	entries.join(";")
end

def available?

Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
# Whether the availability list holds at least one candidate resource,
# i.e. whether {#acquire} may be able to reuse an existing resource.
#
# @return [Boolean]
def available?
	!@available.none?
end

def available_resource

@returns [Object] An existing resource in a "used" state.
# Find an existing resource with spare capacity, or allocate a new one if the limit allows.
#
# The whole search runs inside `@guard.acquire`. Candidates are taken from the end of
# @available; stale or exhausted entries are popped as they are encountered.
#
# @return [Object, nil] A resource whose usage count has been incremented, the result of
#   {#create_resource} when no candidate was usable and the pool is below @limit, or nil otherwise.
def available_resource
	@guard.acquire do
		# Always inspect the most recently added candidate:
		while resource = @available.last
			# Only consider resources still tracked in @resources that have spare capacity:
			if usage = @resources[resource] and usage < resource.concurrency
				if resource.viable?
					# Claim one slot on the resource:
					usage = (@resources[resource] += 1)
					
					if usage == resource.concurrency
						# The resource is used up to its limit:
						@available.pop
					end
					
					return resource
				else
					# The resource is no longer viable: close it, then drop it from the availability list.
					retire(resource)
					@available.pop
				end
			else
				# The resource has been removed already (or is at capacity), so skip it and remove it from the availability list.
				@available.pop
			end
		end
		
		# No usable candidate was found; allocate only if there is no limit or we are below it:
		if @limit.nil? or @resources.size < @limit
			Async.logger.debug(self) {"No available resources, allocating new one..."}
			
			return create_resource
		end
	end
	
	# The pool is at its limit and nothing can be reused right now:
	return nil
end

def busy?

Whether there are resources which are currently in use.
# Whether any resource in the pool is currently in use.
#
# @return [Boolean] true if at least one resource has a usage count above zero.
def busy?
	# `any?` stops at the first match and does not build an intermediate array.
	@resources.any? {|_, usage| usage > 0}
end

def close

# Close every resource held by the pool and forget about them.
#
# The availability list is emptied first, then each tracked resource is closed
# and removed. Finally the gardener task (if one exists) is stopped.
def close
	@available.clear
	
	@resources.each_key do |resource|
		resource.close
	end
	@resources.clear
	
	@gardener.stop unless @gardener.nil?
end

def create_resource

@returns [Object] A new resource in a "used" state.
# Allocate a brand new resource via the constructor and register it with a usage of 1.
#
# The gardener is started (if necessary) before the constructor is invoked.
#
# @return [Object, nil] The new resource in a "used" state, or the constructor's
#   falsy result if it failed to produce one.
def create_resource
	start_gardener
	
	resource = @constructor.call
	
	# The constructor may fail to produce anything, in which case nothing is registered:
	return resource unless resource
	
	@resources[resource] = 1
	
	# A resource which supports more than one concurrent user still has spare capacity:
	@available.push(resource) if resource.concurrency > 1
	
	resource
end

def empty?

# Whether the pool holds no resources at all.
#
# @return [Boolean]
def empty?
	@resources.size.zero?
end

def free

# Count the resources which nobody is currently using.
#
# @return [Integer] The number of resources with a usage count of zero.
def free
	@resources.each_value.count(&:zero?)
end

def initialize(constructor, limit: nil)

# Set up an empty pool.
#
# @param constructor [#call] Invoked with no arguments whenever a new resource is needed.
# @param limit [Integer, nil] The maximum number of resources, or nil for no limit.
def initialize(constructor, limit: nil)
	@constructor = constructor
	@limit = limit
	
	# Every resource owned by the pool, mapped to its current usage count:
	@resources = {}
	
	# Candidates for reuse. Entries are advisory only: a resource listed here may have
	# been removed from the pool or may have become unusable in the meantime.
	@available = []
	
	# Signalled whenever a resource is handed back or retired:
	@notification = Async::Notification.new
	
	# Serialises the search for an available resource:
	@guard = Async::Semaphore.new(1)
	
	# The background task, created lazily:
	@gardener = nil
end

def overflowing?

@returns [Boolean] Whether the number of available resources is excessive and we should retire some.
# Whether the proportion of idle resources is excessive and some should be retired.
#
# The pool is considered to be overflowing when more than half of its resources are free.
#
# @return [Boolean] Always true or false; an empty pool is never overflowing.
def overflowing?
	# Without resources there is nothing to retire (and the ratio below would be undefined):
	return false if @resources.empty?
	
	free.fdiv(@resources.size) > 0.5
end

def prune(retain = 0)

Parameters:
  • retain (Integer) -- the minimum number of resources to retain.
# Dispose of unused resources, then rebuild the availability list.
#
# @param retain [Integer] The minimum number of resources to keep in the pool.
# @yield [resource] If a block is given, each unused resource is yielded instead of being retired.
# @return [Integer] The number of resources which were unused when pruning started.
def prune(retain = 0)
	# Snapshot the idle resources up front so that @resources is never mutated while it is being iterated:
	unused = @resources.select {|_, usage| usage.zero?}.keys
	
	unused.each do |resource|
		# Check before disposing of anything, so the pool never drops below `retain`:
		break if @resources.size <= retain
		
		if block_given?
			yield resource
		else
			retire(resource)
		end
	end
	
	# Update availability list:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency and resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

def release(resource)

Make the resource available and let waiting tasks know that there is something available.
# Hand a previously acquired resource back to the pool.
#
# A resource which reports itself as reusable goes through {#reuse}; anything else is retired.
#
# @param resource [Object] The resource being returned.
def release(resource)
	return reuse(resource) if resource.reusable?
	
	retire(resource)
end

def retire(resource)

# Permanently remove a resource from the pool and close it.
#
# The resource is dropped from the resource table before it is closed, and the
# notification is signalled afterwards.
#
# @param resource [Object] The resource to dispose of.
def retire(resource)
	Async.logger.debug(self) do
		"Retire #{resource}"
	end
	
	# Forget the resource first, so it is no longer tracked while it is being closed:
	@resources.delete(resource)
	resource.close
	
	@notification.signal
end

def reuse(resource)

# Return one unit of usage for a resource to the pool.
#
# @param resource [Object] A resource previously handed out by the pool.
# @raise [RuntimeError] If the resource is not tracked by the pool or has no outstanding usage.
def reuse(resource)
	Async.logger.debug(self) {"Reuse #{resource}"}
	
	usage = @resources[resource]
	
	# A resource which is unknown to the pool has no usage entry at all (nil), which is just as invalid as a usage of zero:
	if usage.nil? || usage.zero?
		raise "Trying to reuse unacquired resource: #{resource}!"
	end
	
	# We retire resources when adding to the @available list would overflow our pool:
	if usage == 1 && overflowing?
		return retire(resource)
	end
	
	# If the resource was fully utilized, it now becomes available:
	if usage == resource.concurrency
		@available.push(resource)
	end
	
	@resources[resource] = usage - 1
	
	@notification.signal
end

def size

# The number of resources currently held by the pool, whether in use or idle.
#
# @return [Integer]
def size
	@resources.length
end

def start_gardener

# Start the background "gardener" task, unless one is already recorded in @gardener.
#
# The task is created with `transient: true` and stores itself in @gardener. When the
# task's block exits, the `ensure` clause clears @gardener and closes the pool.
# NOTE(review): presumably the transient task is stopped when the enclosing reactor
# finishes, which is what triggers the cleanup — confirm against the Async documentation.
def start_gardener
	return if @gardener
	
	Async(transient: true, annotation: "#{self.class} Gardener") do |task|
		@gardener = task
		
		# Hand control back to the scheduler:
		Task.yield
	ensure
		# Clear the reference before closing, so #close does not try to stop this task:
		@gardener = nil
		self.close
	end
end

def to_s

# A short human readable description: the class name, the usage summary and,
# when the pool holds any resources, the per-resource availability summary.
#
# @return [String]
def to_s
	details = @resources.empty? ? nil : " #{availability_string}"
	
	"\#<#{self.class}(#{usage_string})#{details}>"
end

def usage

# Count the resources which have at least one current user.
#
# @return [Integer] The number of resources with a usage count above zero.
def usage
	@resources.each_value.count(&:positive?)
end

def usage_string

# Describe how full the pool is, as "resources/limit".
# An unlimited pool shows "∞" in place of the limit.
#
# @return [String]
def usage_string
	capacity = @limit || '∞'
	
	"#{@resources.size}/#{capacity}"
end

def wait

Wait until a pool resource has been freed.
# Suspend the caller until the pool's notification is signalled.
# Within this class the notification is signalled by #retire and #reuse,
# i.e. whenever a resource has been handed back or disposed of.
def wait
	@notification.wait
end

def wait_for_resource

# Obtain a resource, suspending on the notification for as long as none can be provided.
#
# @return [Object] A resource in a "used" state.
def wait_for_resource
	resource = available_resource
	
	# If nothing could be reused or created, wait for one to become available and try again:
	until resource
		@notification.wait
		resource = available_resource
	end
	
	Async.logger.debug(self) {"Wait for resource -> #{resource}"}
	
	# if resource.concurrency > 1
	# 	@notification.signal
	# end
	
	resource
end