class Fluent::EventRouter::Pipeline::FilterOptimizer
def filter_stream(tag, es)
def filter_stream(tag, es) if optimizable? optimized_filter_stream(tag, es) else @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) } end end
def filters=(filters)
def filters=(filters) @filters = filters reset_optimization end
def filters_having_filter_stream
def filters_having_filter_stream @filters_having_filter_stream ||= @filters.select do |filter| filter.class.instance_methods(false).include?(:filter_stream) end end
def initialize(filters = [])
def initialize(filters = []) @filters = filters @optimizable = nil end
def optimizable?
def optimizable? return @optimizable unless @optimizable.nil? fs_filters = filters_having_filter_stream @optimizable = if fs_filters.empty? true else # skip log message when filter is only 1, because its performance is same as non optimized chain. if @filters.size > 1 && fs_filters.size >= 1 $log.info "disable filter chain optimization because #{fs_filters.map(&:class)} uses `#filter_stream` method." end false end end
def optimized_filter_stream(tag, es)
def optimized_filter_stream(tag, es) new_es = MultiEventStream.new es.each do |time, record| filtered_record = record filtered_time = time catch :break_loop do @filters.each do |filter| if filter.has_filter_with_time begin filtered_time, filtered_record = filter.filter_with_time(tag, filtered_time, filtered_record) throw :break_loop unless filtered_record && filtered_time rescue => e filter.router.emit_error_event(tag, filtered_time, filtered_record, e) end else begin filtered_record = filter.filter(tag, filtered_time, filtered_record) throw :break_loop unless filtered_record rescue => e filter.router.emit_error_event(tag, filtered_time, filtered_record, e) end end end new_es.add(filtered_time, filtered_record) end end new_es end
def reset_optimization
def reset_optimization @optimizable = nil @filters_having_filter_stream = nil end