class Async::Pool::Controller

def self.wrap(**options, &block)

# Build an instance whose constructor is the given block.
# @parameter options [Hash] Keyword options forwarded unchanged to `new`.
# @parameter block [Proc | Nil] Passed to `new` as its first positional argument.
# @returns Whatever `new` returns.
def self.wrap(**options, &block)
	new(block, **options)
end

def acquire

# Acquire a resource from the pool, waiting until one can be obtained.
#
# Without a block the resource is returned and the caller is responsible for passing it to {release}.
# With a block the resource is yielded and released afterwards, even if the block raises.
# @yields {|resource| ...} The acquired resource.
# @returns [Object] The resource (no block), or the value of the block.
def acquire
	resource = wait_for_resource
	
	if block_given?
		begin
			yield resource
		ensure
			# Always hand the resource back, whatever happened in the block.
			release(resource)
		end
	else
		resource
	end
end

def active?

Whether the pool has any active resources.
# @returns [Boolean] `true` when `@resources` holds at least one entry, regardless of its usage count.
def active?
	!@resources.empty?
end

def as_json(...)

# Summarise the pool state as a plain hash. Any arguments are accepted and ignored.
# @returns [Hash] With the keys `:limit`, `:concurrency`, `:usage` and `:availability_summary`, in that order.
def as_json(...)
	state = {limit: @limit, concurrency: @guard.limit}
	
	# The number of resources currently tracked by the pool:
	state[:usage] = @resources.size
	state[:availability_summary] = availability_summary
	
	state
end

def availability_summary

# Describe every tracked resource as `"usage/concurrency/count"`.
# A `*` is appended to the concurrency when the resource reports that it is not viable.
# @returns [Array(String)] One entry per resource, in the iteration order of `@resources`.
def availability_summary
	@resources.map do |resource, usage|
		format("%s/%s%s/%s", usage, resource.concurrency, (resource.viable? ? "" : "*"), resource.count)
	end
end

def available?

Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
# @returns [Boolean] `true` when the `@available` list has at least one truthy entry.
# Note that the list may contain entries which are no longer usable; {get_resource} filters those out.
def available?
	@available.any?
end

def available_resource

@returns [Object] An existing resource in a "used" state.
# Obtain a resource via {get_resource} while holding the guard semaphore.
#
# If anything is raised after a resource was obtained, the resource is handed back via {reuse} before the exception is re-raised.
# @returns [Object | Nil] Whatever {get_resource} returned.
def available_resource
	acquired = nil
	
	@guard.acquire {acquired = get_resource}
	
	acquired
rescue Exception
	# Deliberately broad: the resource must be handed back for every kind of failure, and the exception is always re-raised.
	reuse(acquired) if acquired
	raise
end

def busy?

Whether there are resources which are currently in use.
# @returns [Boolean] `true` when at least one tracked resource has a usage count greater than zero.
def busy?
	# `any?` stops at the first match and does not build an intermediate array.
	@resources.each_value.any? {|usage| usage > 0}
end

def close

# Close every resource tracked by the pool and stop the gardener task, if there is one.
# A warning is logged for each resource that is closed while its usage count is still above zero.
def close
	@available.clear
	
	# Remove entries one at a time so that the table is already updated when each resource is closed.
	until @resources.empty?
		resource, usage = @resources.shift
		
		Console.logger.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"} if usage > 0
		
		resource.close
	end
	
	@gardener&.stop
end

def concurrency

@attribute [Integer] The maximum number of concurrent tasks that can be creating a new resource.
# @returns The current limit of the guard semaphore (`@guard.limit`).
def concurrency
	@guard.limit
end

def concurrency= value

# Change the limit of the guard semaphore.
# @parameter value The new limit, assigned to `@guard.limit`.
def concurrency= value
	@guard.limit = value
end

def create_resource

