lib/airbrake-ruby/backlog.rb
module Airbrake # Backlog accepts notices and APM events and synchronously sends them in the # background at regular intervals. The backlog is a queue of data that failed # to be sent due to some error. In a nutshell, it's a retry mechanism. # # @api private # @since v6.2.0 class Backlog include Loggable # @return [Integer] how many records to keep in the backlog BACKLOG_SIZE = 100 # @return [Integer] flush period in seconds TWO_MINUTES = 60 * 2 def initialize(sync_sender, flush_period = TWO_MINUTES) @sync_sender = sync_sender @flush_period = flush_period @queue = SizedQueue.new(BACKLOG_SIZE).extend(MonitorMixin) @has_backlog_data = @queue.new_cond @schedule_flush = nil @seen = Set.new end # Appends data to the backlog. Once appended, the flush schedule will # start. Chainable. # # @example # backlog << [{ 'data' => 1 }, 'https://airbrake.io/api'] # # @param [Array<#to_json, String>] data An array of two elements, where the # first element is the data we are sending and the second element is the # URL that we are sending to # @return [self] def <<(data) @queue.synchronize do return self if @seen.include?(data) @seen << data begin @queue.push(data, true) rescue ThreadError logger.error("#{LOG_LABEL} Airbrake::Backlog full") return self end @has_backlog_data.signal schedule_flush self end end # Closes all the resources that this sender has allocated. # # @return [void] # @since v6.2.0 def close @queue.synchronize do if @schedule_flush @schedule_flush.kill logger.debug("#{LOG_LABEL} Airbrake::Backlog closed") end end end private def schedule_flush @schedule_flush ||= Thread.new do loop do @queue.synchronize do wait next if @queue.empty? flush end end end end def wait @has_backlog_data.wait(@flush_period) while time_elapsed < @flush_period @last_flush = nil end def time_elapsed MonotonicTime.time_in_s - last_flush end def last_flush @last_flush ||= MonotonicTime.time_in_s end def flush unless @queue.empty? logger.debug( "#{LOG_LABEL} Airbrake::Backlog flushing #{@queue.size} messages", ) end failed = 0 until @queue.empty? data, endpoint = @queue.pop promise = Airbrake::Promise.new @sync_sender.send(data, promise, endpoint) failed += 1 if promise.rejected? end if failed > 0 logger.debug( "#{LOG_LABEL} Airbrake::Backlog #{failed} messages were not flushed", ) end @seen.clear end end end