class Fluent::Test::FilterTestDriver

def emit(record, time = Engine.now)

# Queues a single record under the driver's default tag (@tag).
#
# record - the record to queue.
# time   - the event time; defaults to Engine.now.
#
# Delegates to emit_with_tag and returns its result.
def emit(record, time = Engine.now)
  default_tag = @tag
  emit_with_tag(default_tag, record, time)
end

def emit_with_tag(tag, record, time = Engine.now)

# Appends (time, record) to the event stream stored for +tag+ in @events.
#
# A new MultiEventStream is created and stored the first time a tag is seen.
# Returns whatever the stream's #add returns.
def emit_with_tag(tag, record, time = Engine.now)
  stream = (@events[tag] ||= MultiEventStream.new)
  stream.add(time, record)
end

def filter_stream(es)

# Registers +es+ as the event stream for the driver's default tag (@tag).
#
# Delegates to filter_stream_with_tag and returns its result.
def filter_stream(es)
  default_tag = @tag
  filter_stream_with_tag(default_tag, es)
end

def filter_stream_with_tag(tag, es)

# Stores +es+ as the event stream for +tag+ in @events, replacing any stream
# previously stored under that tag. Returns +es+.
def filter_stream_with_tag(tag, es)
  @events.store(tag, es)
end

def filtered_as_array

# Returns the contents of @filtered as an Array of [tag, time, record] triples.
#
# Every triple carries the driver's default tag (@tag); the tag an event was
# originally queued under is not recorded in @filtered.
def filtered_as_array
  # to_enum relies only on @filtered responding to #each.
  @filtered.to_enum.map do |time, record|
    [@tag, time, record]
  end
end

def initialize(klass, tag = 'filter.test', &block)

# Sets up the driver's per-instance state.
#
# klass - forwarded, together with the optional block, to the superclass
#         initializer.
# tag   - the default tag used by the tag-less helpers; defaults to
#         'filter.test'.
def initialize(klass, tag = 'filter.test', &block)
  super(klass, &block)
  @filtered = MultiEventStream.new # accumulates the output of #run
  @events = {}                     # tag => event stream queued for filtering
  @tag = tag
end

def run(num_waits = 0, &block)

Most filters don't use threads, so the default is 0. This reduces test time.
# Runs the driver: inside the superclass's #run block, calls the optional
# caller block, then passes every queued (tag, event stream) pair through
# @instance.filter_stream and appends each resulting (time, record) to
# @filtered.
#
# num_waits - forwarded unchanged to the superclass's #run.
#
# Returns self.
def run(num_waits = 0, &block)
  super(num_waits) do
    block.call unless block.nil?
    @events.each_pair do |tag, es|
      @instance.filter_stream(tag, es).each do |time, record|
        @filtered.add(time, record)
      end
    end
  end
  self
end