class Fluent::Agent
Next step: ‘fluentd/label.rb`
Next step: `fluentd/root_agent.rb`
Agent is a resource unit who manages emittable plugins
def add_filter(type, pattern, conf)
def add_filter(type, pattern, conf) log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type filter = Plugin.new_filter(type) filter.router = @event_router filter.configure(conf) @filters << filter @event_router.add_rule(pattern, filter) filter end
def add_match(type, pattern, conf)
def add_match(type, pattern, conf) log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) output.router = @event_router output.configure(conf) @outputs << output @event_router.add_rule(pattern, output) output end
def configure(conf)
def configure(conf) super # initialize <match> and <filter> elements conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e| pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] || e['type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type if e.name == 'filter' add_filter(type, pattern, e) else add_match(type, pattern, e) end } end
def emit_error_event(tag, time, record, error)
def emit_error_event(tag, time, record, error) end
def flush!
def flush! flush_recursive(@outputs) end
def flush_recursive(array)
def flush_recursive(array) array.each { |o| begin if o.is_a?(BufferedOutput) o.force_flush elsif o.is_a?(MultiOutput) flush_recursive(o.outputs) end rescue => e log.debug "error while force flushing", error_class: e.class, error: e log.debug_backtrace end } end
def handle_emits_error(tag, es, error)
def handle_emits_error(tag, es, error) end
def initialize(opts = {})
def initialize(opts = {}) super() @context = nil @outputs = [] @filters = [] @started_outputs = [] @started_filters = [] @log = Engine.log @event_router = EventRouter.new(NoMatchMatch.new(log), self) @error_collector = nil end
def shutdown
def shutdown @started_filters.map { |f| Thread.new do begin log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id f.shutdown rescue => e log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } # Output plugin as filter emits records at shutdown so emit problem still exist. # This problem will be resolved after actual filter mechanizm. @started_outputs.map { |o| Thread.new do begin log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id o.shutdown rescue => e log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } end
def start
def start @outputs.each { |o| o.start @started_outputs << o } @filters.each { |f| f.start @started_filters << f } end