class Fluent::Test::InputTestDriver
def run(num_waits = 10, &block)
def run(num_waits = 10, &block) m = method(:emit_stream) unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper) Engine.singleton_class.prepend EmitStreamWrapper end Engine.emit_stream_callee = m unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper) instance.router.singleton_class.prepend EmitStreamWrapper end instance.router.emit_stream_callee = m super(num_waits) { block.call if block if @expected_emits_length || @expects || @run_post_conditions # counters for emits and emit_streams i, j = 0, 0 # Events of expected length will be emitted at the end. max_length = @expected_emits_length max_length ||= @expects.length if @expects if max_length register_run_post_condition do i == max_length end end # Set running timeout to avoid infinite loop caused by some errors. started_at = Time.now register_run_breaking_condition do Time.now >= started_at + @run_timeout end until run_should_stop? if j >= @emit_streams.length sleep 0.01 next end tag, events = @emit_streams[j] events.each do |time, record| if @expects assert_equal(@expects[i], [tag, time, record]) assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime) end i += 1 end j += 1 end assert_equal(@expects.length, i) if @expects end } self end