lib/fluent/plugin/in_monitor_agent.rb



#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'json'
require 'webrick'
require 'cgi'

require 'fluent/config/types'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/multi_output'
require 'fluent/plugin/filter'

module Fluent::Plugin
  class MonitorAgentInput < Input
    Fluent::Plugin.register_input('monitor_agent', self)

    helpers :timer, :thread, :http_server

    desc 'The address to bind to.'
    config_param :bind, :string, default: '0.0.0.0'
    desc 'The port to listen to.'
    config_param :port, :integer, default: 24220
    desc 'The tag with which internal metrics are emitted.'
    config_param :tag, :string, default: nil
    desc 'Determine the rate to emit internal metrics as events.'
    config_param :emit_interval, :time, default: 60
    desc 'Determine whether to include the config information.'
    config_param :include_config, :bool, default: true
    desc 'Determine whether to include the retry information.'
    config_param :include_retry, :bool, default: true

    class APIHandler
      def initialize(agent)
        @agent = agent
      end

      def plugins_ltsv(req)
        list = build_object(build_option(req))

        render_ltsv(list)
      end

      def plugins_json(req)
        opts = build_option(req)
        obj = build_object(opts)

        render_json({ 'plugins' => obj }, pretty_json: opts[:pretty_json])
      end

      def config_ltsv(_req)
        obj = {
          'pid' => Process.pid,
          'ppid' => Process.ppid
        }.merge(@agent.fluentd_opts)

        render_ltsv([obj])
      end

      def config_json(req)
        obj = {
          'pid' => Process.pid,
          'ppid' => Process.ppid
        }.merge(@agent.fluentd_opts)
        opts = build_option(req)

        render_json(obj, pretty_json: opts[:pretty_json])
      end

      private

      def render_error_json(code:, msg:, pretty_json: nil, **additional_params)
        resp = additional_params.merge('message' => msg)
        render_json(resp, code: code, pretty_json: pretty_json)
      end

      def render_json(obj, code: 200, pretty_json: nil)
        body =
          if pretty_json
            JSON.pretty_generate(obj)
          else
            obj.to_json
          end

        [code, { 'Content-Type' => 'application/json' }, body]
      end

      def render_ltsv(obj, code: 200)
        normalized = JSON.parse(obj.to_json)
        text = ''
        normalized.each do |hash|
          row = []
          hash.each do |k, v|
            if v.is_a?(Array)
              row << "#{k}:#{v.join(',')}"
            elsif v.is_a?(Hash)
              next
            else
              row << "#{k}:#{v}"
            end
          end

          text << row.join("\t") << "\n"
        end

        [code, { 'Content-Type' => 'text/plain' }, text]
      end

      def build_object(opts)
        qs = opts[:query]
        if tag = qs['tag'.freeze].first
          # ?tag= to search an output plugin by match pattern
          if obj = @agent.plugin_info_by_tag(tag, opts)
            list = [obj]
          else
            list = []
          end
        elsif plugin_id = (qs['@id'.freeze].first || qs['id'.freeze].first)
          # ?@id= to search a plugin by 'id <plugin_id>' config param
          if obj = @agent.plugin_info_by_id(plugin_id, opts)
            list = [obj]
          else
            list = []
          end
        elsif plugin_type = (qs['@type'.freeze].first || qs['type'.freeze].first)
          # ?@type= to search plugins by 'type <type>' config param
          list = @agent.plugins_info_by_type(plugin_type, opts)
        else
          # otherwise show all plugins
          list = @agent.plugins_info_all(opts)
        end

        list
      end

      def build_option(req)
        qs = Hash.new { |_, _| [] }
        # parse ?=query string
        if req.query_string
          qs.merge!(CGI.parse(req.query_string))
        end

        # if ?debug=1 is set, set :with_debug_info for get_monitor_info
        # and :pretty_json for render_json_error
        opts = { query: qs }
        if qs['debug'.freeze].first
          opts[:with_debug_info] = true
          opts[:pretty_json] = true
        end

        if ivars = qs['with_ivars'.freeze].first
          opts[:ivars] = ivars.split(',')
        end

        if with_config = qs['with_config'.freeze].first
          opts[:with_config] = Fluent::Config.bool_value(with_config)
        else
          opts[:with_config] = @agent.include_config
        end

        if with_retry = qs['with_retry'.freeze].first
          opts[:with_retry] = Fluent::Config.bool_value(with_retry)
        else
          opts[:with_retry] = @agent.include_retry
        end

        opts
      end
    end

    def initialize
      super

      @first_warn = false
    end

    def configure(conf)
      super
      @port += fluentd_worker_id
    end

    def multi_workers_ready?
      true
    end

    class NotFoundJson
      BODY = { 'message' => 'Not found' }.to_json
      def self.call(_req)
        [404, { 'Content-Type' => 'application/json' }, BODY]
      end
    end

    def start
      super

      log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}"
      api_handler = APIHandler.new(self)
      create_http_server(addr: @bind, port: @port, logger: log, default_app: NotFoundJson) do |serv|
        serv.get('/api/plugins') { |req| api_handler.plugins_ltsv(req) }
        serv.get('/api/plugins.json') { |req| api_handler.plugins_json(req) }
        serv.get('/api/config') { |req| api_handler.config_ltsv(req) }
        serv.get('/api/config.json') { |req| api_handler.config_json(req) }
      end

      if @tag
        log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

        opts = {with_config: false, with_retry: false}
        timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
          es = Fluent::MultiEventStream.new
          now = Fluent::Engine.now
          plugins_info_all(opts).each { |record|
            es.add(now, record)
          }
          router.emit_stream(@tag, es)
        }
      end
    end

    # They are deprecated but remain for compatibility
    MONITOR_INFO = {
      'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
      'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
      'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
      'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
      'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
    }

    def all_plugins
      array = []

      # get all input plugins
      array.concat Fluent::Engine.root_agent.inputs

      # get all output plugins
      array.concat Fluent::Engine.root_agent.outputs

      # get all filter plugins
      array.concat Fluent::Engine.root_agent.filters

      Fluent::Engine.root_agent.labels.each { |name, l|
        # TODO: Add label name to outputs / filters for identifing plugins
        array.concat l.outputs
        array.concat l.filters
      }

      array
    end

    # try to match the tag and get the info from the matched output plugin
    # TODO: Support output in label
    def plugin_info_by_tag(tag, opts={})
      matches = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules)
      matches.each { |rule|
        if rule.match?(tag)
          if rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output)
            return get_monitor_info(rule.collector, opts)
          end
        end
      }
      nil
    end

    # search a plugin by plugin_id
    def plugin_info_by_id(plugin_id, opts={})
      found = all_plugins.find {|pe|
        pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
      }
      if found
        get_monitor_info(found, opts)
      else
        nil
      end
    end

    # This method returns an array because
    # multiple plugins could have the same type
    def plugins_info_by_type(type, opts={})
      array = all_plugins.select {|pe|
        (pe.config['@type'] == type) rescue nil
      }
      array.map {|pe|
        get_monitor_info(pe, opts)
      }
    end

    def plugins_info_all(opts={})
      all_plugins.map {|pe|
        get_monitor_info(pe, opts)
      }
    end

    IGNORE_ATTRIBUTES = %i(@config_root_section @config @masked_config)

    # get monitor info from the plugin `pe` and return a hash object
    def get_monitor_info(pe, opts={})
      obj = {}

      # Common plugin information
      obj['plugin_id'] = pe.plugin_id
      obj['plugin_category'] = plugin_category(pe)
      obj['type'] = pe.config['@type']
      obj['config'] = pe.config if opts[:with_config]

      # run MONITOR_INFO in plugins' instance context and store the info to obj
      MONITOR_INFO.each_pair {|key,code|
        begin
          catch(:skip) do
            obj[key] = pe.instance_exec(&code)
          end
        rescue NoMethodError => e
          unless @first_warn
            log.error "NoMethodError in monitoring plugins", key: key, plugin: pe.class, error: e
            log.error_backtrace
            @first_warn = true
          end
        rescue => e
          log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e
        end
      }

      if pe.respond_to?(:statistics)
        obj.merge!(pe.statistics['output'] || {})
      end

      obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)

      # include all instance variables if :with_debug_info is set
      if opts[:with_debug_info]
        iv = {}
        pe.instance_eval do
          instance_variables.each {|sym|
            next if IGNORE_ATTRIBUTES.include?(sym)
            key = sym.to_s[1..-1]  # removes first '@'
            iv[key] = instance_variable_get(sym)
          }
        end
        obj['instance_variables'] = iv
      elsif ivars = opts[:ivars]
        iv = {}
        ivars.each {|name|
          iname = "@#{name}"
          iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname)
        }
        obj['instance_variables'] = iv
      end

      obj
    end

    RETRY_INFO = {
        'start' => '@start',
        'steps' => '@steps',
        'next_time' => '@next_time',
    }

    def get_retry_info(pe_retry)
      retry_variables = {}

      if pe_retry
        RETRY_INFO.each_pair { |key, param|
          retry_variables[key] = pe_retry.instance_variable_get(param)
        }
      end

      retry_variables
    end

    def plugin_category(pe)
      case pe
      when Fluent::Plugin::Input
        'input'.freeze
      when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput
        'output'.freeze
      when Fluent::Plugin::Filter
        'filter'.freeze
      else
        'unknown'.freeze
      end
    end

    def fluentd_opts
      @fluentd_opts ||= get_fluentd_opts
    end

    def get_fluentd_opts
      opts = {}
      ObjectSpace.each_object(Fluent::Supervisor) { |obj|
        opts.merge!(obj.options)
        break
      }
      opts
    end
  end
end