# A cross-thread, reactor-scheduled, linear queue.
class EventMachine::Queue


end
q.pop { |msg| puts(msg) }
3.times do
q.push('one', 'two', 'three')
q = EM::Queue.new
@example
* Pushing processing onto the reactor thread
* API sugar for stateful protocols
scheduler. It serves two primary purposes:
This class provides a simple queue abstraction on top of the reactor
A cross-thread, reactor-scheduled, linear queue.

def empty?

Other tags:
    Note: - This is a peek; it is not thread-safe and may only tend toward accuracy.

Returns:
  • (Boolean) -
# Reports whether the queue currently holds no items.
#
# Both internal buffers are inspected: +@drain+ (items being handed out)
# and +@sink+ (items most recently pushed). This is only a peek at the
# current state and is not synchronised with other threads.
#
# @return [Boolean] true when neither buffer contains an item
def empty?
  [@drain, @sink].all?(&:empty?)
end

def initialize

# Sets up the three internal buffers, each starting out empty.
def initialize
  # Items that have been pushed and not yet moved to the drain.
  @sink  = []
  # Items currently being handed out to consumers, oldest first.
  @drain = []
  # Callbacks of consumers waiting for an item to arrive.
  @popq  = []
end

def num_waiting

Other tags:
    Note: - This is a peek at the number of jobs that are currently waiting on the Queue

Returns:
  • (Integer) - Waiting size
# Peeks at how many pop callbacks are parked, waiting for an item.
#
# @return [Integer] number of queued pop callbacks
def num_waiting
  @popq.length
end

def pop(*a, &b)

Returns:
  • (NilClass) - nil
# Requests the next item from the queue.
#
# The arguments are turned into a callback via +EM::Callback+, and the
# actual dequeue work is handed to +EM.schedule+. If an item is available
# the callback is invoked with it; otherwise the callback is parked in
# +@popq+ until a later push supplies one.
#
# @return [NilClass] always nil; the item is delivered to the callback
def pop(*a, &b)
  callback = EM::Callback(*a, &b)

  EM.schedule do
    # Refill the drain from the sink once the drain has run dry.
    @drain, @sink = @sink, [] if @drain.empty?

    if @drain.empty?
      # Nothing to hand out yet: wait for a push.
      @popq.push(callback)
    else
      callback.call(@drain.shift)
    end
  end

  nil
end

def push(*items)

next reactor tick.
in the queue immediately, but will be scheduled for addition during the
Push items onto the queue in the reactor thread. The items will not appear
# Adds items to the queue.
#
# The work is handed to +EM.schedule+: the items are appended to +@sink+,
# and if any pop callbacks are parked in +@popq+ the sink is promoted to
# the drain and items are handed to those callbacks in order until either
# side runs out.
#
# @param items [Array<Object>] the items to enqueue
def push(*items)
  EM.schedule do
    @sink.push(*items)
    next if @popq.empty?

    # Waiting consumers exist: promote the sink and feed them.
    @drain, @sink = @sink, []
    until @drain.empty? || @popq.empty?
      @popq.shift.call(@drain.shift)
    end
  end
end

def size

Other tags:
    Note: - This is a peek; it is not thread-safe and may only tend toward accuracy.

Returns:
  • (Integer) - Queue size
# Peeks at the number of items currently held by the queue.
#
# Counts the items in both internal buffers (+@drain+ and +@sink+). The
# value is read without synchronisation with other threads.
#
# @return [Integer] combined item count of both buffers
def size
  [@drain, @sink].sum(&:size)
end