@returns [Object] A new resource in a "used" state.
# Invoke the constructor and register the result with a usage count of 1.
# The gardener task is started first if it is not already running.
# @returns [Object | Nil] The new resource, or whatever falsey value the constructor returned.
def create_resource
	start_gardener
	
	resource = @constructor.call
	
	# A falsey result means that creating the resource failed, in which case nothing is registered.
	if resource
		@resources[resource] = 1
		
		# A resource that supports more than one concurrent user can be offered to others straight away:
		@available.push(resource) if resource.concurrency > 1
	end
	
	# @policy.created(self, resource)
	
	resource
end

def empty?

# @returns [Boolean] `true` when the pool is not tracking any resources.
def empty?
	@resources.empty?
end

def get_resource

# Find a resource to hand out, starting from the end of the `@available` list.
# Entries which turn out to be unusable are dropped from the list along the way. When nothing suitable is
# listed, a new resource is created, provided the pool has no limit or is below it.
# @returns [Object | Nil] A listed resource whose usage count has been incremented, the result of {create_resource}, or `nil` when the limit has been reached.
def get_resource
	# Always inspect the last entry; every branch below either returns or pops it, so the loop makes progress.
	while resource = @available.last
		# `usage` is nil when the resource is no longer tracked in `@resources`.
		if usage = @resources[resource] and usage < resource.concurrency
			if resource.viable?
				usage = (@resources[resource] += 1)
				
				if usage == resource.concurrency
					# The resource is used up to its limit:
					@available.pop
				end
				
				return resource
			else
				# The resource is no longer viable: retire it, then drop the last entry of the availability list.
				retire(resource)
				@available.pop
			end
		else
			# The resource is either no longer tracked or already at its concurrency limit, so remove it from the availability list.
			@available.pop
		end
	end
	
	if @limit.nil? or @resources.size < @limit
		Console.logger.debug(self) {"No available resources, allocating new one..."}
		
		return create_resource
	end
end

def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)

# @parameter constructor [#call] Called without arguments whenever a new resource is needed.
# @parameter limit [Integer | Nil] The maximum number of resources tracked at once, or `nil` for no limit.
# @parameter concurrency [Integer] The limit of the guard semaphore; defaults to `limit`, or 1 when there is no limit.
# @parameter policy [#call | Nil] Called with the pool by the gardener task, when given.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
	@constructor, @limit, @policy = constructor, limit, policy
	
	# Bounds the number of tasks which may be obtaining a resource at the same time:
	@guard = Async::Semaphore.new(concurrency)
	
	# Signalled whenever a resource is handed back or retired, so that waiting tasks can try again:
	@notification = Async::Notification.new
	
	# The background task which applies the policy; started lazily when the first resource is created.
	@gardener = nil
	
	# Every resource tracked by the pool, mapped to its current usage count:
	@resources = {}
	
	# Resources which may be available to be acquired.
	# This list may contain false positives, or resources which were okay but have since become unusable.
	@available = []
end

def prune(retain = 0)

Parameters:
  • retain (Integer) -- the minimum number of resources to retain.
# Retire resources which are not currently in use, then rebuild the availability list.
#
# When a block is given, each candidate is yielded instead of being retired, and the block decides what to do with it.
# @parameter retain [Integer] The minimum number of resources to retain; pruning stops as soon as the pool is at or below this size.
# @yields {|resource| ...} Each unused resource, in place of the default {retire}.
# @returns [Integer] The number of unused resources found, which may exceed the number actually retired.
def prune(retain = 0)
	# Take a snapshot of the unused resources. This code must not context switch:
	unused = @resources.select {|_, usage| usage.zero?}.keys
	
	# It's okay for this to context switch:
	unused.each do |resource|
		# Check before acting, so that a pool which is already at or below the floor is left untouched.
		break if @resources.size <= retain
		
		if block_given?
			yield resource
		else
			retire(resource)
		end
	end
	
	# Update availability list:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency && resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

def release(resource)

