class SemanticLogger::Appender::Async
Allow any appender to run asynchronously in a separate thread.
def active?
def active? @thread&.alive? end
def capped?
def capped? @capped end
def check_lag(log)
def check_lag(log) diff = Time.now - log.time return unless diff > lag_threshold_s logger.warn "Async: Appender thread has fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders" end
def close
def close # TODO: Prevent new close requests once this appender has been closed. submit_request(:close) end
def create_queue
def create_queue if max_queue_size == -1 @queue = Queue.new @capped = false else @queue = SizedQueue.new(max_queue_size) @capped = true end end
def flush
Flush all queued log entries disk, database, etc.
def flush submit_request(:flush) end
def initialize(appender:,
Number of messages to process before checking for slow logging.
lag_check_interval: [Integer]
Default: 30
Log a warning when a log message has been on the queue for longer than this period in seconds.
lag_threshold_s [Float]
Default: 10,000
-1: The queue size is uncapped and will never block no matter how long the queue is.
The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
max_queue_size: [Integer]
Parameters:
Appender proxy to allow an existing appender to run asynchronously in a separate thread.
def initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30) @appender = appender @lag_check_interval = lag_check_interval @lag_threshold_s = lag_threshold_s @thread = nil @max_queue_size = max_queue_size create_queue thread end
def log(log)
def log(log) queue << log end
def process
def process # This thread is designed to never go down unless the main thread terminates # or the appender is closed. Thread.current.name = logger.name logger.trace "Async: Appender thread active" begin process_messages rescue StandardError => e # This block may be called after the file handles have been released by Ruby begin logger.error("Async: Restarting due to exception", e) rescue StandardError nil end retry rescue Exception => e # This block may be called after the file handles have been released by Ruby begin logger.error("Async: Stopping due to fatal exception", e) rescue StandardError nil end ensure @thread = nil # This block may be called after the file handles have been released by Ruby begin logger.trace("Async: Thread has stopped") rescue StandardError nil end end end
def process_message(message)
def process_message(message) case message[:command] when :flush appender.flush message[:reply_queue] << true if message[:reply_queue] when :close appender.close message[:reply_queue] << true if message[:reply_queue] return false else logger.warn "Async: Appender thread: Ignoring unknown command: #{message[:command]}" end true end
def process_messages
def process_messages count = 0 while (message = queue.pop) if message.is_a?(Log) appender.log(message) count += 1 # Check every few log messages whether this appender thread is falling behind if count > lag_check_interval check_lag(message) count = 0 end else break unless process_message(message) end end logger.trace "Async: Queue Closed" end
def reopen
def reopen # Workaround CRuby crash on fork by recreating queue on reopen # https://github.com/reidmorrison/semantic_logger/issues/103 @queue&.close create_queue appender.reopen if appender.respond_to?(:reopen) @thread&.kill if @thread&.alive? @thread = Thread.new { process } end
def submit_request(command)
def submit_request(command) return false unless active? queue_size = queue.size msg = "Async: Queued log messages: #{queue_size}, running command: #{command}" if queue_size > 1_000 logger.warn msg elsif queue_size > 100 logger.info msg elsif queue_size.positive? logger.trace msg end reply_queue = Queue.new queue << {command: command, reply_queue: reply_queue} reply_queue.pop end
def thread
Returns [Thread] the worker thread.
def thread return @thread if @thread&.alive? @thread = Thread.new { process } end