class Fluent::Test::FilterTestDriver
# Queues one record for filtering under the driver's default tag (@tag).
#
# @param record the record to queue
# @param time the event time; defaults to Engine.now, evaluated per call
# @return the value returned by #emit_with_tag
def emit(record, time = Engine.now)
  emit_with_tag(@tag, record, time)
end
# Queues one record for filtering under an explicit tag.
#
# The first record seen for a tag creates a new MultiEventStream in @events;
# later records for the same tag are appended to that same stream.
#
# @param tag the key under which the record is queued in @events
# @param record the record to queue
# @param time the event time; defaults to Engine.now, evaluated per call
# @return the value returned by the stream's #add
def emit_with_tag(tag, record, time = Engine.now)
  stream = (@events[tag] ||= MultiEventStream.new)
  stream.add(time, record)
end
# Queues a whole event stream for filtering under the driver's default tag
# (@tag).
#
# @param es the event stream to queue
# @return the value returned by #filter_stream_with_tag
def filter_stream(es)
  filter_stream_with_tag(@tag, es)
end
# Queues a whole event stream for filtering under an explicit tag.
#
# The stream is stored as-is and replaces anything previously queued for the
# same tag; it is not merged with earlier events.
#
# @param tag the key under which the stream is stored in @events
# @param es the event stream to queue
# @return es, the stream that was stored
def filter_stream_with_tag(tag, es)
  @events[tag] = es
end
# Returns the filtered events as an array of [tag, time, record] triples.
#
# NOTE: every triple carries the driver's default tag (@tag); the entries of
# @filtered are iterated as (time, record) pairs only, so no per-event tag is
# available here.
#
# @return [Array<Array>] one [@tag, time, record] entry per filtered event,
#   in the iteration order of @filtered
def filtered_as_array
  all = []
  @filtered.each { |time, record| all << [@tag, time, record] }
  all
end
# Sets up a driver for the given filter class.
#
# @param klass passed through to the superclass initializer
# @param tag [String] default tag used by #emit and #filter_stream
# @param block passed through to the superclass initializer
def initialize(klass, tag = 'filter.test', &block)
  super(klass, &block)
  @tag = tag
  @events = {}                       # tag => queued event stream
  @filtered = MultiEventStream.new   # accumulates events produced by #run
end
# Runs the driver: inside the superclass's run block, calls the optional
# caller block, then passes every queued stream through the filter instance
# and collects the output in @filtered.
#
# Queued streams in @events are not cleared afterwards.
#
# @param num_waits passed through to the superclass #run; defaults to 0
# @param block optional block called before the queued events are filtered
# @return [self] so that calls can be chained
def run(num_waits = 0, &block)
  super(num_waits) do
    block.call if block

    @events.each do |tag, es|
      processed = @instance.filter_stream(tag, es)
      processed.each { |time, record| @filtered.add(time, record) }
    end
  end
  self
end