#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'json'
require 'webrick'
require 'cgi'
require 'fluent/config/types'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/multi_output'
require 'fluent/plugin/filter'
module Fluent::Plugin
class MonitorAgentInput < Input
Fluent::Plugin.register_input('monitor_agent', self)
helpers :timer, :thread, :http_server
desc 'The address to bind to.'
config_param :bind, :string, default: '0.0.0.0'
desc 'The port to listen to.'
config_param :port, :integer, default: 24220
desc 'The tag with which internal metrics are emitted.'
config_param :tag, :string, default: nil
desc 'Determine the rate to emit internal metrics as events.'
config_param :emit_interval, :time, default: 60
desc 'Determine whether to include the config information.'
config_param :include_config, :bool, default: true
desc 'Determine whether to include the retry information.'
config_param :include_retry, :bool, default: true
class APIHandler
def initialize(agent)
@agent = agent
end
def plugins_ltsv(req)
list = build_object(build_option(req))
render_ltsv(list)
end
def plugins_json(req)
opts = build_option(req)
obj = build_object(opts)
render_json({ 'plugins' => obj }, pretty_json: opts[:pretty_json])
end
def config_ltsv(_req)
obj = {
'pid' => Process.pid,
'ppid' => Process.ppid
}.merge(@agent.fluentd_opts)
render_ltsv([obj])
end
def config_json(req)
obj = {
'pid' => Process.pid,
'ppid' => Process.ppid
}.merge(@agent.fluentd_opts)
opts = build_option(req)
render_json(obj, pretty_json: opts[:pretty_json])
end
private
def render_error_json(code:, msg:, pretty_json: nil, **additional_params)
resp = additional_params.merge('message' => msg)
render_json(resp, code: code, pretty_json: pretty_json)
end
def render_json(obj, code: 200, pretty_json: nil)
body =
if pretty_json
JSON.pretty_generate(obj)
else
obj.to_json
end
[code, { 'Content-Type' => 'application/json' }, body]
end
def render_ltsv(obj, code: 200)
normalized = JSON.parse(obj.to_json)
text = ''
normalized.each do |hash|
row = []
hash.each do |k, v|
if v.is_a?(Array)
row << "#{k}:#{v.join(',')}"
elsif v.is_a?(Hash)
next
else
row << "#{k}:#{v}"
end
end
text << row.join("\t") << "\n"
end
[code, { 'Content-Type' => 'text/plain' }, text]
end
def build_object(opts)
qs = opts[:query]
if tag = qs['tag'.freeze].first
# ?tag= to search an output plugin by match pattern
if obj = @agent.plugin_info_by_tag(tag, opts)
list = [obj]
else
list = []
end
elsif plugin_id = (qs['@id'.freeze].first || qs['id'.freeze].first)
# ?@id= to search a plugin by 'id <plugin_id>' config param
if obj = @agent.plugin_info_by_id(plugin_id, opts)
list = [obj]
else
list = []
end
elsif plugin_type = (qs['@type'.freeze].first || qs['type'.freeze].first)
# ?@type= to search plugins by 'type <type>' config param
list = @agent.plugins_info_by_type(plugin_type, opts)
else
# otherwise show all plugins
list = @agent.plugins_info_all(opts)
end
list
end
def build_option(req)
qs = Hash.new { |_, _| [] }
# parse ?=query string
if req.query_string
qs.merge!(CGI.parse(req.query_string))
end
# if ?debug=1 is set, set :with_debug_info for get_monitor_info
# and :pretty_json for render_json_error
opts = { query: qs }
if qs['debug'.freeze].first
opts[:with_debug_info] = true
opts[:pretty_json] = true
end
if ivars = qs['with_ivars'.freeze].first
opts[:ivars] = ivars.split(',')
end
if with_config = qs['with_config'.freeze].first
opts[:with_config] = Fluent::Config.bool_value(with_config)
else
opts[:with_config] = @agent.include_config
end
if with_retry = qs['with_retry'.freeze].first
opts[:with_retry] = Fluent::Config.bool_value(with_retry)
else
opts[:with_retry] = @agent.include_retry
end
opts
end
end
def initialize
super
@first_warn = false
end
def configure(conf)
super
@port += fluentd_worker_id
end
def multi_workers_ready?
true
end
class NotFoundJson
BODY = { 'message' => 'Not found' }.to_json
def self.call(_req)
[404, { 'Content-Type' => 'application/json' }, BODY]
end
end
def start
super
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}"
api_handler = APIHandler.new(self)
create_http_server(addr: @bind, port: @port, logger: log, default_app: NotFoundJson) do |serv|
serv.get('/api/plugins') { |req| api_handler.plugins_ltsv(req) }
serv.get('/api/plugins.json') { |req| api_handler.plugins_json(req) }
serv.get('/api/config') { |req| api_handler.config_ltsv(req) }
serv.get('/api/config.json') { |req| api_handler.config_json(req) }
end
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"
opts = {with_config: false, with_retry: false}
timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
es = Fluent::MultiEventStream.new
now = Fluent::Engine.now
plugins_info_all(opts).each { |record|
es.add(now, record)
}
router.emit_stream(@tag, es)
}
end
end
# They are deprecated but remain for compatibility
MONITOR_INFO = {
'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
}
def all_plugins
array = []
# get all input plugins
array.concat Fluent::Engine.root_agent.inputs
# get all output plugins
array.concat Fluent::Engine.root_agent.outputs
# get all filter plugins
array.concat Fluent::Engine.root_agent.filters
Fluent::Engine.root_agent.labels.each { |name, l|
# TODO: Add label name to outputs / filters for identifing plugins
array.concat l.outputs
array.concat l.filters
}
array
end
# try to match the tag and get the info from the matched output plugin
# TODO: Support output in label
def plugin_info_by_tag(tag, opts={})
matches = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules)
matches.each { |rule|
if rule.match?(tag)
if rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output)
return get_monitor_info(rule.collector, opts)
end
end
}
nil
end
# search a plugin by plugin_id
def plugin_info_by_id(plugin_id, opts={})
found = all_plugins.find {|pe|
pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
}
if found
get_monitor_info(found, opts)
else
nil
end
end
# This method returns an array because
# multiple plugins could have the same type
def plugins_info_by_type(type, opts={})
array = all_plugins.select {|pe|
(pe.config['@type'] == type) rescue nil
}
array.map {|pe|
get_monitor_info(pe, opts)
}
end
def plugins_info_all(opts={})
all_plugins.map {|pe|
get_monitor_info(pe, opts)
}
end
IGNORE_ATTRIBUTES = %i(@config_root_section @config @masked_config)
# get monitor info from the plugin `pe` and return a hash object
def get_monitor_info(pe, opts={})
obj = {}
# Common plugin information
obj['plugin_id'] = pe.plugin_id
obj['plugin_category'] = plugin_category(pe)
obj['type'] = pe.config['@type']
obj['config'] = pe.config if opts[:with_config]
# run MONITOR_INFO in plugins' instance context and store the info to obj
MONITOR_INFO.each_pair {|key,code|
begin
catch(:skip) do
obj[key] = pe.instance_exec(&code)
end
rescue NoMethodError => e
unless @first_warn
log.error "NoMethodError in monitoring plugins", key: key, plugin: pe.class, error: e
log.error_backtrace
@first_warn = true
end
rescue => e
log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e
end
}
if pe.respond_to?(:statistics)
obj.merge!(pe.statistics['output'] || {})
end
obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)
# include all instance variables if :with_debug_info is set
if opts[:with_debug_info]
iv = {}
pe.instance_eval do
instance_variables.each {|sym|
next if IGNORE_ATTRIBUTES.include?(sym)
key = sym.to_s[1..-1] # removes first '@'
iv[key] = instance_variable_get(sym)
}
end
obj['instance_variables'] = iv
elsif ivars = opts[:ivars]
iv = {}
ivars.each {|name|
iname = "@#{name}"
iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname)
}
obj['instance_variables'] = iv
end
obj
end
RETRY_INFO = {
'start' => '@start',
'steps' => '@steps',
'next_time' => '@next_time',
}
def get_retry_info(pe_retry)
retry_variables = {}
if pe_retry
RETRY_INFO.each_pair { |key, param|
retry_variables[key] = pe_retry.instance_variable_get(param)
}
end
retry_variables
end
def plugin_category(pe)
case pe
when Fluent::Plugin::Input
'input'.freeze
when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput
'output'.freeze
when Fluent::Plugin::Filter
'filter'.freeze
else
'unknown'.freeze
end
end
def fluentd_opts
@fluentd_opts ||= get_fluentd_opts
end
def get_fluentd_opts
opts = {}
ObjectSpace.each_object(Fluent::Supervisor) { |obj|
opts.merge!(obj.options)
break
}
opts
end
end
end