class Fluent::EventRouter::Pipeline::FilterOptimizer

def filter_stream(tag, es)

def filter_stream(tag, es)
  if optimizable?
    optimized_filter_stream(tag, es)
  else
    @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }
  end
end

def filters=(filters)

def filters=(filters)
  @filters = filters
  reset_optimization
end

def filters_having_filter_stream

def filters_having_filter_stream
  @filters_having_filter_stream ||= @filters.select do |filter|
    filter.class.instance_methods(false).include?(:filter_stream)
  end
end

def initialize(filters = [])

def initialize(filters = [])
  @filters = filters
  @optimizable = nil
end

def optimizable?

def optimizable?
  return @optimizable unless @optimizable.nil?
  fs_filters = filters_having_filter_stream
  @optimizable = if fs_filters.empty?
                   true
                 else
                   # skip log message when filter is only 1, because its performance is same as non optimized chain.
                   if @filters.size > 1 && fs_filters.size >= 1
                     $log.info "disable filter chain optimization because #{fs_filters.map(&:class)} uses `#filter_stream` method."
                   end
                   false
                 end
end

def optimized_filter_stream(tag, es)

def optimized_filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    filtered_record = record
    filtered_time = time
    catch :break_loop do
      @filters.each do |filter|
        if filter.has_filter_with_time
          begin
            filtered_time, filtered_record = filter.filter_with_time(tag, filtered_time, filtered_record)
            throw :break_loop unless filtered_record && filtered_time
          rescue => e
            filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
          end
        else
          begin
            filtered_record = filter.filter(tag, filtered_time, filtered_record)
            throw :break_loop unless filtered_record
          rescue => e
            filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
          end
        end
      end
      new_es.add(filtered_time, filtered_record)
    end
  end
  new_es
end

def reset_optimization

def reset_optimization
  @optimizable = nil
  @filters_having_filter_stream = nil
end