class Fluent::Plugin::MonitorAgentInput

def all_plugins

def all_plugins
  array = []
  # get all input plugins
  array.concat Fluent::Engine.root_agent.inputs
  # get all output plugins
  array.concat Fluent::Engine.root_agent.outputs
  # get all filter plugins
  array.concat Fluent::Engine.root_agent.filters
  Fluent::Engine.root_agent.labels.each { |name, l|
    # TODO: Add label name to outputs / filters for identifing plugins
    array.concat l.outputs
    array.concat l.filters
  }
  array
end

def configure(conf)

def configure(conf)
  super
  @port += fluentd_worker_id
end

def fluentd_opts

def fluentd_opts
  @fluentd_opts ||= get_fluentd_opts
end

def get_fluentd_opts

def get_fluentd_opts
  opts = {}
  ObjectSpace.each_object(Fluent::Supervisor) { |obj|
    opts.merge!(obj.options)
    break
  }
  opts
end

def get_monitor_info(pe, opts={})

get monitor info from the plugin `pe` and return a hash object
def get_monitor_info(pe, opts={})
  obj = {}
  # Common plugin information
  obj['plugin_id'] = pe.plugin_id
  obj['plugin_category'] = plugin_category(pe)
  obj['type'] = pe.config['@type']
  obj['config'] = pe.config if opts[:with_config]
  # run MONITOR_INFO in plugins' instance context and store the info to obj
  MONITOR_INFO.each_pair {|key,code|
    begin
      catch(:skip) do
        obj[key] = pe.instance_exec(&code)
      end
    rescue NoMethodError => e
      unless @first_warn
        log.error "NoMethodError in monitoring plugins", key: key, plugin: pe.class, error: e
        log.error_backtrace
        @first_warn = true
      end
    rescue => e
      log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e
    end
  }
  if pe.respond_to?(:statistics)
    obj.merge!(pe.statistics['output'] || {})
  end
  obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)
  # include all instance variables if :with_debug_info is set
  if opts[:with_debug_info]
    iv = {}
    pe.instance_eval do
      instance_variables.each {|sym|
        next if IGNORE_ATTRIBUTES.include?(sym)
        key = sym.to_s[1..-1]  # removes first '@'
        iv[key] = instance_variable_get(sym)
      }
    end
    obj['instance_variables'] = iv
  elsif ivars = opts[:ivars]
    iv = {}
    ivars.each {|name|
      iname = "@#{name}"
      iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname)
    }
    obj['instance_variables'] = iv
  end
  obj
end

def get_retry_info(pe_retry)

def get_retry_info(pe_retry)
  retry_variables = {}
  if pe_retry
    RETRY_INFO.each_pair { |key, param|
      retry_variables[key] = pe_retry.instance_variable_get(param)
    }
  end
  retry_variables
end

def initialize

def initialize
  super
  @first_warn = false
end

def multi_workers_ready?

def multi_workers_ready?
  true
end

def plugin_category(pe)

def plugin_category(pe)
  case pe
  when Fluent::Plugin::Input
    'input'.freeze
  when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput
    'output'.freeze
  when Fluent::Plugin::Filter
    'filter'.freeze
  else
    'unknown'.freeze
  end
end

def plugin_info_by_id(plugin_id, opts={})

search a plugin by plugin_id
def plugin_info_by_id(plugin_id, opts={})
  found = all_plugins.find {|pe|
    pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
  }
  if found
    get_monitor_info(found, opts)
  else
    nil
  end
end

def plugin_info_by_tag(tag, opts={})

TODO: Support output in label
try to match the tag and get the info from the matched output plugin
def plugin_info_by_tag(tag, opts={})
  matches = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules)
  matches.each { |rule|
    if rule.match?(tag)
      if rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output)
        return get_monitor_info(rule.collector, opts)
      end
    end
  }
  nil
end

def plugins_info_all(opts={})

def plugins_info_all(opts={})
  all_plugins.map {|pe|
    get_monitor_info(pe, opts)
  }
end

def plugins_info_by_type(type, opts={})

multiple plugins could have the same type
This method returns an array because
def plugins_info_by_type(type, opts={})
  array = all_plugins.select {|pe|
    (pe.config['@type'] == type) rescue nil
  }
  array.map {|pe|
    get_monitor_info(pe, opts)
  }
end

def start

def start
  super
  log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}"
  api_handler = APIHandler.new(self)
  create_http_server(addr: @bind, port: @port, logger: log, default_app: NotFoundJson) do |serv|
    serv.get('/api/plugins') { |req| api_handler.plugins_ltsv(req) }
    serv.get('/api/plugins.json') { |req| api_handler.plugins_json(req) }
    serv.get('/api/config') { |req| api_handler.config_ltsv(req) }
    serv.get('/api/config.json') { |req| api_handler.config_json(req) }
  end
  if @tag
    log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"
    opts = {with_config: false, with_retry: false}
    timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
      es = Fluent::MultiEventStream.new
      now = Fluent::Engine.now
      plugins_info_all(opts).each { |record|
        es.add(now, record)
      }
      router.emit_stream(@tag, es)
    }
  end
end