# lib/semantic_logger/appender/kafka.rb

begin
  require "kafka"
rescue LoadError
  raise LoadError,
        'Gem ruby-kafka is required for logging to Apache Kafka. Please add the gem "ruby-kafka" to your Gemfile.'
end

require "date"

# Forward all log messages to Apache Kafka.
#
# Example:
#
#   SemanticLogger.add_appender(
#     appender: :kafka,
#
#     # At least one of these nodes must be available:
#     seed_brokers: ["kafka1:9092", "kafka2:9092"],
#
#     # Set an optional client id in order to identify the client to Kafka:
#     client_id: "my-application",
#   )
module SemanticLogger
  module Appender
    class Kafka < SemanticLogger::Subscriber
      attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout,
                    :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, :ssl_ca_certs_from_system,
                    :delivery_threshold, :delivery_interval, :required_acks,
                    :topic, :partition, :partition_key, :key

      # Send log messages to Kafka in JSON format.
      #
      # Kafka Parameters:
      #
      #   seed_brokers: [Array<String>, String]
      #     The list of brokers used to initialize the client. Either an Array of connections,
      #     or a comma separated string of connections.
      #     Connections can either be a string of "host:port" or a full URI with a scheme.
      #     If there's a scheme it's ignored and only host/port are used.
      #
      #   client_id: [String]
      #     The identifier for this application.
      #     Default: semantic-logger
      #
      #   topic: [String]
      #     Topic to publish log messages to.
      #     Default: 'log_messages'
      #
      #   partition: [Integer]
      #     The partition that the message should be written to.
      #     Default: nil
      #
      #   partition_key: [String]
      #     The key that should be used to assign a partition.
      #     Default: nil
      #
      #   key: [String]
      #     The message key.
      #     Default: nil
      #
      #   connect_timeout: [Integer]
      #     The timeout setting for connecting to brokers.
      #     Default: nil
      #
      #   socket_timeout: [Integer]
      #     The timeout setting for socket connections.
      #     Default: nil
      #
      #   ssl_ca_cert: [String, Array<String>]
      #     A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with an SSL connection.
      #     Default: nil
      #
      #   ssl_client_cert: [String]
      #     A PEM encoded client cert to use with an SSL connection.
      #     Must be used in combination with ssl_client_cert_key.
      #     Default: nil
      #
      #   ssl_client_cert_key: [String]
      #     A PEM encoded client cert key to use with an SSL connection.
      #     Must be used in combination with ssl_client_cert.
      #     Default: nil
      #
      #   ssl_ca_certs_from_system: [Boolean]
      #     Delegate SSL CA cert verification to the system's certificate store.
      #     Default: false
      #
      #   delivery_threshold: [Integer]
      #     Number of messages to buffer before triggering a delivery to Apache Kafka.
      #     Default: 100
      #
      #   delivery_interval: [Integer]
      #     Number of seconds between automatic deliveries of buffered messages to Apache Kafka.
      #     Default: 10
      #
      #   required_acks: [Integer]
      #     Number of replicas that must acknowledge receipt of each log message written to the topic.
      #     Default: 1
      #
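      #   Example (an illustrative sketch only; the broker addresses, cert path and
      #   tuning values below are placeholders, not recommendations):
      #
      #     SemanticLogger.add_appender(
      #       appender:           :kafka,
      #       seed_brokers:       ["kafka1:9092", "kafka2:9092"],
      #       ssl_ca_cert:        File.read("/etc/kafka/ca.pem"),
      #       delivery_threshold: 500,
      #       delivery_interval:  30
      #     )
      #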
      # Semantic Logger Parameters:
      #
      #   level: [:trace | :debug | :info | :warn | :error | :fatal]
      #     Override the log level for this appender.
      #     Default: SemanticLogger.default_level
      #
      #   formatter: [Object|Proc|Symbol|Hash]
      #     An instance of a class that implements #call, or a Proc to be used to format
      #     the output from this appender.
      #     Default: SemanticLogger::Formatters::Json (see #default_formatter)
      #
      #   filter: [Regexp|Proc]
      #     RegExp: Only include log messages where the class name matches the supplied
      #             regular expression. All other messages will be ignored.
      #     Proc: Only include log messages where the supplied Proc returns true.
      #           The Proc must return true or false. See the filter example below.
      #
      #   host: [String]
      #     Name of this host to appear in log messages.
      #     Default: SemanticLogger.host
      #
      #   application: [String]
      #     Name of this application to appear in log messages.
      #     Default: SemanticLogger.application
      #
      #   metrics: [Boolean]
      #     Also forward metric-only events to Kafka.
      #     Default: true
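      #
      #   Filter example: a minimal sketch that drops log messages from a noisy class
      #   (the class name below is hypothetical):
      #
      #     SemanticLogger.add_appender(
      #       appender:     :kafka,
      #       seed_brokers: ["kafka1:9092"],
      #       filter:       ->(log) { log.name != "HealthCheckController" }
      #     )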
      def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
                     ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false,
                     topic: "log_messages", partition: nil, partition_key: nil, key: nil,
                     delivery_threshold: 100, delivery_interval: 10, required_acks: 1,
                     metrics: true, **args, &block)
        @seed_brokers             = seed_brokers
        @client_id                = client_id
        @connect_timeout          = connect_timeout
        @socket_timeout           = socket_timeout
        @ssl_ca_cert              = ssl_ca_cert
        @ssl_client_cert          = ssl_client_cert
        @ssl_client_cert_key      = ssl_client_cert_key
        @ssl_ca_certs_from_system = ssl_ca_certs_from_system
        @topic                    = topic
        @partition                = partition
        @partition_key            = partition_key
        @key                      = key
        @delivery_threshold       = delivery_threshold
        @delivery_interval        = delivery_interval
        @required_acks            = required_acks

        super(metrics: metrics, **args, &block)
        reopen
      end

      def reopen
        @kafka = ::Kafka.new(
          seed_brokers:             seed_brokers,
          client_id:                client_id,
          connect_timeout:          connect_timeout,
          socket_timeout:           socket_timeout,
          ssl_ca_cert:              ssl_ca_cert,
          ssl_client_cert:          ssl_client_cert,
          ssl_client_cert_key:      ssl_client_cert_key,
          ssl_ca_certs_from_system: ssl_ca_certs_from_system,
          logger:                   logger
        )

        @producer = @kafka.async_producer(
          delivery_threshold: delivery_threshold,
          delivery_interval:  delivery_interval,
          required_acks:      required_acks
        )
      end

      def close
        @producer&.shutdown
        @producer = nil
        @kafka&.close
        @kafka = nil
      end

      # Forward log messages to Kafka producer thread.
      def log(log)
        json = formatter.call(log, self)
        @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
      end

      # Use JSON Formatter by default.
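      #
      # The formatter can be overridden when adding the appender. A minimal sketch
      # (illustrative only) that publishes just the message text instead of JSON:
      #
      #   SemanticLogger.add_appender(
      #     appender:     :kafka,
      #     seed_brokers: ["kafka1:9092"],
      #     formatter:    ->(log, _appender) { log.message }
      #   )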
      def default_formatter
        SemanticLogger::Formatters::Json.new
      end

      # Restart producer thread since there is no other way to flush.
      def flush
        @producer.shutdown
        @producer = @kafka.async_producer(
          delivery_threshold: delivery_threshold,
          delivery_interval:  delivery_interval,
          required_acks:      required_acks
        )
      end

      private

      attr_reader :producer
    end
  end
end