class Appsignal::CheckIn::Scheduler
@!visibility private
def add_event(event)
def add_event(event) # Remove redundant events, keeping the newly added one, which # should be the one with the most recent timestamp. @events.reject! do |existing_event| next unless Event.redundant?(event, existing_event) Appsignal.internal_logger.debug( "Replacing previously scheduled #{Event.describe([existing_event])}" ) true end @events << event end
def initialize
def initialize # The mutex is used to synchronize access to the events array, the # waker thread and the main thread, as well as queue writes # (which depend on the events array) and closes (so they do not # happen at the same time that an event is added to the scheduler) @mutex = Mutex.new # The transmitter thread will be started when an event is first added. @thread = nil @queue = Thread::Queue.new # Scheduled events that have not been sent to the transmitter thread # yet. A copy of this array is pushed to the queue by the waker thread # after it has awaited the debounce period. @events = [] # The waker thread is used to schedule debounces. It will be started # when an event is first added. @waker = nil # For internal testing purposes. @transmitted = 0 end
def push_events
def push_events return if @events.empty? # Push a copy of the events to the queue, and clear the events array. # This ensures that `@events` always contains events that have not # yet been pushed to the queue. events = @events.dup Event.deduplicate_cron!(events) @queue.push(events) @events.clear start_waker(BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) end
def run
def run loop do events = @queue.pop break if events.nil? transmit(events) @transmitted += 1 end end
def schedule(event)
def schedule(event) unless Appsignal.active? Appsignal.internal_logger.debug( "Cannot transmit #{Event.describe([event])}: AppSignal is not active" ) return end @mutex.synchronize do if @queue.closed? Appsignal.internal_logger.debug( "Cannot transmit #{Event.describe([event])}: AppSignal is stopped" ) return end add_event(event) # If we're not already waiting to be awakened from a scheduled # debounce, schedule a short debounce, which will push the events # to the queue and schedule a long debounce. start_waker(INITIAL_DEBOUNCE_SECONDS) if @waker.nil? Appsignal.internal_logger.debug( "Scheduling #{Event.describe([event])} to be transmitted" ) # Make sure to start the thread after an event has been added. @thread ||= Thread.new(&method(:run)) end end
def start_waker(debounce)
def start_waker(debounce) stop_waker @waker = Thread.new do sleep(debounce) @mutex.synchronize do # Make sure this waker doesn't get killed, so it can push # events and schedule a new waker. @waker = nil push_events end end end
def stop
def stop @mutex.synchronize do # Flush all events before closing the queue. push_events rescue ClosedQueueError # The queue is already closed (by a previous call to `#stop`) # so it is not possible to push events to it anymore. ensure # Ensure calling `#stop` closes the queue and kills # the waker thread, disallowing any further events from being # scheduled with `#schedule`. stop_waker @queue.close # Block until the thread has finished. @thread&.join end end
def stop_waker
def stop_waker @waker&.kill @waker&.join @waker = nil end
def transmit(events)
def transmit(events) description = Event.describe(events) begin @transmitter ||= Transmitter.new( "#{Appsignal.config[:logging_endpoint]}/check_ins/json" ) response = @transmitter.transmit(events, :format => :ndjson) if (200...300).include?(response.code.to_i) Appsignal.internal_logger.debug("Transmitted #{description}") else Appsignal.internal_logger.error( "Failed to transmit #{description}: #{response.code} status code" ) end rescue => e Appsignal.internal_logger .error("Failed to transmit #{description}: #{e.class}: #{e.message}") end end