class SemanticLogger::Appender::Elasticsearch
# Writes several log events to Elasticsearch in a single bulk request.
#
# logs: collection of log events. For every event two entries are added to
#   the bulk payload, in order: the action hash from #bulk_index and the
#   document produced by the formatter.
#
# Always returns true.
def batch(logs)
  payload = logs.flat_map do |entry|
    [bulk_index(entry), formatter.call(entry, self)]
  end

  write_to_elasticsearch(payload)
  true
end
# Builds the bulk action hash that precedes a log document in a bulk request.
#
# log: the log event; its #time is only read when not writing to a data stream.
#
# Returns:
#   {"create" => {}} when @data_stream is set (no index name is embedded), or
#   {"index" => {"_index" => name}} otherwise, where name is the result of
#   formatting "#{index}-#{date_pattern}" with the event time. "_type" is
#   added only when #version_supports_type? is true.
def bulk_index(log)
  # Decide on the data stream case first so that no index name is computed
  # (and log.time is not touched) when the result would be discarded.
  return {"create" => {}} if @data_stream

  action = {"_index" => log.time.strftime("#{index}-#{date_pattern}")}
  action["_type"] = type if version_supports_type?
  {"index" => action}
end
# Builds the Raw formatter used by this appender, with ISO 8601 timestamps.
#
# The time is emitted under "@timestamp" when @data_stream is set, and under
# :timestamp otherwise.
def default_formatter
  time_key =
    if @data_stream
      "@timestamp"
    else
      :timestamp
    end

  SemanticLogger::Formatters::Raw.new(time_format: :iso_8601, time_key: time_key)
end
# Create Elasticsearch appender over persistent HTTP(S)
#
# Parameters:
#   index: [String]
#     Prefix of the index to store the logs in Elasticsearch.
#     The final index appends the date so that indexes are used per day.
#       I.e. The final index will look like 'semantic_logger-YYYY.MM.DD'
#     Default: 'semantic_logger'
#
#   date_pattern: [String]
#     The time format used to generate the full index name. Useful
#       if you want monthly indexes ('%Y.%m') or weekly ('%Y.%W').
#     Default: '%Y.%m.%d'
#
#   type: [String]
#     Document type to associate with logs when they are written.
#     Deprecated in Elasticsearch 7.0.0.
#     Default: 'log'
#
#   level: [:trace | :debug | :info | :warn | :error | :fatal]
#     Override the log level for this appender.
#     Default: SemanticLogger.default_level
#
#   formatter: [Object|Proc|Symbol|Hash]
#     An instance of a class that implements #call, or a Proc to be used to format
#     the output from this appender
#     Default: :raw_json (See: #call)
#     NOTE(review): #default_formatter in this class builds a Raw formatter;
#       confirm which default the superclass actually applies.
#
#   filter: [Regexp|Proc]
#     RegExp: Only include log messages where the class name matches the supplied
#             regular expression. All other messages will be ignored.
#     Proc: Only include log messages where the supplied Proc returns true
#           The Proc must return true or false.
#
#   host: [String]
#     Name of this host to appear in log messages.
#     Default: SemanticLogger.host
#
#   application: [String]
#     Name of this application to appear in log messages.
#     Default: SemanticLogger.application
#
#   environment:
#     Forwarded unchanged to the superclass.
#     Default: nil
#
#   data_stream: [true|false]
#     When true, bulk actions are sent as `create` without an embedded index
#     name, `index` is passed as the target of the bulk request, and the
#     default formatter emits the time under "@timestamp".
#     Default: false
#
# Elasticsearch Parameters:
#   url: [String]
#     Fully qualified address to the Elasticsearch service.
#     Default: 'http://localhost:9200'
#
#   hosts: [String|Hash|Array]
#     Single host passed as a String or Hash, or multiple hosts
#     passed as an Array; `host` or `url` keys are also valid.
#     Note:
#       :url above is ignored when supplying this option.
#
#   resurrect_after [Float]
#     After how many seconds a dead connection should be tried again.
#
#   reload_connections [true|false|Integer]
#     Reload connections after X requests.
#     Default: false
#
#   randomize_hosts [true|false]
#     Shuffle connections on initialization and reload.
#     Default: false
#
#   sniffer_timeout [Integer]
#     Timeout for reloading connections in seconds.
#     Default: 1
#
#   retry_on_failure [true|false|Integer]
#     Retry X times when request fails before raising an exception.
#     Default: false
#
#   retry_on_status [Array<Integer>]
#     Retry when specific status codes are returned.
#
#   reload_on_failure [true|false]
#     Reload connections after failure.
#     Default: false
#
#   request_timeout [Integer]
#     The request timeout to be passed to transport in options.
#
#   adapter [Symbol]
#     A specific adapter for Faraday (e.g. `:patron`)
#
#   transport_options [Hash]
#     Options to be passed to the `Faraday::Connection` constructor.
#
#   transport_class [Constant]
#     A specific transport class to use, will be initialized by
#     the client and passed hosts and all arguments.
#
#   transport [Object]
#     A specific transport instance.
#
#   serializer_class [Constant]
#     A specific serializer class to use, will be initialized by
#     the transport and passed the transport instance.
#
#   selector [Elasticsearch::Transport::Transport::Connections::Selector::Base]
#     An instance of selector strategy derived from `Elasticsearch::Transport::Transport::Connections::Selector::Base`.
#
#   send_get_body_as [String]
#     Specify the HTTP method to use for GET requests with a body.
def initialize(url: "http://localhost:9200",
               index: "semantic_logger",
               date_pattern: "%Y.%m.%d",
               type: "log",
               level: nil,
               formatter: nil,
               filter: nil,
               application: nil,
               environment: nil,
               host: nil,
               data_stream: false,
               **elasticsearch_args,
               &block)
  @url, @index, @date_pattern, @type = url, index, date_pattern, type

  # Work on a copy so the caller's options hash is never modified.
  @elasticsearch_args = elasticsearch_args.dup
  # An explicit :hosts option takes precedence over :url.
  use_url = url && !elasticsearch_args[:hosts]
  @elasticsearch_args[:url] = url if use_url
  @elasticsearch_args[:logger] = logger
  @data_stream = data_stream

  subscriber_options = {
    level:       level,
    formatter:   formatter,
    filter:      filter,
    application: application,
    environment: environment,
    host:        host,
    metrics:     false
  }
  super(**subscriber_options, &block)

  # Build the Elasticsearch client from the options collected above.
  reopen
