lib/fluent/test/driver/output.rb



#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/test/driver/base_owner'
require 'fluent/test/driver/event_feeder'

require 'fluent/plugin/output'
require 'timeout'

module Fluent
  module Test
    module Driver
      class Output < BaseOwner
        include EventFeeder

        def initialize(klass, opts: {}, &block)
          super
          raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output
          @flush_buffer_at_cleanup = nil
          @wait_flush_completion = nil
          @force_flush_retry = nil
          @format_hook = nil
          @format_results = []
        end

        def run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)
          @flush_buffer_at_cleanup = flush
          @wait_flush_completion = wait_flush_completion
          @force_flush_retry = force_flush_retry
          super(**kwargs, &block)
        end

        def run_actual(**kwargs, &block)
          if @force_flush_retry
            @instance.retry_for_error_chunk = true
          end
          val = super(**kwargs, &block)
          if @flush_buffer_at_cleanup
            self.flush
          end
          val
        ensure
          @instance.retry_for_error_chunk = false
        end

        def formatted
          @format_results
        end

        def flush
          @instance.force_flush
          wait_flush_completion if @wait_flush_completion
        end

        def wait_flush_completion
          buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 }
          dequeued_chunks = ->(){
            @instance.dequeued_chunks_mutex &&
            @instance.dequeued_chunks &&
            @instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 }
          }

          Timeout.timeout(10) do
            while buffer_queue.call || dequeued_chunks.call
              sleep 0.1
            end
          end
        end

        def instance_hook_after_started
          super

          # it's decided after #start whether output plugin instances use @custom_format or not.
          if @instance.instance_eval{ @custom_format }
            @format_hook = format_hook = ->(result){ @format_results << result }
            m = Module.new do
              define_method(:format) do |tag, time, record|
                result = super(tag, time, record)
                format_hook.call(result)
                result
              end
            end
            @instance.singleton_class.prepend m
          end
        end
      end
    end
  end
end