class SemanticLogger::Appender::Kafka

def close

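Shut down the producer and close the connection to Kafka.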
def close
  @producer&.shutdown
  @producer = nil
  @kafka&.close
  @kafka = nil
end

def default_formatter

Use the JSON formatter by default.
def default_formatter
  SemanticLogger::Formatters::Json.new
end

def flush

Restart the producer thread, since there is no other way to flush pending messages.
def flush
  @producer.shutdown
  # Recreate the producer with the same settings used in #reopen.
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  )
end
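
Because flushing shuts down and recreates the producer, it is comparatively
expensive and typically only needed before process exit. A minimal sketch,
assuming the appender has been registered globally; SemanticLogger.flush calls
#flush on every registered appender, including this one:

# Force any buffered log messages out to Kafka before the process exits.
at_exit { SemanticLogger.flush }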

def initialize(seed_brokers:, ...)

Send log messages to Kafka in JSON format.

Kafka Parameters:

seed_brokers: [Array, String]
  The list of brokers used to initialize the client. Either an Array of connections,
  or a comma separated string of connections.
  Connections can either be a string of "port:protocol" or a full URI with a scheme.
  If there's a scheme it's ignored and only host/port are used.

client_id: [String]
  The identifier for this application.
  Default: semantic-logger

topic: [String]
  Topic to publish log messages to.
  Default: 'log_messages'

partition: [Integer]
  The partition that the message should be written to.
  Default: nil

partition_key: [String]
  The key that should be used to assign a partition.
  Default: nil

key: [String]
  The message key.
  Default: nil

connect_timeout: [Integer]
  The timeout setting for connecting to brokers.
  Default: nil

socket_timeout: [Integer]
  The timeout setting for socket connections.
  Default: nil

ssl_ca_cert: [String, Array]
  A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with an SSL connection.
  Default: nil

ssl_client_cert: [String]
  A PEM encoded client cert to use with an SSL connection.
  Must be used in combination with ssl_client_cert_key.
  Default: nil

ssl_client_cert_key: [String]
  A PEM encoded client cert key to use with an SSL connection.
  Must be used in combination with ssl_client_cert.
  Default: nil

ssl_ca_certs_from_system: [Boolean]
  Delegate SSL CA cert verification to the system certs.
  Default: false

delivery_threshold: [Integer]
  Number of messages between triggering a delivery of messages to Apache Kafka.
  Default: 100

delivery_interval: [Integer]
  Number of seconds between triggering a delivery of messages to Apache Kafka.
  Default: 10

required_acks: [Integer]
  Number of replicas that must acknowledge receipt of each log message to the topic.
  Default: 1

Semantic Logger Parameters:

level: [:trace | :debug | :info | :warn | :error | :fatal]
  Override the log level for this appender.
  Default: SemanticLogger.default_level

formatter: [Object|Proc|Symbol|Hash]
  An instance of a class that implements #call, or a Proc to be used to format
  the output from this appender.
  Default: SemanticLogger::Formatters::Json (see #default_formatter)

filter: [Regexp|Proc]
  RegExp: Only include log messages where the class name matches the supplied
  regular expression. All other messages will be ignored.
  Proc: Only include log messages where the supplied Proc returns true.
  The Proc must return true or false.

host: [String]
  Name of this host to appear in log messages.
  Default: SemanticLogger.host

application: [String]
  Name of this application to appear in log messages.
  Default: SemanticLogger.application

metrics: [Boolean]
  Whether to also send metrics-only events to Kafka.
  Default: true
def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false,
               topic: "log_messages", partition: nil, partition_key: nil, key: nil,
               delivery_threshold: 100, delivery_interval: 10, required_acks: 1,
               metrics: true, **args, &block)
  @seed_brokers             = seed_brokers
  @client_id                = client_id
  @connect_timeout          = connect_timeout
  @socket_timeout           = socket_timeout
  @ssl_ca_cert              = ssl_ca_cert
  @ssl_client_cert          = ssl_client_cert
  @ssl_client_cert_key      = ssl_client_cert_key
  @ssl_ca_certs_from_system = ssl_ca_certs_from_system
  @topic                    = topic
  @partition                = partition
  @partition_key            = partition_key
  @key                      = key
  @delivery_threshold       = delivery_threshold
  @delivery_interval        = delivery_interval
  @required_acks            = required_acks
  super(metrics: metrics, **args, &block)
  reopen
end
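
For reference, a minimal sketch of registering this appender; the broker
address is illustrative, not a default to copy verbatim:

require "semantic_logger"

# Register the Kafka appender. The broker address below is illustrative.
SemanticLogger.add_appender(
  appender:     :kafka,
  seed_brokers: ["kafka-1.internal:9092"],
  topic:        "log_messages"
)

logger = SemanticLogger["MyApp"]
logger.info("Kafka appender registered")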

def log(log)

Forward log messages to the Kafka producer thread.
def log(log)
  json = formatter.call(log, self)
  @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
end
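
Illustratively, each log event is serialized to a single JSON document before
being queued on the asynchronous producer. The exact fields depend on the
formatter in use; the payload below is indicative of the default JSON
formatter's output, not an exact contract:

logger = SemanticLogger["Checkout"]
logger.info("Order placed", order_id: 42)

# Queued to the configured topic as one JSON document, roughly:
# {"host":"app-1","application":"Semantic Logger","timestamp":"...",
#  "level":"info","name":"Checkout","message":"Order placed",
#  "payload":{"order_id":42}}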

def reopen

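Open a connection to Kafka and start the asynchronous producer.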
def reopen
  @kafka = ::Kafka.new(
    seed_brokers:             seed_brokers,
    client_id:                client_id,
    connect_timeout:          connect_timeout,
    socket_timeout:           socket_timeout,
    ssl_ca_cert:              ssl_ca_cert,
    ssl_client_cert:          ssl_client_cert,
    ssl_client_cert_key:      ssl_client_cert_key,
    ssl_ca_certs_from_system: ssl_ca_certs_from_system,
    logger:                   logger
  )
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  )
end
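
Kafka connections do not survive a process fork, so #reopen is the hook for
re-establishing them in child processes. A sketch, assuming
SemanticLogger.reopen (which reopens every registered appender) is available:

# In a forking server, reopen appender connections in each child process.
fork do
  SemanticLogger.reopen
  SemanticLogger["Worker"].info("worker started")
end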