## Fluentd## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#require'fluent/match'require'fluent/event'require'fluent/filter'moduleFluent## EventRouter is responsible to route events to a collector.## It has a list of MatchPattern and Collector pairs:## +----------------+ +-----------------+# | MatchPattern | | Collector |# +----------------+ +-----------------+# | access.** ---------> type forward |# | logs.** ---------> type copy |# | archive.** ---------> type s3 |# +----------------+ +-----------------+## EventRouter does:## 1) receive an event at `#emit` methods# 2) match the event's tag with the MatchPatterns# 3) forward the event to the corresponding Collector## Collector is either of Output, Filter or other EventRouter.#classEventRouterdefinitialize(default_collector,emit_error_handler)@match_rules=[]@match_cache=MatchCache.new@default_collector=default_collector@emit_error_handler=emit_error_handlerendattr_accessor:default_collectorattr_accessor:emit_error_handlerclassRuledefinitialize(pattern,collector)patterns=pattern.split(/\s+/).map{|str|MatchPattern.create(str)}@pattern=ifpatterns.length==1patterns[0]elseOrMatchPattern.new(patterns)end@pattern_str=pattern@collector=collectorenddefmatch?(tag)@pattern.match(tag)endattr_reader:collectorattr_reader:pattern_strenddefsuppress_missing_match!if@default_collector.respond_to?(:suppress_missing_match!)@default_collector.suppress_missing_match!endend# called by Agent to add new match pattern and collectordefadd_rule(pattern,collector)@match_rules<<Rule.new(pattern,collector)enddefemit(tag,time,record)unlessrecord.nil?emit_stream(tag,OneEventStream.new(time,record))endenddefemit_array(tag,array)emit_stream(tag,ArrayEventStream.new(array))enddefemit_stream(tag,es)match(tag).emit_events(tag,es)rescue=>e@emit_error_handler.handle_emits_error(tag,es,e)enddefemit_error_event(tag,time,record,error)@emit_error_handler.emit_error_event(tag,time,record,error)enddefmatch?(tag)!!find(tag)enddefmatch(tag)collector=@match_cache.get(tag){find(tag)||@default_collector}collectorendclassMatchCacheMATCH_CACHE_SIZE=1024definitializesuper@map={}@keys=[]enddefget(key)ifcollector=@map[key]returncollectorendcollector=@map[key]=yieldif@keys.size>=MATCH_CACHE_SIZE# expire the oldest key@map.delete@keys.shiftend@keys<<keycollectorendendprivateclassPipelinedefinitialize@filters=[]@output=nil@optimizer=FilterOptimizer.newenddefadd_filter(filter)@filters<<filter@optimizer.filters=@filtersenddefset_output(output)@output=outputenddefemit_events(tag,es)processed=@optimizer.filter_stream(tag,es)@output.emit_events(tag,processed)endclassFilterOptimizerdefinitialize(filters=[])@filters=filters@optimizable=nilenddeffilters=(filters)@filters=filtersreset_optimizationenddeffilter_stream(tag,es)ifoptimizable?optimized_filter_stream(tag,es)else@filters.reduce(es){|acc,filter|filter.filter_stream(tag,acc)}endendprivatedefoptimized_filter_stream(tag,es)new_es=MultiEventStream.newes.eachdo|time,record|filtered_record=recordfiltered_time=timecatch:break_loopdo@filters.eachdo|filter|iffilter.has_filter_with_timebeginfiltered_time,filtered_record=filter.filter_with_time(tag,filtered_time,filtered_record)throw:break_loopunlessfiltered_record&&filtered_timerescue=>efilter.router.emit_error_event(tag,filtered_time,filtered_record,e)endelsebeginfiltered_record=filter.filter(tag,filtered_time,filtered_record)throw:break_loopunlessfiltered_recordrescue=>efilter.router.emit_error_event(tag,filtered_time,filtered_record,e)endendendnew_es.add(filtered_time,filtered_record)endendnew_esenddefoptimizable?return@optimizableunless@optimizable.nil?fs_filters=filters_having_filter_stream@optimizable=iffs_filters.empty?trueelse# skip log message when filter is only 1, because its performance is same as non optimized chain.if@filters.size>1&&fs_filters.size>=1$log.info"disable filter chain optimization because #{fs_filters.map(&:class)} uses `#filter_stream` method."endfalseendenddeffilters_having_filter_stream@filters_having_filter_stream||=@filters.selectdo|filter|filter.class.instance_methods(false).include?(:filter_stream)endenddefreset_optimization@optimizable=nil@filters_having_filter_stream=nilendendenddeffind(tag)pipeline=nil@match_rules.each_with_index{|rule,i|ifrule.match?(tag)ifrule.collector.is_a?(Plugin::Filter)pipeline||=Pipeline.newpipeline.add_filter(rule.collector)elseifpipelinepipeline.set_output(rule.collector)else# Use Output directly when filter is not matchedpipeline=rule.collectorendreturnpipelineendend}ifpipeline# filter is matched but no matchpipeline.set_output(@default_collector)pipelineelsenilendendendend