class Fluent::Test::InputTestDriver

def run(num_waits = 10, &block)

def run(num_waits = 10, &block)
  m = method(:emit_stream)
  unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
    Engine.singleton_class.prepend EmitStreamWrapper
  end
  Engine.emit_stream_callee = m
  unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
    instance.router.singleton_class.prepend EmitStreamWrapper
  end
  instance.router.emit_stream_callee = m
  super(num_waits) {
    block.call if block
    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0
      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end
      # Set running timeout to avoid infinite loop caused by some errors.
      started_at = Time.now
      register_run_breaking_condition do
        Time.now >= started_at + @run_timeout
      end
      until run_should_stop?
        if j >= @emit_streams.length
          sleep 0.01
          next
        end
        tag, events = @emit_streams[j]
        events.each do |time, record|
          if @expects
            assert_equal(@expects[i], [tag, time, record])
            assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
          end
          i += 1
        end
        j += 1
      end
      assert_equal(@expects.length, i) if @expects
    end
  }
  self
end