class ElasticAPM::TimedWorker
@api private
def collect_and_send_transactions
# Sends one batch of pending transactions through the adapter.
#
# Does nothing when the pending queue is empty. Otherwise a batch is drained
# with #collect_batched_transactions, serialized, and POSTed to
# '/v1/transactions'. A failed POST is logged and swallowed. Only
# StandardError is rescued, so process-level exceptions (Interrupt,
# SystemExit, NoMemoryError, ...) still propagate to the caller.
#
# @return [Time, nil] the new last-sent timestamp, or nil when there was
#   nothing to send
def collect_and_send_transactions
  return if pending_transactions.empty?

  transactions = collect_batched_transactions
  payload = @serializers.transactions.build_all(transactions)

  begin
    @adapter.post('/v1/transactions', payload)
  rescue ::StandardError => e
    fatal 'Failed posting: %s', e.inspect
    # Array() guards against an exception whose backtrace is nil.
    debug Array(e.backtrace).join("\n")
  end

  # Updated after a failed POST as well. Kept in UTC like the other
  # timestamps this class stores and compares against.
  @last_sent_transactions = Time.now.utc
end
def collect_batched_transactions
# Drains up to config.max_queue_size transactions from the pending queue.
#
# The size limit is checked *before* popping, so a transaction is only ever
# removed from the queue when it is going to be placed in the batch; anything
# beyond the limit stays queued for the next call.
#
# @return [Array] the drained transactions, empty when the queue was empty
def collect_batched_transactions
  batch = []
  limit = config.max_queue_size

  begin
    while batch.length < limit
      transaction = pending_transactions.pop(true)
      # A nil/false entry ends the batch.
      break unless transaction

      batch << transaction
    end
  rescue ThreadError
    # Non-blocking pop raises ThreadError once the queue is empty.
  end

  batch
end
def initialize(config, messages, pending_transactions, adapter)
# Stores the worker's collaborators and builds its serializers.
#
# @param config [Object] stored as @config and handed to both serializers
# @param messages [Object] stored as @messages
# @param pending_transactions [Object] stored as @pending_transactions
# @param adapter [Object] stored as @adapter
def initialize(config, messages, pending_transactions, adapter)
  @config = config
  @messages = messages
  @pending_transactions = pending_transactions
  @adapter = adapter

  # Baseline timestamp, initialised to the construction time in UTC.
  @last_sent_transactions = Time.now.utc

  # Small holder exposing the two serializers as #transactions and #errors.
  serializer_set = Struct.new(:transactions, :errors)
  @serializers = serializer_set.new(
    Serializers::Transactions.new(config),
    Serializers::Errors.new(config)
  )
end
def post_error(msg)
# Serializes the error carried by +msg+ and POSTs it to '/v1/errors'.
#
# A failed POST is logged and swallowed, using the same 'Failed posting'
# handling as the transaction POST in this class, instead of raising into
# the caller.
#
# @param msg [#error] message object whose #error is serialized
# @return [Object, nil] whatever the adapter's #post returns, or nil when the
#   POST raised
def post_error(msg)
  payload = @serializers.errors.build_all([msg.error])

  begin
    @adapter.post('/v1/errors', payload)
  rescue ::StandardError => e
    fatal 'Failed posting: %s', e.inspect
    # Array() guards against an exception whose backtrace is nil.
    debug Array(e.backtrace).join("\n")
    nil
  end
end
def process_messages
# Drains the message queue without blocking and dispatches each message.
#
# ErrorMsg messages are posted via #post_error. A StopMsg triggers a flush of
# collected transactions and marks the worker for shutdown; the remaining
# messages are still processed. Once the queue is empty, the current thread
# exits if a StopMsg was seen.
#
# @return [nil]
def process_messages
  stop_requested = false

  begin
    while (message = messages.pop(true))
      case message
      when ErrorMsg
        post_error(message)
      when StopMsg
        stop_requested = true
        # Send the collected transactions before exiting.
        collect_and_send_transactions
      end
    end
  rescue ThreadError
    # Non-blocking pop raises ThreadError once the queue is empty.
    Thread.exit if stop_requested
  end
end
def run_forever
# Runs the worker cycle indefinitely: one #run_once pass, then a pause of
# SLEEP_INTERVAL, repeated.
def run_forever
  loop do
    run_once
    sleep(SLEEP_INTERVAL)
  end
end
def run_once
# Performs a single worker pass: flushes transactions when
# #should_flush_transactions? says so, then processes queued messages.
#
# @return [Object] the result of #process_messages
def run_once
  if should_flush_transactions?
    collect_and_send_transactions
  end

  process_messages
end
def should_flush_transactions?
# Decides whether pending transactions should be sent now.
#
# True when no flush interval is configured, when the pending queue has
# reached config.max_queue_size, or when at least the flush interval has
# elapsed since @last_sent_transactions.
#
# @return [Boolean]
def should_flush_transactions?
  flush_interval = config.flush_interval

  if flush_interval.nil? || pending_transactions.length >= config.max_queue_size
    true
  else
    elapsed = Time.now.utc - @last_sent_transactions
    elapsed >= flush_interval
  end
end