lib/async/queue.rb



# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2022, by Samuel Williams.
# Copyright, 2019, by Ryan Musgrave.
# Copyright, 2020-2022, by Bruno Sutic.

require_relative 'notification'

module Async
	# A queue which allows items to be processed in order.
	#
	# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
	#
	# @public Since `stable-v1`.
	class Queue
		# Create a new queue.
		#
		# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
		# @parameter available [Notification] The notification to use for signaling when items are available.
		def initialize(parent: nil, available: Notification.new)
			@items = []
			@parent = parent
			@available = available
		end
		
		# @attribute [Array] The items in the queue.
		attr :items
		
		# @returns [Integer] The number of items in the queue.
		def size
			@items.size
		end
		
		# @returns [Boolean] Whether the queue is empty.
		def empty?
			@items.empty?
		end
		
		# Add an item to the queue.
		def <<(item)
			@items << item
			
			@available.signal unless self.empty?
		end
		
		# Add multiple items to the queue.
		def enqueue(*items)
			@items.concat(items)
			
			@available.signal unless self.empty?
		end
		
		# Remove and return the next item from the queue.
		def dequeue
			while @items.empty?
				@available.wait
			end
			
			@items.shift
		end
		
		# Process each item in the queue.
		#
		# @asynchronous Executes the given block concurrently for each item.
		#
		# @parameter arguments [Array] The arguments to pass to the block.
		# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
		# @parameter options [Hash] The options to pass to the task.
		# @yields {|task| ...} When the system is idle, the block will be executed in a new task.
		def async(parent: (@parent or Task.current), **options, &block)
			while item = self.dequeue
				parent.async(item, **options, &block)
			end
		end
		
		# Enumerate each item in the queue.
		def each
			while item = self.dequeue
				yield item
			end
		end
		
		# Signal the queue with a value, the same as {#enqueue}.
		def signal(value)
			self.enqueue(value)
		end
		
		# Wait for an item to be available, the same as {#dequeue}.
		def wait
			self.dequeue
		end
	end
	
	# A queue which limits the number of items that can be enqueued.
	# @public Since `stable-v1`.
	class LimitedQueue < Queue
		# Create a new limited queue.
		#
		# @parameter limit [Integer] The maximum number of items that can be enqueued.
		# @parameter full [Notification] The notification to use for signaling when the queue is full.
		def initialize(limit = 1, full: Notification.new, **options)
			super(**options)
			
			@limit = limit
			@full = full
		end
		
		# @attribute [Integer] The maximum number of items that can be enqueued.
		attr :limit
		
		# @returns [Boolean] Whether trying to enqueue an item would block.
		def limited?
			@items.size >= @limit
		end
		
		# Add an item to the queue.
		#
		# If the queue is full, this method will block until there is space available.
		#
		# @parameter item [Object] The item to add to the queue.
		def <<(item)
			while limited?
				@full.wait
			end
			
			super
		end
		
		# Add multiple items to the queue.
		#
		# If the queue is full, this method will block until there is space available. 
		#
		# @parameter items [Array] The items to add to the queue.
		def enqueue(*items)
			while !items.empty?
				while limited?
					@full.wait
				end
				
				available = @limit - @items.size
				@items.concat(items.shift(available))
				
				@available.signal unless self.empty?
			end
		end
		
		# Remove and return the next item from the queue.
		#
		# If the queue is empty, this method will block until an item is available.
		#
		# @returns [Object] The next item in the queue.
		def dequeue
			item = super
			
			@full.signal
			
			return item
		end
	end
end