lib/fluent/test/driver/base_owner.rb



#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/test/driver/base'
require 'fluent/test/driver/test_event_router'

module Fluent
  module Test
    module Driver
      class BaseOwner < Base
        def initialize(klass, opts: {}, &block)
          super

          if opts
            @instance.system_config_override(opts)
          end
          @instance.log = TestLogger.new
          @logs = @instance.log.out.logs

          @event_streams = nil
          @error_events = nil
        end

        def configure(conf, syntax: :v1)
          if conf.is_a?(Fluent::Config::Element)
            @config = conf
          else
            @config = Config.parse(conf, "(test)", "(test_dir)", syntax: syntax)
          end

          if @instance.respond_to?(:router=)
            @event_streams = []
            @error_events = []

            driver = self
            mojule = Module.new do
              define_method(:event_emitter_router) do |label_name|
                TestEventRouter.new(driver)
              end
            end
            @instance.singleton_class.prepend mojule
          end

          @instance.configure(@config)
          self
        end

        Emit = Struct.new(:tag, :es)
        ErrorEvent = Struct.new(:tag, :time, :record, :error)

        # via TestEventRouter
        def emit_event_stream(tag, es)
          @event_streams << Emit.new(tag, es)
        end

        def emit_error_event(tag, time, record, error)
          @error_events << ErrorEvent.new(tag, time, record, error)
        end

        def events(tag: nil)
          if block_given?
            event_streams(tag: tag) do |t, es|
              es.each do |time, record|
                yield t, time, record
              end
            end
          else
            list = []
            event_streams(tag: tag) do |t, es|
              es.each do |time, record|
                list << [t, time, record]
              end
            end
            list
          end
        end

        def event_streams(tag: nil)
          return [] if @event_streams.nil?
          selected = @event_streams.select{|e| tag.nil? ? true : e.tag == tag }
          if block_given?
            selected.each do |e|
              yield e.tag, e.es
            end
          else
            selected.map{|e| [e.tag, e.es] }
          end
        end

        def emit_count
          @event_streams.size
        end

        def record_count
          @event_streams.reduce(0) {|a, e| a + e.es.size }
        end

        def error_events(tag: nil)
          selected = @error_events.select{|e| tag.nil? ? true : e.tag == tag }
          if block_given?
            selected.each do |e|
              yield e.tag, e.time, e.record, e.error
            end
          else
            selected.map{|e| [e.tag, e.time, e.record, e.error] }
          end
        end

        def run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)
          if expect_emits
            @run_post_conditions << ->(){ emit_count >= expect_emits }
          end
          if expect_records
            @run_post_conditions << ->(){ record_count >= expect_records }
          end

          super(timeout: timeout, start: start, shutdown: shutdown, &block)
        end
      end
    end
  end
end