lib/fluent/plugin/in_forward.rb



#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#


require 'fluent/plugin/input'
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'
require 'securerandom'

module Fluent::Plugin
  class ForwardInput < Input
    Fluent::Plugin.register_input('forward', self)

    # See the wiki page below for protocol specification
    # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

    # Plugin helper used by #start to create the TCP and UDP servers.
    helpers :server

    # Default value of the `port` parameter below.
    LISTEN_PORT = 24224

    desc 'The port to listen to.'
    config_param :port, :integer, default: LISTEN_PORT
    desc 'The bind address to listen to.'
    config_param :bind, :string, default: '0.0.0.0'

    # Passed as the `backlog:` option when the TCP server is created in #start; nil by default.
    config_param :backlog, :integer, default: nil
    # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
    desc 'The timeout time used to set linger option.'
    config_param :linger_timeout, :integer, default: 0
    # This option is for Cool.io's loop wait timeout to avoid the loop getting stuck at shutdown. Most users don't need to change this value.
    config_param :blocking_timeout, :time, default: 0.5
    desc 'Try to resolve hostname from IP addresses or not.'
    config_param :resolve_hostname, :bool, default: nil
    desc 'Connections will be disconnected right after receiving first message if this value is true.'
    config_param :deny_keepalive, :bool, default: false
    desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.'
    config_param :send_keepalive_packet, :bool, default: false

    desc 'Log warning if received chunk size is larger than this value.'
    config_param :chunk_size_warn_limit, :size, default: nil
    desc 'Received chunk is dropped if it is larger than this value.'
    config_param :chunk_size_limit, :size, default: nil
    desc 'Skip an event if incoming event is invalid.'
    config_param :skip_invalid_event, :bool, default: false

    desc "The field name of the client's source address."
    config_param :source_address_key, :string, default: nil
    desc "The field name of the client's hostname."
    config_param :source_hostname_key, :string, default: nil

    # When set, #on_message replaces the incoming tag with this value.
    desc "New tag instead of incoming tag"
    config_param :tag, :string, default: nil
    # When set, #on_message prepends "<prefix>." to the (possibly replaced) tag.
    desc "Add prefix to incoming tag"
    config_param :add_tag_prefix, :string, default: nil

    # Optional, single <security> section. When it is present, #handle_connection
    # opens every session with a HELO/PING/PONG handshake, and #configure turns the
    # nested <client> sections into the per-client lookup table used by #check_ping.
    config_section :security, required: false, multi: false do
      desc 'The hostname'
      config_param :self_hostname, :string
      desc 'Shared key for authentication'
      config_param :shared_key, :string, secret: true
      desc 'If true, use user based authentication'
      config_param :user_auth, :bool, default: false
      desc 'Allow anonymous source. <client> sections required if disabled.'
      config_param :allow_anonymous_source, :bool, default: true

      ### User based authentication
      # Repeated <user> sections, exposed as `users`; required by #configure when user_auth is true.
      config_section :user, param_name: :users, required: false, multi: true do
        desc 'The username for authentication'
        config_param :username, :string
        desc 'The password for authentication'
        config_param :password, :string, secret: true
      end

      ### Client ip/network authentication & per_host shared key
      # Repeated <client> sections, exposed as `clients`; #configure requires exactly
      # one of `host` / `network` in each of them.
      config_section :client, param_name: :clients, required: false, multi: true do
        desc 'The IP address or host name of the client'
        config_param :host, :string, default: nil
        desc 'Network address specification'
        config_param :network, :string, default: nil
        # Falls back to the section-level shared_key when omitted (see #configure).
        desc 'Shared key per client'
        config_param :shared_key, :string, default: nil, secret: true
        desc 'Array of username.'
        config_param :users, :array, default: []
      end
    end

    # Validates the parsed configuration and prepares derived state:
    # @resolve_hostname, @enable_field_injection and, when a <security> section
    # exists, the per-client table @nodes consulted during the handshake.
    #
    # conf - configuration object, forwarded untouched to super.
    #
    # Raises Fluent::ConfigError when parameters contradict each other, are
    # empty, or when a client host/network cannot be resolved or parsed.
    def configure(conf)
      super

      if @source_hostname_key
        # TODO: add test
        case @resolve_hostname
        when nil
          # injecting the hostname requires name resolution, so enable it implicitly
          @resolve_hostname = true
        when false # user specifies "false" in config
          raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
        end
      end
      @enable_field_injection = @source_address_key || @source_hostname_key

      if @tag && @tag.empty?
        raise Fluent::ConfigError, "'tag' parameter must not be empty"
      end
      if @add_tag_prefix && @add_tag_prefix.empty?
        raise Fluent::ConfigError, "'add_tag_prefix' parameter must not be empty"
      end

      if @security
        if @security.user_auth && @security.users.empty?
          raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
        end
        if @security.clients.empty? && !@security.allow_anonymous_source
          raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled"
        end

        @nodes = []

        @security.clients.each do |client|
          # exactly one of host / network must be given
          if client.host
            raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client" if client.network
          elsif !client.network
            raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
          end

          resolved =
            if client.host
              begin
                IPSocket.getaddress(client.host)
              rescue SocketError
                raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
              end
            end

          begin
            address = IPAddr.new(resolved || client.network)
          rescue ArgumentError
            raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
          end

          # a client without its own key inherits the section-wide shared key
          @nodes << {
            address: address,
            shared_key: (client.shared_key || @security.shared_key),
            users: client.users
          }
        end
      end

      if @send_keepalive_packet && @deny_keepalive
        raise Fluent::ConfigError, "both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true"
      end
    end

    # Always returns true.
    # NOTE(review): presumably this tells Fluentd the plugin may run under
    # multiple workers (#start shares the listening socket when
    # system_config.workers > 1) — confirm against the plugin base class.
    def multi_workers_ready?
      true
    end

    # Payload written back to the sender for every UDP heartbeat datagram received.
    HEARTBEAT_UDP_PAYLOAD = "\0"

    # Starts the plugin: opens the TCP listener whose connections are served by
    # #handle_connection, and a UDP listener on the same port that answers each
    # heartbeat datagram with HEARTBEAT_UDP_PAYLOAD.
    #
    # Both sockets are created as shared sockets when more than one worker is
    # configured.
    def start
      super

      shared_socket = system_config.workers > 1

      log.info "listening port", port: @port, bind: @bind
      server_create_connection(
        :in_forward_server, @port,
        bind: @bind,
        shared: shared_socket,
        resolve_name: @resolve_hostname,
        linger_timeout: @linger_timeout,
        send_keepalive_packet: @send_keepalive_packet,
        backlog: @backlog,
        &method(:handle_connection)
      )

      server_create(:in_forward_server_udp_heartbeat, @port, shared: shared_socket, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock|
        log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data
        begin
          sock.write HEARTBEAT_UDP_PAYLOAD
        rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR => e
          # Heartbeat replies are best effort. The exception must be bound to `e`
          # here, otherwise the log call below raises NameError instead of tracing.
          log.trace "error while heartbeat response", host: sock.remote_host, error: e
        end
      end
    end

    # Serves one accepted TCP connection.
    #
    # With a <security> section the session begins with a handshake: a HELO is
    # written immediately (always MessagePack) and the first incoming message is
    # treated as the PING, answered with a PONG. Once the session is
    # established, every message goes to #on_message and, if it asked for an
    # acknowledgement, the ack is written back with the serializer matching the
    # peer's format.
    #
    # conn - connection object; this method uses remote_addr, remote_port,
    #        write, on and close on it.
    def handle_connection(conn)
      reply = ->(serializer, payload){ conn.write serializer.call(payload) }

      log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port
      nonce = nil
      user_auth_salt = nil

      state =
        if @security
          # security enabled session MUST use MessagePack as serialization format
          nonce = generate_salt
          user_auth_salt = generate_salt
          reply.call(:to_msgpack.to_proc, generate_helo(nonce, user_auth_salt))
          :pingpong
        else
          :established
        end

      log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port

      read_messages(conn) do |msg, chunk_size, serializer|
        case state
        when :pingpong
          success, reason_or_salt, shared_key = check_ping(msg, conn.remote_addr, user_auth_salt, nonce)
          if success
            reply.call(serializer, generate_pong(true, reason_or_salt, nonce, shared_key))

            log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
            state = :established
          else
            # register the close hook before writing the rejection
            conn.on(:write_complete) { |c| c.close_after_write_complete }
            reply.call(serializer, generate_pong(false, reason_or_salt, nonce, shared_key))
          end
        when :established
          options = on_message(msg, chunk_size, conn)
          ack = options && response(options)
          if ack
            log.trace "sent response to fluent socket", address: conn.remote_addr, response: ack
            conn.on(:write_complete) { |c| c.close } if @deny_keepalive
            reply.call(serializer, ack)
          elsif @deny_keepalive
            conn.close
          end
        else
          raise "BUG: unknown session state: #{state}"
        end
      end
    end

    # Registers a data callback on conn that decodes the incoming byte stream
    # and calls the given block once per decoded object.
    #
    # The wire format is chosen from the first byte of the first data piece:
    # '{' or '[' selects the JSON parser, anything else the MessagePack unpacker.
    #
    # Yields obj, bytes, serializer where bytes is the number of bytes received
    # since the previous object was delivered and serializer is a proc for
    # encoding a reply in the same format.
    def read_messages(conn, &block)
      feeder = nil
      serializer = nil
      bytes = 0

      # hand one decoded object to the caller, then restart the byte count
      deliver = ->(obj){
        block.call(obj, bytes, serializer)
        bytes = 0
      }

      conn.data do |data|
        # only for first call of callback
        if feeder.nil?
          case data[0]
          when '{', '[' # json
            parser = Yajl::Parser.new
            parser.on_parse_complete = deliver
            serializer = :to_json.to_proc
            feeder = ->(d){ parser << d }
          else # msgpack
            parser = Fluent::Engine.msgpack_factory.unpacker
            serializer = :to_msgpack.to_proc
            feeder = ->(d){ parser.feed_each(d, &deliver) }
          end
        end

        bytes += data.bytesize
        feeder.call(data)
      end
    end

    # Builds the acknowledgement for a received message.
    #
    # option - the option value returned by #on_message. It comes from the peer
    #          and is therefore not guaranteed to be a Hash.
    #
    # Returns { 'ack' => <chunk id> } when option is a Hash with a truthy
    # 'chunk' entry, otherwise nil (no acknowledgement is sent).
    def response(option)
      # Ignore anything that is not a Hash: indexing e.g. an Array with a
      # String key would raise TypeError on peer-controlled input.
      return nil unless option.is_a?(Hash)

      chunk_id = option['chunk']
      chunk_id ? { 'ack' => chunk_id } : nil
    end

    # Handles one decoded message from an established session and emits its
    # events through the router.
    #
    # msg        - decoded message; expected to be an Array whose first element
    #              is the tag. The second element selects the mode:
    #              String => PackedForward, Array => Forward, otherwise Message.
    # chunk_size - number of bytes received for this message, checked against
    #              chunk_size_limit / chunk_size_warn_limit.
    # conn       - connection object, used for remote_host / remote_addr.
    #
    # Returns the option element of the message (used to build the response),
    # or nil when the message is nil, not an Array, too large, or a Message
    # without a record.
    def on_message(msg, chunk_size, conn)
      # for future TCP heartbeat_request
      return if msg.nil?

      # TODO: raise an exception if broken chunk is generated by recoverable situation
      unless msg.is_a?(Array)
        log.warn "incoming chunk is broken:", host: conn.remote_host, msg: msg
        return
      end

      tag, entries = msg[0], msg[1]

      if @chunk_size_limit && (chunk_size > @chunk_size_limit)
        log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: conn.remote_host, limit: @chunk_size_limit, size: chunk_size
        return
      end
      if @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
        log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: conn.remote_host, limit: @chunk_size_warn_limit, size: chunk_size
      end

      # configured tag replaces the incoming one; the prefix is applied afterwards
      tag = @tag.dup if @tag
      tag = "#{@add_tag_prefix}.#{tag}" if @add_tag_prefix

      # each branch evaluates to the option that is returned for the response
      case entries
      when String
        # PackedForward
        option = msg[2]
        size = (option && option['size']) || 0
        compressed = option && option['compressed'] == 'gzip'
        es_class = compressed ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
        es = es_class.new(entries, nil, size.to_i)
        es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
        es = add_source_info(es, conn) if @enable_field_injection
        router.emit_stream(tag, es)
        option

      when Array
        # Forward
        es =
          if @skip_invalid_event
            check_and_skip_invalid_event(tag, entries, conn.remote_host)
          else
            stream = Fluent::MultiEventStream.new
            entries.each do |entry|
              record = entry[1]
              next if record.nil?
              time = entry[0]
              time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
              stream.add(time, record)
            end
            stream
          end
        es = add_source_info(es, conn) if @enable_field_injection
        router.emit_stream(tag, es)
        msg[2]

      else
        # Message
        time, record = msg[1], msg[2]
        if @skip_invalid_event && invalid_event?(tag, time, record)
          log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
          return msg[3] # retry never succeeded so return ack and drop incoming event.
        end
        return if record.nil?
        time = Fluent::Engine.now if time.to_i == 0
        if @enable_field_injection
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
        end
        router.emit(tag, time, record)
        msg[3]
      end
    end

    # Tells whether an event must be rejected.
    #
    # An event is valid only when time is an Integer or a Fluent::EventTime,
    # record is a Hash and tag is a String. Returns true for anything else.
    def invalid_event?(tag, time, record)
      return true unless time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)
      return true unless record.is_a?(Hash)

      !tag.is_a?(String)
    end

    # Copies the valid events of es into a new Fluent::MultiEventStream.
    #
    # tag         - tag of the stream, validated together with each event.
    # es          - any object whose #each yields time, record pairs.
    # remote_host - sender name, only used in the warning log.
    #
    # Events rejected by #invalid_event? are logged at warn level and left out.
    # Returns the new stream.
    def check_and_skip_invalid_event(tag, es, remote_host)
      valid_events = Fluent::MultiEventStream.new
      es.each do |time, record|
        if invalid_event?(tag, time, record)
          log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
        else
          valid_events.add(time, record)
        end
      end
      valid_events
    end

    # Adds the peer's address and/or hostname to every record of es and returns
    # the events in a new Fluent::MultiEventStream.
    #
    # es   - any object whose #each yields time, record pairs; the records are
    #        modified in place.
    # conn - connection object; remote_addr is read only when
    #        source_address_key is set, remote_host only when
    #        source_hostname_key is set.
    #
    # Raises RuntimeError when neither key is configured.
    def add_source_info(es, conn)
      injected = Fluent::MultiEventStream.new
      unless @source_address_key || @source_hostname_key
        raise "BUG: don't call this method in this case"
      end

      # look the peer up once, not once per event
      address = conn.remote_addr if @source_address_key
      hostname = conn.remote_host if @source_hostname_key

      es.each do |time, record|
        record[@source_address_key] = address if @source_address_key
        record[@source_hostname_key] = hostname if @source_hostname_key
        injected.add(time, record)
      end
      injected
    end

    # Returns the configured <user> entries that may authenticate as username.
    #
    # node     - matching entry of @nodes, or nil for an anonymous source.
    # username - user name sent by the client.
    #
    # When the node lists users, only entries named in that list qualify;
    # otherwise every configured user with that name does.
    def select_authenticate_users(node, username)
      same_name = @security.users.select { |user| user.username == username }
      return same_name if node.nil? || node[:users].empty?

      same_name.select { |user| node[:users].include?(user.username) }
    end

    # Returns 16 random bytes from SecureRandom; #handle_connection uses the
    # result as the handshake nonce and as the user authentication salt.
    def generate_salt
      salt_length = 16
      ::SecureRandom.random_bytes(salt_length)
    end

    # Builds the HELO message that opens the handshake.
    #
    # nonce          - nonce for this session.
    # user_auth_salt - salt for password digests; sent as 'auth' whenever a
    #                  <security> section is configured, '' otherwise.
    #
    # Returns ['HELO', options] where options also carries 'keepalive', which
    # is false when deny_keepalive is set.
    def generate_helo(nonce, user_auth_salt)
      log.debug "generating helo"
      # ['HELO', options(hash)]
      auth = @security ? user_auth_salt : ''
      options = { 'nonce' => nonce, 'auth' => auth, 'keepalive' => !@deny_keepalive }
      ['HELO', options]
    end

    # Verifies the PING sent by a client during the handshake.
    #
    # message        - decoded PING; comes from the peer, so it may be of any type.
    # remote_addr    - address of the peer, matched against the configured clients.
    # user_auth_salt - salt previously sent in HELO for the password digest.
    # nonce          - nonce previously sent in HELO.
    #
    # Returns a three element array:
    #   [true,  shared_key_salt, shared_key] on success
    #   [false, reason,          nil]        on failure
    def check_ping(message, remote_addr, user_auth_salt, nonce)
      log.debug "checking ping"
      # ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
      # The type check matters: a peer may send nil or any other non-Array,
      # and that must be rejected as a bad PING instead of raising here.
      unless message.is_a?(Array) && message.size == 6 && message[0] == 'PING'
        return false, 'invalid ping message', nil
      end
      _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

      # first configured client whose address/network covers the peer;
      # an address that cannot be compared counts as "no match"
      node = @nodes.find do |n|
        begin
          n[:address].include?(remote_addr)
        rescue StandardError
          false
        end
      end
      if !node && !@security.allow_anonymous_source
        log.warn "Anonymous client disallowed", address: remote_addr, hostname: hostname
        return false, "anonymous source host '#{remote_addr}' denied", nil
      end

      shared_key = node ? node[:shared_key] : @security.shared_key
      serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(nonce).update(shared_key).hexdigest
      if shared_key_hexdigest != serverside
        log.warn "Shared key mismatch", address: remote_addr, hostname: hostname
        return false, 'shared_key mismatch', nil
      end

      if @security.user_auth
        authenticated = select_authenticate_users(node, username).any? { |user|
          passhash = Digest::SHA512.new.update(user_auth_salt).update(username).update(user[:password]).hexdigest
          passhash == password_digest
        }
        unless authenticated
          log.warn "Authentication failed", address: remote_addr, hostname: hostname, username: username
          return false, 'username/password mismatch', nil
        end
      end

      return true, shared_key_salt, shared_key
    end

    # Builds the PONG message that answers a PING.
    #
    # auth_result    - whether the PING was accepted.
    # reason_or_salt - failure reason when rejected, the client's shared key
    #                  salt when accepted.
    # nonce          - nonce of this session.
    # shared_key     - shared key that applies to the peer (unused on failure).
    #
    # Returns ['PONG', false, reason, '', ''] on failure, otherwise
    # ['PONG', true, '', self_hostname, sha512_hex(salt + self_hostname + nonce + shared_key)].
    def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
      log.debug "generating pong"
      # ['PONG', bool(authentication result), 'reason if authentication failed', self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
      return ['PONG', false, reason_or_salt, '', ''] unless auth_result

      self_hostname = @security.self_hostname
      digest = Digest::SHA512.new
      [reason_or_salt, self_hostname, nonce, shared_key].each { |part| digest.update(part) }
      ['PONG', true, '', self_hostname, digest.hexdigest]
    end
  end
end