class SemanticLogger::Appender::Kafka
def close
# Shut down the async producer and close the Kafka client, then drop both
# references. Either step is skipped when its reference is already nil, so
# calling this more than once is harmless.
def close
  @producer.shutdown unless @producer.nil?
  @producer = nil

  @kafka.close unless @kafka.nil?
  @kafka = nil
end
def default_formatter
# Formatter used when none is supplied: a new JSON formatter instance.
def default_formatter
  formatter_class = SemanticLogger::Formatters::Json
  formatter_class.new
end
def flush
# Flush pending messages by shutting down the current async producer and
# replacing it with a new one built from the same settings as #reopen.
#
# NOTE(review): relies on the producer's #shutdown delivering anything still
# buffered before it returns -- confirm against the ruby-kafka documentation.
#
# Does nothing (returns nil) when the appender has been closed, since there
# is no client to create a replacement producer from.
def flush
  return unless @kafka

  @producer&.shutdown
  # required_acks must be passed again here; otherwise the replacement
  # producer falls back to the library default instead of the configured value.
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  )
end
def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
Whether to also send metrics-only events to Kafka.
metrics: [Boolean]
Default: SemanticLogger.application
Name of this application to appear in log messages.
application: [String]
Default: SemanticLogger.host
Name of this host to appear in log messages.
host: [String]
The Proc must return true or false.
Proc: Only include log messages where the supplied Proc returns true
regular expression. All other messages will be ignored.
RegExp: Only include log messages where the class name matches the supplied
filter: [Regexp|Proc]
Default: SemanticLogger::Formatters::Json (See: #default_formatter)
the output from this appender
An instance of a class that implements #call, or a Proc to be used to format
formatter: [Object|Proc|Symbol|Hash]
Default: SemanticLogger.default_level
Override the log level for this appender.
level: [:trace | :debug | :info | :warn | :error | :fatal]
Semantic Logger Parameters:
Default: 1
Number of replicas that must acknowledge receipt of each log message to the topic
required_acks: [Integer]
Default: 10
Number of seconds between triggering a delivery of messages to Apache Kafka.
delivery_interval: [Integer]
Default: 100
Number of messages between triggering a delivery of messages to Apache Kafka.
delivery_threshold: [Integer]
Delegate SSL CA cert to the system certs
ssl_ca_certs_from_system: [boolean]
Default: nil
Must be used in combination with ssl_client_cert.
A PEM encoded client cert key to use with a SSL connection.
ssl_client_cert_key: [String]
Default: nil
Must be used in combination with ssl_client_cert_key.
A PEM encoded client cert to use with a SSL connection.
ssl_client_cert: [String]
Default: nil
A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
ssl_ca_cert: [String, Array<String>]
Default: nil
The timeout setting for socket connections.
socket_timeout: [Integer]
Default: nil
The timeout setting for connecting to brokers.
connect_timeout: [Integer]
Default: nil
The message key.
key: [String]
Default: nil
The key that should be used to assign a partition.
partition_key: [String]
Default: nil
The partition that the message should be written to.
partition: [Integer]
Default: 'log_messages'
Topic to publish log messages to.
topic: [String]
Default: semantic-logger
The identifier for this application.
client_id: [String]
If there's a scheme it's ignored and only host/port are used.
Connections can either be a string of "host:port" or a full URI with a scheme.
or a comma separated string of connections.
The list of brokers used to initialize the client. Either an Array of connections,
seed_brokers: [Array<String>, String]
Kafka Parameters:
Send log messages to Kafka in JSON format.
# Send log messages to Kafka in JSON format.
#
# Connection settings:
#   seed_brokers (required), client_id ("semantic-logger"),
#   connect_timeout (nil), socket_timeout (nil)
# SSL settings:
#   ssl_ca_cert (nil), ssl_client_cert (nil), ssl_client_cert_key (nil),
#   ssl_ca_certs_from_system (false)
# Message routing:
#   topic ("log_messages"), partition (nil), partition_key (nil), key (nil)
# Delivery settings:
#   delivery_threshold (100), delivery_interval (10), required_acks (1)
#
# `metrics` (true), any remaining keyword arguments, and the block are
# forwarded to the superclass initializer. Once the settings are stored the
# Kafka client and producer are created via #reopen.
def initialize(seed_brokers:,
               client_id: "semantic-logger",
               connect_timeout: nil,
               socket_timeout: nil,
               ssl_ca_cert: nil,
               ssl_client_cert: nil,
               ssl_client_cert_key: nil,
               ssl_ca_certs_from_system: false,
               topic: "log_messages",
               partition: nil,
               partition_key: nil,
               key: nil,
               delivery_threshold: 100,
               delivery_interval: 10,
               required_acks: 1,
               metrics: true,
               **args,
               &block)
  # Connection
  @seed_brokers    = seed_brokers
  @client_id       = client_id
  @connect_timeout = connect_timeout
  @socket_timeout  = socket_timeout

  # SSL
  @ssl_ca_cert              = ssl_ca_cert
  @ssl_client_cert          = ssl_client_cert
  @ssl_client_cert_key      = ssl_client_cert_key
  @ssl_ca_certs_from_system = ssl_ca_certs_from_system

  # Message routing
  @topic         = topic
  @partition     = partition
  @partition_key = partition_key
  @key           = key

  # Delivery
  @delivery_threshold = delivery_threshold
  @delivery_interval  = delivery_interval
  @required_acks      = required_acks

  super(metrics: metrics, **args, &block)

  # Settings are in place; open the connection.
  reopen
end
def log(log)
# Format the log event and hand it to the producer, addressed with this
# appender's topic, partition, partition key and message key.
# Returns whatever the producer's #produce returns.
def log(log)
  payload = formatter.call(log, self)
  destination = {
    topic:         topic,
    partition:     partition,
    partition_key: partition_key,
    key:           key
  }
  @producer.produce(payload, **destination)
end
def reopen
# Create a new Kafka client from the stored connection/SSL settings, then a
# new async producer from the stored delivery settings. Both replace whatever
# @kafka and @producer previously referenced.
def reopen
  client_options = {
    seed_brokers:             seed_brokers,
    client_id:                client_id,
    connect_timeout:          connect_timeout,
    socket_timeout:           socket_timeout,
    ssl_ca_cert:              ssl_ca_cert,
    ssl_client_cert:          ssl_client_cert,
    ssl_client_cert_key:      ssl_client_cert_key,
    ssl_ca_certs_from_system: ssl_ca_certs_from_system,
    logger:                   logger
  }
  @kafka = ::Kafka.new(**client_options)

  producer_options = {
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  }
  @producer = @kafka.async_producer(**producer_options)
end