lib/airbrake-ruby/backlog.rb



module Airbrake
  # Backlog accepts notices and APM events and synchronously sends them in the
  # background at regular intervals. The backlog is a queue of data that failed
  # to be sent due to some error. In a nutshell, it's a retry mechanism.
  #
  # @api private
  # @since v6.2.0
  class Backlog
    include Loggable

    # @return [Integer] how many records to keep in the backlog
    BACKLOG_SIZE = 100

    # @return [Integer] flush period in seconds
    TWO_MINUTES = 60 * 2

    def initialize(sync_sender, flush_period = TWO_MINUTES)
      @sync_sender = sync_sender
      @flush_period = flush_period
      @queue = SizedQueue.new(BACKLOG_SIZE).extend(MonitorMixin)
      @has_backlog_data = @queue.new_cond
      @schedule_flush = nil

      @seen = Set.new
    end

    # Appends data to the backlog. Once appended, the flush schedule will
    # start. Chainable.
    #
    # @example
    #   backlog << [{ 'data' => 1 }, 'https://airbrake.io/api']
    #
    # @param [Array<#to_json, String>] data An array of two elements, where the
    #   first element is the data we are sending and the second element is the
    #   URL that we are sending to
    # @return [self]
    def <<(data)
      @queue.synchronize do
        return self if @seen.include?(data)

        @seen << data

        begin
          @queue.push(data, true)
        rescue ThreadError
          logger.error("#{LOG_LABEL} Airbrake::Backlog full")
          return self
        end

        @has_backlog_data.signal
        schedule_flush

        self
      end
    end

    # Closes all the resources that this sender has allocated.
    #
    # @return [void]
    # @since v6.2.0
    def close
      @queue.synchronize do
        if @schedule_flush
          @schedule_flush.kill
          logger.debug("#{LOG_LABEL} Airbrake::Backlog closed")
        end
      end
    end

    private

    def schedule_flush
      @schedule_flush ||= Thread.new do
        loop do
          @queue.synchronize do
            wait
            next if @queue.empty?

            flush
          end
        end
      end
    end

    def wait
      @has_backlog_data.wait(@flush_period) while time_elapsed < @flush_period
      @last_flush = nil
    end

    def time_elapsed
      MonotonicTime.time_in_s - last_flush
    end

    def last_flush
      @last_flush ||= MonotonicTime.time_in_s
    end

    def flush
      unless @queue.empty?
        logger.debug(
          "#{LOG_LABEL} Airbrake::Backlog flushing #{@queue.size} messages",
        )
      end

      failed = 0

      until @queue.empty?
        data, endpoint = @queue.pop
        promise = Airbrake::Promise.new
        @sync_sender.send(data, promise, endpoint)
        failed += 1 if promise.rejected?
      end

      if failed > 0
        logger.debug(
          "#{LOG_LABEL} Airbrake::Backlog #{failed} messages were not flushed",
        )
      end

      @seen.clear
    end
  end
end