class EventMachine::Iterator


end
async_http_get(url){ iter.next }
EM::Iterator.new(urls, 10).each do |url,iter|
to fetch 10 urls at a time, simply pass in a concurrency of 10:
Now, you can take advantage of the asynchronous api to issue requests in parallel. For example,
})
puts ‘all done!’

}, proc{ |responses|
}
iter.return(res)
async_http_get(url){ |res|
EM::Iterator.new(urls).map(proc{ |url,iter|
could be written as:
puts ‘all done!’

responses = urls.map{ |url| sync_http_get(url) }
a synchronous iterator such as:
async_http_get(url){ |response| … }
response = sync_http_get(url); …
functions. For example, given a sync and async http api:
This is especially useful when doing asynchronous work via reactor libraries and
EM::Iterator.new(0..10).each{ |num,iter| iter.next }
becomes:
(0..10).each{ |num| }
instead of happening automatically after the yielded block finishes executing. For example:
Unlike ruby’s built-in iterators, the end of the current iteration cycle is signaled manually,
A simple iterator for concurrent asynchronous work.

def concurrency=(val)


to accomodate the new concurrency level.
Change the concurrency of this iterator. Workers will automatically be spawned or destroyed
def concurrency=(val)
  old = @concurrency
  @concurrency = val
  spawn_workers if val > old and @started and !@ended
end

def each(foreach=nil, after=nil, &blk)


)
proc{ puts 'all done' }
proc{ |num,iter| iter.next },
EM::Iterator.new(1..100).each(

An optional second proc is invoked after the iteration is complete.

end
iter.next
puts num
EM::Iterator.new(1..100).each do |num, iter|

Iterate over a set of items using the specified block or proc.
def each(foreach=nil, after=nil, &blk)
  raise ArgumentError, 'proc or block required for iteration' unless foreach ||= blk
  raise RuntimeError, 'cannot iterate over an iterator more than once' if @started or @ended
  @started = true
  @pending = 0
  @workers = 0
  all_done = proc{
    after.call if after and @ended and @pending == 0
  }
  @process_next = proc{
    # p [:process_next, :pending=, @pending, :workers=, @workers, :ended=, @ended, :concurrency=, @concurrency, :list=, @list]
    unless @ended or @workers > @concurrency
      item = next_item()
      if item.equal?(Stop)
        @ended = true
        @workers -= 1
        all_done.call
      else
        @pending += 1
        is_done = false
        on_done = proc{
          raise RuntimeError, 'already completed this iteration' if is_done
          is_done = true
          @pending -= 1
          if @ended
            all_done.call
          else
            EM.next_tick(@process_next)
          end
        }
        class << on_done
          alias :next :call
        end
        foreach.call(item, on_done)
      end
    else
      @workers -= 1
    end
  }
  spawn_workers
  self
end

def initialize(list, concurrency = 1)


EventMachine::Iterator::Stop to signal the end of the iterations.
to be processed each time it is called. If a proc is used, it must return
The list may either be an array-like object, or a proc that returns a new object

is started via #each, #map or #inject
will create an iterator over the range that processes 10 items at a time. Iteration

i = EM::Iterator.new(1..100, 10)

Create a new parallel async iterator with specified concurrency.
def initialize(list, concurrency = 1)
  raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0)
  if list.respond_to?(:call)
    @list = nil
    @list_proc = list
  elsif list.respond_to?(:to_a)
    @list = list.to_a.dup
    @list_proc = nil
  else
    raise ArgumentError, 'argument must be a proc or an array'
  end
  @concurrency = concurrency
  @started = false
  @ended = false
end

def inject(obj, foreach, after)


})
p results
}, proc{ |results|
}
iter.return(hash)
hash[cmd] = status.exitstatus == 0 ? output.strip : nil
EM.system(cmd){ |output,status|
EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|

Inject the results of an asynchronous iteration onto a given object.
def inject(obj, foreach, after)
  each(proc{ |item,iter|
    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true
      obj = res
      iter.next
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on an inject iterator'
      end
    end
    foreach.call(obj, item, on_done)
  }, proc{
    after.call(obj)
  })
end

def map(foreach, after)


})
p results
}, proc{ |results|
}
iter.return(output)
EM.system(cmd){ |output,status|
EM::Iterator.new(%w[ pwd uptime uname date ], 2).map(proc{ |cmd,iter|

Collect the results of an asynchronous iteration into an array.
def map(foreach, after)
  index = 0
  inject([], proc{ |results,item,iter|
    i = index
    index += 1
    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true
      results[i] = res
      iter.return(results)
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on a map iterator'
      end
    end
    foreach.call(item, on_done)
  }, proc{ |results|
    after.call(results)
  })
end

def next

def next
  raise NoMethodError, 'must call #return on a map iterator'
end

def next

def next
  raise NoMethodError, 'must call #return on an inject iterator'
end

def next_item

Once items have run out, will return EM::Iterator::Stop. Procs must supply this themselves
Return the next item from @list or @list_proc.
def next_item
  if @list_proc
    @list_proc.call
  else
    @list.empty? ? Stop : @list.shift
  end
end

def spawn_workers


Spawn workers to consume items from the iterator's enumerator based on the current concurrency level.
def spawn_workers
  EM.next_tick(start_worker = proc{
    if @workers < @concurrency and !@ended
      # p [:spawning_worker, :workers=, @workers, :concurrency=, @concurrency, :ended=, @ended]
      @workers += 1
      @process_next.call
      EM.next_tick(start_worker)
    end
  })
  nil
end