lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb
module Concurrent
  module Collection

    # A priority queue backed by a binary heap stored in a plain Ruby Array.
    #
    # The heap is 1-based: slot 0 of `@queue` always holds a `nil` placeholder,
    # the head of the queue lives in slot 1, the children of slot `k` are in
    # slots `2k` and `2k + 1`, and its parent is in slot `k / 2`. `@length` is
    # the number of stored items. The class does no synchronization of its own.
    #
    # @!macro priority_queue
    #
    # @!visibility private
    # @!macro internal_implementation_note
    class RubyNonConcurrentPriorityQueue

      # Create an empty queue.
      #
      # @param [Hash] opts the options for creating the queue
      # @option opts [Symbol] :order (:max) `:min` or `:low` makes the item
      #   that compares lowest (via `<=>`) the head of the queue; any other
      #   value makes the item that compares highest the head
      #
      # @!macro priority_queue_method_initialize
      def initialize(opts = {})
        order = opts.fetch(:order, :max)
        # A parent/child pair is "ordered" when `parent <=> child` equals this.
        @comparator = [:min, :low].include?(order) ? -1 : 1
        clear
      end

      # Remove every item from the queue.
      #
      # @return [true] always
      #
      # @!macro priority_queue_method_clear
      def clear
        @queue = [nil] # slot 0 is a placeholder so that the heap is 1-based
        @length = 0
        true
      end

      # Remove every item that is equal (`==`) to the given item.
      #
      # @param [Object] item the item to remove
      # @return [Boolean] true if at least one item was removed else false
      #
      # @!macro priority_queue_method_delete
      def delete(item)
        return false if empty?

        original_length = @length
        k = 1
        while k <= @length
          if @queue[k] == item
            # Discard matching items sitting at the tail first. This guarantees
            # that the element moved into slot k below is not itself a match,
            # so re-heapifying can never carry a match up into the part of the
            # array that has already been scanned.
            while @length > k && @queue[@length] == item
              @queue.pop
              @length -= 1
            end

            # Move the tail element into the vacated slot and shrink the array
            # *before* restoring the heap, so the removed item can no longer
            # take part in any comparison or swap.
            swap(k, @length)
            @queue.pop
            @length -= 1

            # When the removed item was the tail itself there is nothing left
            # at slot k to reposition. Otherwise slot k is re-examined on the
            # next iteration because k is deliberately not advanced.
            if k <= @length
              sink(k) || swim(k)
            end
          else
            k += 1
          end
        end
        @length != original_length
      end

      # @return [Boolean] true if the queue holds no items else false
      #
      # @!macro priority_queue_method_empty
      def empty?
        size == 0
      end

      # Is an item equal (`==`) to the given item stored in the queue?
      #
      # @param [Object] item the item to search for
      # @return [Boolean] true if the item is found else false
      #
      # @!macro priority_queue_method_include
      def include?(item)
        # `nil` can never be enqueued; without this guard the search would
        # match the placeholder kept in slot 0.
        return false if item.nil?

        @queue.include?(item)
      end
      alias_method :has_priority?, :include?

      # @return [Integer] the number of items in the queue
      #
      # @!macro priority_queue_method_length
      def length
        @length
      end
      alias_method :size, :length

      # Return the head of the queue without removing it.
      #
      # @return [Object, nil] the head item, or nil when the queue is empty
      #
      # @!macro priority_queue_method_peek
      def peek
        empty? ? nil : @queue[1]
      end

      # Remove and return the head of the queue.
      #
      # @return [Object, nil] the head item, or nil when the queue is empty
      #
      # @!macro priority_queue_method_pop
      def pop
        return nil if empty?

        head = @queue[1]
        # Move the head to the tail, exclude it from the heap, then let the
        # element that took its place settle before dropping the tail slot.
        swap(1, @length)
        @length -= 1
        sink(1)
        @queue.pop
        head
      end
      alias_method :deq, :pop
      alias_method :shift, :pop

      # Add an item to the queue.
      #
      # @param [Object] item the item to add
      # @return [true] always
      # @raise [ArgumentError] if the item is nil
      #
      # @!macro priority_queue_method_push
      def push(item)
        raise ArgumentError, 'cannot enqueue nil' if item.nil?

        @length += 1
        @queue << item
        swim(@length)
        true
      end
      alias_method :<<, :push
      alias_method :enq, :push

      # Build a queue and push every element of the given list onto it.
      #
      # @param [#each] list the items to enqueue
      # @param [Hash] opts the options passed to the constructor
      # @return [RubyNonConcurrentPriorityQueue] the populated queue
      #
      # @!macro priority_queue_method_from_list
      def self.from_list(list, opts = {})
        queue = new(opts)
        list.each { |item| queue << item }
        queue
      end

      private

      # Exchange the values at the given indexes within the internal array.
      #
      # @param [Integer] x the first index to swap
      # @param [Integer] y the second index to swap
      #
      # @!visibility private
      def swap(x, y)
        @queue[x], @queue[y] = @queue[y], @queue[x]
      end

      # Are the items at the given indexes ordered based on the priority
      # order specified at construction?
      #
      # Items that compare as equal (`<=>` returns 0) are reported as not
      # ordered.
      #
      # @param [Integer] x the first index from which to retrieve a comparable value
      # @param [Integer] y the second index from which to retrieve a comparable value
      #
      # @return [Boolean] true if the two elements are in the correct priority order
      #   else false
      #
      # @!visibility private
      def ordered?(x, y)
        (@queue[x] <=> @queue[y]) == @comparator
      end

      # Percolate down to maintain heap invariant.
      #
      # @param [Integer] k the index at which to start the percolation
      #
      # @return [Boolean] true if at least one swap was performed else false
      #
      # @!visibility private
      def sink(k)
        success = false

        while (j = (2 * k)) <= @length
          # Pick the right-hand child when it exists and the left-hand child
          # is not ordered ahead of it.
          j += 1 if j < @length && !ordered?(j, j + 1)
          break if ordered?(k, j)

          swap(k, j)
          success = true
          k = j
        end

        success
      end

      # Percolate up to maintain heap invariant.
      #
      # @param [Integer] k the index at which to start the percolation
      #
      # @return [Boolean] true if at least one swap was performed else false
      #
      # @!visibility private
      def swim(k)
        success = false

        while k > 1 && !ordered?(k / 2, k)
          swap(k, k / 2)
          k = k / 2
          success = true
        end

        success
      end
    end
  end
end