end
# Writes a single log event to Elasticsearch as a two entry bulk request:
# the action hash from #bulk_index followed by the formatted document.
#
# Always returns true.
def log(log)
  # The document is formatted before the action hash is built.
  document = formatter.call(log, self)
  action   = bulk_index(log)

  write_to_elasticsearch([action, document])
  true
end
# (Re)creates the Elasticsearch client from the options stored in
# @elasticsearch_args and keeps it in @client.
#
# Returns the new client.
def reopen
  client_options = @elasticsearch_args
  @client = ::Elasticsearch::Client.new(client_options)
end
# Whether the loaded Elasticsearch gem is older than version 7, in which case
# #bulk_index adds "_type" to the bulk action.
def version_supports_type?
  installed = Gem::Version.new(::Elasticsearch::VERSION)
  installed < Gem::Version.new(7)
end
# Sends a bulk payload to Elasticsearch and logs the entries it rejected.
#
# messages: Array of bulk entries as built by #log and #batch
#   (action hash followed by document, repeated).
#
# When @data_stream is set, `index` is passed as the target of the bulk
# request; otherwise only the body is sent.
#
# Returns nil when the response reports no errors, otherwise the result of
# logger.error. Failures are logged and the messages discarded; nothing is raised.
def write_to_elasticsearch(messages)
  bulk_result =
    if @data_stream
      @client.bulk(index: index, body: messages)
    else
      @client.bulk(body: messages)
    end

  return unless bulk_result["errors"]

  # Bulk response items are keyed by their action, with the outcome nested one
  # level down, e.g. {"index" => {"status" => 201}}. Reading "status" from the
  # item itself always yields nil, which would report every item as failed.
  failed = Array(bulk_result["items"]).reject do |item|
    result = item
    result = item.values.first if item.is_a?(Hash) && !item.key?("status")
    status = result["status"] if result.is_a?(Hash)
    # Any 2xx status is a successful operation; everything else is reported.
    status.is_a?(Integer) && status.between?(200, 299)
  end
  logger.error("ElasticSearch: Write failed. Messages discarded. : #{failed}")
end