class Fluent::EventRouter
Collector is either of Output, Filter or other EventRouter.
3) forward the event to the corresponding Collector
2) match the event’s tag with the MatchPatterns
1) receive an event at ‘#emit` methods
EventRouter does:---------------- -----------------
| archive.** ———> type s3 |
| logs.** ———> type copy |
| access.** ———> type forward |---------------- -----------------
| MatchPattern | | Collector |---------------- -----------------
It has a list of MatchPattern and Collector pairs:
EventRouter is responsible to route events to a collector.
def add_rule(pattern, collector)
def add_rule(pattern, collector) @match_rules << Rule.new(pattern, collector) end
def emit(tag, time, record)
def emit(tag, time, record) unless record.nil? emit_stream(tag, OneEventStream.new(time, record)) end end
def emit_array(tag, array)
def emit_array(tag, array) emit_stream(tag, ArrayEventStream.new(array)) end
def emit_error_event(tag, time, record, error)
def emit_error_event(tag, time, record, error) @emit_error_handler.emit_error_event(tag, time, record, error) end
def emit_stream(tag, es)
def emit_stream(tag, es) match(tag).emit_events(tag, es) rescue => e @emit_error_handler.handle_emits_error(tag, es, e) end
def find(tag)
def find(tag) pipeline = nil @match_rules.each_with_index { |rule, i| if rule.match?(tag) if rule.collector.is_a?(Plugin::Filter) pipeline ||= Pipeline.new pipeline.add_filter(rule.collector) else if pipeline pipeline.set_output(rule.collector) else # Use Output directly when filter is not matched pipeline = rule.collector end return pipeline end end } if pipeline # filter is matched but no match pipeline.set_output(@default_collector) pipeline else nil end end
def initialize(default_collector, emit_error_handler)
def initialize(default_collector, emit_error_handler) @match_rules = [] @match_cache = MatchCache.new @default_collector = default_collector @emit_error_handler = emit_error_handler end
def match(tag)
def match(tag) collector = @match_cache.get(tag) { find(tag) || @default_collector } collector end
def match?(tag)
def match?(tag) !!find(tag) end
def suppress_missing_match!
def suppress_missing_match! if @default_collector.respond_to?(:suppress_missing_match!) @default_collector.suppress_missing_match! end end