class Async::Queue
# A thread-safe queue which allows items to be processed in order.
#
# This implementation uses Thread::Queue internally for thread safety while
# maintaining compatibility with the fiber scheduler.
#
# It has a compatible interface with {Notification} and {Condition}, except that it’s multi-value.
#
# @asynchronous This class is thread-safe.
#
# @public Since *Async v1*.
# Append a single item to the queue using the shovel operator.
#
# This is a thin alias for {push}: it returns whatever {push} returns and
# raises whatever {push} raises.
#
# @parameter item [Object] The item to add to the queue.
def <<(item)
  push(item)
end
# Process each item in the queue.
#
# Repeatedly dequeues an item and calls `parent.async(item, **options, &block)`
# with it, so each dequeued item is the argument handed to the block. The loop
# stops as soon as {dequeue} returns `nil` or `false`.
#
# @asynchronous Executes the given block concurrently for each item.
#
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. Defaults to the parent given to the constructor, or else `Task.current`.
# @parameter options [Hash] The options to pass to the task.
def async(parent: (@parent or Task.current), **options, &block)
  # Assignment in the condition is intentional: a falsy item ends processing.
  while (item = dequeue)
    parent.async(item, **options, &block)
  end
end
# Close the queue by closing the underlying delegate.
#
# @returns [Object] Whatever the delegate's `close` returns.
def close
  @delegate.close
end
# Whether the underlying delegate reports itself as closed.
#
# @returns [Boolean] The result of the delegate's `closed?`.
def closed?
  @delegate.closed?
end
# Remove and return the next item from the queue.
#
# The call is forwarded to the delegate's `pop`. With the default
# `Thread::Queue` delegate this yields `nil` when the timeout elapses or when
# the queue is closed and empty.
#
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
# @returns [Object] Whatever the delegate's `pop` returns.
def dequeue(timeout: nil)
  @delegate.pop(timeout: timeout)
end
# Yield each item from the queue as it is dequeued.
#
# Iteration stops as soon as {dequeue} returns `nil` or `false`.
#
# When called without a block, an Enumerator over the same sequence is
# returned instead; nothing is dequeued until that enumerator is iterated.
#
# @yields {|item| ...} Each dequeued item, in order.
# @returns [Enumerator | Nil] An enumerator if no block is given, otherwise `nil`.
def each
  return to_enum(:each) unless block_given?

  # Assignment in the condition is intentional: a falsy item ends iteration.
  while (item = dequeue)
    yield item
  end
end
# Whether the underlying delegate currently holds no items.
#
# @returns [Boolean] The result of the delegate's `empty?`.
def empty?
  @delegate.empty?
end
# Add zero or more items to the queue, in the order given.
#
# Each item is pushed onto the delegate individually. Only the delegate's
# `ClosedQueueError` is translated; any other error propagates unchanged.
#
# @parameter items [Array] The items to add to the queue.
# @returns [Array] The items that were given.
# @raises [ClosedError] If the queue has been closed.
def enqueue(*items)
  items.each { |item| @delegate.push(item) }
rescue ClosedQueueError
  raise ClosedError, "Cannot enqueue items to a closed queue!"
end
# Create a new thread-safe queue.
#
# @parameter parent [Interface(:async) | Nil] The default parent used by {async} when none is passed explicitly.
# @parameter delegate [Thread::Queue] The underlying queue that stores the items; a fresh `Thread::Queue` by default.
def initialize(parent: nil, delegate: Thread::Queue.new)
  @delegate = delegate
  @parent = parent
end
# Compatibility with {::Queue#pop}.
#
# Forwards to the delegate's `pop` in the same way as {dequeue}.
#
# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
# @returns [Object] Whatever the delegate's `pop` returns.
def pop(timeout: nil)
  @delegate.pop(timeout: timeout)
end
# Add a single item to the queue.
#
# Only the delegate's `ClosedQueueError` is translated; any other error
# propagates unchanged.
#
# @parameter item [Object] The item to add to the queue.
# @returns [Object] Whatever the delegate's `push` returns.
# @raises [ClosedError] If the queue has been closed.
def push(item)
  @delegate.push(item)
rescue ClosedQueueError
  raise ClosedError, "Cannot enqueue items to a closed queue!"
end
# Enqueue a single value, defaulting to `nil`.
#
# Forwards to {enqueue}, so it returns and raises whatever {enqueue} does.
#
# @parameter value [Object | Nil] The value to add to the queue.
def signal(value = nil)
  enqueue(value)
end
# The number of items currently held by the underlying delegate.
#
# @returns [Integer] The result of the delegate's `size`.
def size
  @delegate.size
end
# Wait for the next item and return it.
#
# Equivalent to calling {dequeue} with no timeout.
#
# @returns [Object] Whatever {dequeue} returns.
def wait
  dequeue
end
# The number of consumers currently blocked waiting on the underlying delegate.
#
# @returns [Integer] The result of the delegate's `num_waiting`.
def waiting_count
  @delegate.num_waiting
end