Make the resource available and let waiting tasks know that there is something available.
# Hand a resource back to the pool.
#
# A resource which reports itself as reusable is passed to {reuse}; in every other case, including when
# an exception is raised along the way, it is passed to {retire}.
# @parameter resource [Object] The resource being handed back.
def release(resource)
	handled = false
	
	# A resource that is not good should also not be reusable.
	handled = reuse(resource) if resource.reusable?
	
	# @policy.released(self, resource)
ensure
	retire(resource) unless handled
end

def retire(resource)

# Remove a resource from the pool, close it, and wake up any tasks waiting on the notification.
# @parameter resource [Object] The resource to retire; must respond to `close`.
# @returns [Boolean] Always `true`.
def retire(resource)
	Console.logger.debug(self) {"Retire #{resource}"}
	
	# Stop tracking the resource before closing it:
	@resources.delete(resource)
	resource.close
	
	# There is now room in the pool, so let waiting tasks try again:
	@notification.signal
	
	true
end

def reuse(resource)

# Decrement the usage count of an acquired resource and wake up any tasks waiting on the notification.
# @parameter resource [Object] A resource which was previously acquired from this pool.
# @returns [Boolean] Always `true`.
# @raises [RuntimeError] If the resource is not tracked by the pool, or its usage count is already zero.
def reuse(resource)
	Console.logger.debug(self) {"Reuse #{resource}"}
	
	usage = @resources[resource]
	
	# `usage` is nil when the pool does not track the resource (e.g. it was already retired).
	# Report that with the same descriptive error rather than failing on `nil.zero?`.
	if usage.nil? || usage.zero?
		raise "Trying to reuse unacquired resource: #{resource}!"
	end
	
	# If the resource was fully utilized, it now becomes available:
	if usage == resource.concurrency
		@available.push(resource)
	end
	
	@resources[resource] = usage - 1
	
	@notification.signal
	
	return true
end

def size

# @returns [Integer] The number of resources currently tracked by the pool, whether in use or not.
def size
	@resources.size
end

def start_gardener

# Start the background gardener task, unless one is already recorded in `@gardener`.
#
# The task repeatedly invokes the policy with the pool (or yields when there is no policy) and then
# waits for the next notification. When the task ends for any reason, the pool is closed.
# @returns The result of `Async(...)`, or `nil` when a gardener is already running.
def start_gardener
	unless @gardener
		Async(transient: true, annotation: "#{self.class} Gardener") do |task|
			@gardener = task
			
			while true
				@policy ? @policy.call(self) : Task.yield
				
				wait
			end
		ensure
			# Clear the reference first, so that closing the pool does not try to stop this task again.
			@gardener = nil
			close
		end
	end
end

def to_json(...)

# Serialise the result of {as_json}.
# All arguments are forwarded to `to_json` on that hash; {as_json} itself is called without arguments.
def to_json(...)
	as_json.to_json(...)
end

def to_s

def to_s
	if @resources.empty?
		"\#<#{self.class}(#{usage_string})>"
	else
		"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
	end
end

def usage_string

# @returns [String] The number of tracked resources over the limit, with `∞` standing in for "no limit".
def usage_string
	ceiling = @limit || '∞'
	
	"#{@resources.size}/#{ceiling}"
end

def wait

Wait until a pool resource has been freed.
# Wait on the pool's notification, which {reuse} and {retire} signal.
def wait
	@notification.wait
end

def wait_for_resource

# Keep asking for a resource until one is obtained, waiting on the notification between attempts.
# @returns [Object] The acquired resource.
def wait_for_resource
	resource = nil
	
	# If no resource could be obtained or created, wait for one to become available and try again.
	@notification.wait until (resource = available_resource)
	
	Console.logger.debug(self) {"Wait for resource -> #{resource}"}
	
	# if resource.concurrency > 1
	# 	@notification.signal
	# end
	
	resource
end