class Async::Pool::Controller

A resource pool controller.

def self.wrap(**options, &block)

Create a new resource pool, using the given block to create new resources.
def self.wrap(**options, &block)
	self.new(block, **options)
end

def acquire

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
def acquire
	resource = wait_for_resource
	
	return resource unless block_given?
	
	begin
		yield resource
	ensure
		release(resource)
	end
end

def acquire_existing_resource

If there are resources that are in use, wait until they are released.
Acquire an existing resource with zero usage.
def acquire_existing_resource
	while @resources.any?
		@resources.each do |resource, usage|
			if usage == 0
				return resource
			end
		end
		
		@notification.wait
	end
	
	# Only when the pool has been completely drained, return nil:
	return nil
end

def acquire_or_create_resource

def acquire_or_create_resource
	while resource = @available.last
		if usage = @resources[resource] and usage < resource.concurrency
			if resource.viable?
				usage = (@resources[resource] += 1)
				
				if usage == resource.concurrency
					# The resource is used up to it's limit:
					@available.pop
				end
				
				return resource
			else
				retire(resource)
				@available.pop
			end
		else
			# The resource has been removed already, so skip it and remove it from the availability list.
			@available.pop
		end
	end
	
	if @limit.nil? or @resources.size < @limit
		Console.debug(self) {"No available resources, allocating new one..."}
		
		return create_resource
	end
end

def active?

Whether the pool has any active resources.
def active?
	!@resources.empty?
end

def as_json(...)

Generate a JSON representation of the pool.
def as_json(...)
	{
		limit: @limit,
		concurrency: @guard.limit,
		usage: @resources.size,
		availability_summary: self.availability_summary,
	}
end

def availability_summary

def availability_summary
	@resources.collect do |resource, usage|
		"#{usage}/#{resource.concurrency}#{resource.viable? ? nil : '*'}/#{resource.count}"
	end
end

def available?

Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
def available?
	@available.any?
end

def available_resource

@returns [Object] An existing resource in a "used" state.
def available_resource
	resource = nil
	
	@guard.acquire do
		resource = acquire_or_create_resource
	end
	
	return resource
rescue Exception => error
	reuse(resource) if resource
	raise
end

def busy?

Whether there are resources which are currently in use.
def busy?
	@resources.collect do |_, usage|
		return true if usage > 0
	end
	
	return false
end

def close

Drain the pool, clear all resources, and stop the gardener.
def close
	self.drain
	
	@available.clear
	@gardener&.stop
end

def concurrency

@attribute [Integer] The maximum number of concurrent tasks that can be creating a new resource.
def concurrency
	@guard.limit
end

def concurrency= value

Set the maximum number of concurrent tasks that can be creating a new resource.
def concurrency= value
	@guard.limit = value
end

def create_resource

@returns [Object] A new resource in a "used" state.
def create_resource
	self.start_gardener
	
	# This might return nil, which means creating the resource failed.
	if resource = @constructor.call
		@resources[resource] = 1
		
		# Make the resource available if it can be used multiple times:
		if resource.concurrency > 1
			@available.push(resource)
		end
	end
	
	# @policy.created(self, resource)
	
	return resource
end

def drain

Drain the pool, closing all resources.
def drain
	Console.debug(self, "Draining pool...", size: @resources.size)
	
	# Enumerate all existing resources and retire them:
	while resource = acquire_existing_resource
		retire(resource)
	end
end

def empty?

Whether the pool is empty.
def empty?
	@resources.empty?
end

def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)

@parameter policy [Policy] The pool policy.
@parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource.
@parameter limit [Integer | Nil] The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.
@parameter constructor [Proc] A block which creates a new resource.

Create a new resource pool.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
	@constructor = constructor
	@limit = limit
	
	# This semaphore is used to limit the number of concurrent tasks which are creating new resources.
	@guard = Async::Semaphore.new(concurrency)
	
	@policy = policy
	@gardener = nil
	
	@tags = tags
	
	# All available resources:
	@resources = {}
	
	# Resources which may be available to be acquired:
	# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.
	@available = []
	
	# Used to signal when a resource has been released:
	@notification = Async::Notification.new
end

def prune(retain = 0)

@yields {|resource| ...} Any unused resource.
@parameter retain [Integer] the minimum number of resources to retain.
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
def prune(retain = 0)
	unused = []
	
	# This code must not context switch:
	@resources.each do |resource, usage|
		if usage.zero?
			unused << resource
		end
	end
	
	# It's okay for this to context switch:
	unused.each do |resource|
		if block_given?
			yield resource
		else
			retire(resource)
		end
		
		break if @resources.size <= retain
	end
	
	# Update availability list:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency and resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

def release(resource)

Make the resource resources and let waiting tasks know that there is something resources.
def release(resource)
	processed = false
	
	# A resource that is not good should also not be reusable.
	if resource.reusable?
		processed = reuse(resource)
	end
	
	# @policy.released(self, resource)
ensure
	retire(resource) unless processed
end

def retire(resource)

Retire a specific resource.
def retire(resource)
	Console.debug(self) {"Retire #{resource}"}
	
	@resources.delete(resource)
	
	resource.close
	
	@notification.signal
	
	return true
end

def reuse(resource)

def reuse(resource)
	Console.debug(self) {"Reuse #{resource}"}
	
	usage = @resources[resource]
	
	if usage.nil? || usage.zero?
		raise "Trying to reuse unacquired resource: #{resource}!"
	end
	
	# If the resource was fully utilized, it now becomes available:
	if usage == resource.concurrency
		@available.push(resource)
	end
	
	@resources[resource] = usage - 1
	
	@notification.signal
	
	return true
end

def size

The number of resources in the pool.
def size
	@resources.size
end

def start_gardener

def start_gardener
	return if @gardener
	
	@gardener = true
	
	Async(transient: true, annotation: "#{self.class} Gardener") do |task|
		@gardener = task
		
		while true
			if @policy
				@policy.call(self)
			else
				Task.yield
			end
			
			self.wait
		end
	ensure
		@gardener = nil
		self.close
	end
end

def to_json(...)

Generate a JSON representation of the pool.
def to_json(...)
	as_json.to_json(...)
end

def to_s

Generate a human-readable representation of the pool.
def to_s
	if @resources.empty?
		"\#<#{self.class}(#{usage_string})>"
	else
		"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
	end
end

def usage_string

def usage_string
	"#{@resources.size}/#{@limit || '∞'}"
end

def wait

Wait until a pool resource has been freed.
def wait
	@notification.wait
end

def wait_for_resource

def wait_for_resource
	# If we fail to create a resource (below), we will end up waiting for one to become resources.
	until resource = available_resource
		@notification.wait
	end
	
	# Be careful not to context switch or fail here.
	
	return resource
end