class SemanticLogger::Appender::Elasticsearch

def batch(logs)
  messages = []
  logs.each do |log|
    messages << bulk_index(log) << formatter.call(log, self)
  end
  write_to_elasticsearch(messages)
  true
end
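
For reference, the resulting messages array interleaves one bulk action header with
one formatted log event, which is the pairing the Elasticsearch bulk API expects.
A hypothetical illustration for two events (non data-stream mode, Elasticsearch 7+
client; formatter fields are abbreviated and the values invented):

  [
    {"index" => {"_index" => "semantic_logger-2024.01.15"}},
    {timestamp: "2024-01-15T09:15:02.123456Z", level: :info, message: "Order created"},
    {"index" => {"_index" => "semantic_logger-2024.01.15"}},
    {timestamp: "2024-01-15T09:15:02.456789Z", level: :error, message: "Payment declined"}
  ]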

def bulk_index(log)
  expanded_index_name = log.time.strftime("#{index}-#{date_pattern}")
  # Data streams choose their own backing index; the target stream is supplied on
  # the bulk request itself, so the action header stays empty.
  return {"create" => {}} if @data_stream

  bulk_index = {"index" => {"_index" => expanded_index_name}}
  # Mapping types were removed in Elasticsearch 7; only send "_type" to older clients.
  bulk_index["index"].merge!({"_type" => type}) if version_supports_type?
  bulk_index
end
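
Illustrative return values for a log event from 15 Jan 2024 with the default index
and date_pattern (the exact names depend on the configured options):

  {"index" => {"_index" => "semantic_logger-2024.01.15"}}                    # Elasticsearch client 7+
  {"index" => {"_index" => "semantic_logger-2024.01.15", "_type" => "log"}}  # client older than 7
  {"create" => {}}                                                           # data stream mode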

def default_formatter
  # Data streams require the event time under the reserved "@timestamp" field.
  time_key = @data_stream ? "@timestamp" : :timestamp
  SemanticLogger::Formatters::Raw.new(time_format: :iso_8601, time_key: time_key)
end

Create Elasticsearch appender over persistent HTTP(S)

Parameters:
  index: [String]
    Prefix of the index to store the logs in Elasticsearch.
    The final index appends the date so that indexes are used per day.
    I.e. The final index will look like 'semantic_logger-YYYY.MM.DD'
    Default: 'semantic_logger'

  date_pattern: [String]
    The time format used to generate the full index name. Useful
    if you want monthly indexes ('%Y.%m') or weekly ('%Y.%W').
    Default: '%Y.%m.%d'

  type: [String]
    Document type to associate with logs when they are written.
    Deprecated in Elasticsearch 7.0.0.
    Default: 'log'

  level: [:trace | :debug | :info | :warn | :error | :fatal]
    Override the log level for this appender.
    Default: SemanticLogger.default_level

  formatter: [Object|Proc|Symbol|Hash]
    An instance of a class that implements #call, or a Proc to be used to format
    the output from this appender.
    Default: :raw_json (See: #call)

  filter: [Regexp|Proc]
    RegExp: Only include log messages where the class name matches the supplied
    regular expression. All other messages will be ignored.
    Proc: Only include log messages where the supplied Proc returns true.
    The Proc must return true or false.

  host: [String]
    Name of this host to appear in log messages.
    Default: SemanticLogger.host

  application: [String]
    Name of this application to appear in log messages.
    Default: SemanticLogger.application

Elasticsearch Parameters:
  url: [String]
    Fully qualified address to the Elasticsearch service.
    Default: 'http://localhost:9200'

  hosts: [String|Hash|Array]
    Single host passed as a String or Hash, or multiple hosts
    passed as an Array; `host` or `url` keys are also valid.
    Note: :url above is ignored when supplying this option.

  resurrect_after [Float]
    After how many seconds a dead connection should be tried again.

  reload_connections [true|false|Integer]
    Reload connections after X requests.
    Default: false

  randomize_hosts [true|false]
    Shuffle connections on initialization and reload.
    Default: false

  sniffer_timeout [Integer]
    Timeout for reloading connections in seconds.
    Default: 1

  retry_on_failure [true|false|Integer]
    Retry X times when a request fails before raising an exception.
    Default: false

  retry_on_status [Array]
    Retry when specific status codes are returned.

  reload_on_failure [true|false]
    Reload connections after failure.
    Default: false

  request_timeout [Integer]
    The request timeout to be passed to transport in options.

  adapter [Symbol]
    A specific adapter for Faraday (e.g. `:patron`)

  transport_options [Hash]
    Options to be passed to the `Faraday::Connection` constructor.

  transport_class [Constant]
    A specific transport class to use, will be initialized by
    the client and passed hosts and all arguments.

  transport [Object]
    A specific transport instance.

  serializer_class [Constant]
    A specific serializer class to use, will be initialized by
    the transport and passed the transport instance.

  selector [Elasticsearch::Transport::Transport::Connections::Selector::Base]
    An instance of a selector strategy derived from
    `Elasticsearch::Transport::Transport::Connections::Selector::Base`.

  send_get_body_as [String]
    Specify the HTTP method to use for GET requests with a body.
    Default: 'GET'
def initialize(url: "http://localhost:9200",
               index: "semantic_logger",
               date_pattern: "%Y.%m.%d",
               type: "log",
               level: nil,
               formatter: nil,
               filter: nil,
               application: nil,
               environment: nil,
               host: nil,
               data_stream: false,
               **elasticsearch_args,
               &block)
  @url                         = url
  @index                       = index
  @date_pattern                = date_pattern
  @type                        = type
  @elasticsearch_args          = elasticsearch_args.dup
  # An explicit :hosts option takes precedence; otherwise fall back to the single :url.
  @elasticsearch_args[:url]    = url if url && !elasticsearch_args[:hosts]
  @elasticsearch_args[:logger] = logger
  @data_stream                 = data_stream

  super(level: level, formatter: formatter, filter: filter, application: application,
        environment: environment, host: host, metrics: false, &block)
  reopen
end
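
A minimal usage sketch, assuming the elasticsearch gem is installed and a node is
reachable at the default URL (the index name is illustrative):

  require "semantic_logger"

  SemanticLogger.add_appender(
    appender: :elasticsearch,
    url:      "http://localhost:9200",
    index:    "my_app"
  )

  logger = SemanticLogger["MyClass"]
  logger.info("Written to Elasticsearch")

With the default date_pattern this writes to a daily index such as 'my_app-2024.01.15'.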

Log to the index for today
def log(log)
  bulk_payload = formatter.call(log, self)
  write_to_elasticsearch([bulk_index(log), bulk_payload])
  true
end

def reopen
  @client = ::Elasticsearch::Client.new(@elasticsearch_args)
end

def version_supports_type?
  Gem::Version.new(::Elasticsearch::VERSION) < Gem::Version.new(7)
end
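
Gem::Version compares release segments numerically, so for example:

  Gem::Version.new("6.8.23")  < Gem::Version.new(7)  # => true  ("_type" is sent)
  Gem::Version.new("7.17.10") < Gem::Version.new(7)  # => false ("_type" is omitted)

Note that ::Elasticsearch::VERSION is the version of the installed client gem, not of
the Elasticsearch server being written to.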

def write_to_elasticsearch(messages)
  bulk_result =
    if @data_stream
      # Data streams: the target is named once on the request; the body holds "create" actions.
      @client.bulk(index: index, body: messages)
    else
      # Conventional indices: each action header in the body names its own target index.
      @client.bulk(body: messages)
    end
  return unless bulk_result["errors"]

  # Report any items that did not come back as created; failed messages are not retried.
  failed = bulk_result["items"].reject { |x| x["status"] == 201 }
  logger.error("ElasticSearch: Write failed. Messages discarded. : #{failed}")
end
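
A sketch of the two bulk request shapes this method sends, written directly against
the elasticsearch gem's client (payload values are illustrative):

  require "elasticsearch"

  client = Elasticsearch::Client.new(url: "http://localhost:9200")

  # Conventional indices: each action header in the body names its target index.
  client.bulk(body: [
    {"index" => {"_index" => "semantic_logger-2024.01.15"}},
    {"message" => "hello", "level" => "info"}
  ])

  # Data stream: the target is named once on the request and "create" actions are used.
  client.bulk(index: "semantic_logger", body: [
    {"create" => {}},
    {"@timestamp" => "2024-01-15T12:00:00Z", "message" => "hello", "level" => "info"}
  ])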