class SemanticLogger::Appender::Async

Allow any appender to run asynchronously in a separate thread.

def active?

Returns true if the worker thread is active.
def active?
  @thread&.alive?
end

def capped?

Returns [true|false] whether the queue has a capped (maximum) size.
def capped?
  @capped
end

def check_lag(log)

Log a warning when the message just popped off the queue has been waiting longer than lag_threshold_s seconds.
def check_lag(log)
  diff = Time.now - log.time
  return unless diff > lag_threshold_s
  logger.warn "Async: Appender thread has fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders"
end

def close

Flush any outstanding messages, then close the wrapped appender.
def close
  # TODO: Prevent new close requests once this appender has been closed.
  submit_request(:close)
end

def create_queue

Creates an unbounded Queue when max_queue_size is -1, otherwise a SizedQueue capped at max_queue_size.
def create_queue
  if max_queue_size == -1
    @queue  = Queue.new
    @capped = false
  else
    @queue  = SizedQueue.new(max_queue_size)
    @capped = true
  end
end
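
For intuition, here is a standalone stdlib sketch (not part of this class) of how the two queue types behave: a SizedQueue blocks producers once it is full, while a Queue only ever grows.

capped = SizedQueue.new(2)
capped << 1
capped << 2
# capped << 3  # would block here until a consumer pops a message

uncapped = Queue.new
10_000.times { |i| uncapped << i } # never blocks; bounded only by memory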

def flush

Flush all queued log entries to disk, database, etc.
All queued log messages are written and then the appender is flushed.
def flush
  submit_request(:flush)
end

def initialize(appender:,

Appender proxy to allow an existing appender to run asynchronously in a separate thread.

Parameters:
  max_queue_size: [Integer]
    The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
    -1: The queue size is uncapped and will never block no matter how long the queue is.
    Default: 10,000

  lag_threshold_s: [Float]
    Log a warning when a log message has been on the queue for longer than this period in seconds.
    Default: 30

  lag_check_interval: [Integer]
    Number of messages to process before checking for slow logging.
    Default: 1,000
def initialize(appender:,
               max_queue_size: 10_000,
               lag_check_interval: 1_000,
               lag_threshold_s: 30)
  @appender           = appender
  @lag_check_interval = lag_check_interval
  @lag_threshold_s    = lag_threshold_s
  @thread             = nil
  @max_queue_size     = max_queue_size
  create_queue
  thread
end
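
A construction sketch; the wrapped SemanticLogger::Appender::File and its file_name: option are shown here only as an illustrative example of an appender to proxy.

file_appender = SemanticLogger::Appender::File.new(file_name: "app.log")
async = SemanticLogger::Appender::Async.new(
  appender:           file_appender,
  max_queue_size:     50_000, # block callers once 50,000 messages are queued
  lag_check_interval: 2_500,  # check for lag every 2,500 messages
  lag_threshold_s:    60      # warn when a message waited over a minute
)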

def log(log)

Add log message for processing.
def log(log)
  queue << log
end

def process

Worker thread that takes log messages off the queue and writes them to the wrapped appender.
def process
  # This thread is designed to never go down unless the main thread terminates
  # or the appender is closed.
  Thread.current.name = logger.name
  logger.trace "Async: Appender thread active"
  begin
    process_messages
  rescue StandardError => e
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.error("Async: Restarting due to exception", e)
    rescue StandardError
      nil
    end
    retry
  rescue Exception => e
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.error("Async: Stopping due to fatal exception", e)
    rescue StandardError
      nil
    end
  ensure
    @thread = nil
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.trace("Async: Thread has stopped")
    rescue StandardError
      nil
    end
  end
end
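
The rescue structure above is a small supervisor pattern: any StandardError restarts the processing loop, while fatal Exception subclasses (for example SignalException) are logged once and allowed to end the thread. A standalone sketch of the same pattern, with do_work as a hypothetical stand-in:

def run_worker
  do_work                         # hypothetical unit of work
rescue StandardError => e
  warn "recoverable: #{e.message}"
  retry                           # ordinary errors: restart and keep going
rescue Exception => e
  warn "fatal: #{e.message}"      # fatal errors: log once, let the thread die
end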

def process_message(message)

Process a control message (:flush or :close). Returns false when message processing should be stopped.
def process_message(message)
  case message[:command]
  when :flush
    appender.flush
    message[:reply_queue] << true if message[:reply_queue]
  when :close
    appender.close
    message[:reply_queue] << true if message[:reply_queue]
    return false
  else
    logger.warn "Async: Appender thread: Ignoring unknown command: #{message[:command]}"
  end
  true
end

def process_messages

Pop messages off the queue: log messages are written to the appender, while control messages are dispatched to process_message, which can stop the loop.
def process_messages
  count = 0
  while (message = queue.pop)
    if message.is_a?(Log)
      appender.log(message)
      count += 1
      # Check every few log messages whether this appender thread is falling behind
      if count > lag_check_interval
        check_lag(message)
        count = 0
      end
    else
      break unless process_message(message)
    end
  end
  logger.trace "Async: Queue Closed"
end

def reopen

Re-open the appender after a fork.
def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/reidmorrison/semantic_logger/issues/103
  @queue&.close
  create_queue
  appender.reopen if appender.respond_to?(:reopen)
  @thread&.kill if @thread&.alive?
  @thread = Thread.new { process }
end
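
A hedged usage sketch, assuming the async proxy built in the earlier construction example: after Process.fork, the child calls reopen to get a fresh queue and worker thread.

pid = Process.fork do
  async.reopen # recreate the queue and restart the worker in the child
  # ... the child can now keep logging through the same proxy ...
end
Process.wait(pid)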

def submit_request(command)

Submit a command to the worker thread and wait for its reply.
def submit_request(command)
  return false unless active?
  queue_size = queue.size
  msg        = "Async: Queued log messages: #{queue_size}, running command: #{command}"
  if queue_size > 1_000
    logger.warn msg
  elsif queue_size > 100
    logger.info msg
  elsif queue_size.positive?
    logger.trace msg
  end
  reply_queue = Queue.new
  queue << {command: command, reply_queue: reply_queue}
  reply_queue.pop
end
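
The per-request reply queue gives a simple blocking request/reply channel between threads. A stdlib-only sketch of the same handshake, independent of this class:

requests = Queue.new
worker   = Thread.new do
  msg = requests.pop
  msg[:reply_queue] << true # acknowledge once the command is handled
end

reply = Queue.new
requests << {command: :flush, reply_queue: reply}
reply.pop # blocks until the worker acknowledges
worker.join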

def thread

Starts the worker thread if not running.

Returns [Thread] the worker thread.
def thread
  return @thread if @thread&.alive?
  @thread = Thread.new { process }
end