class ElasticAPM::Worker
@api private
# Builds a Concurrent::TimerTask whose execution interval is
# `config.flush_interval`. The task's block pushes a new FlushMsg onto
# `messages`. This method only constructs the task; it does not start it.
#
# @return [Concurrent::TimerTask] the newly built task
def build_timer_task
  interval = config.flush_interval

  Concurrent::TimerTask.new(execution_interval: interval) { messages.push(FlushMsg.new) }
end
# Collects a batch of pending transactions, serializes it and posts it to
# '/v1/transactions' through the adapter.
#
# Does nothing when `pending_transactions` is empty. A StandardError raised
# while posting is logged (through `fatal` and `debug`) and swallowed, and the
# method then returns nil. Errors raised while collecting or serializing are
# not rescued here.
#
# @return [Object, nil] whatever `@adapter.post` returns, or nil when there
#   was nothing to send or the post failed
def collect_and_send_transactions
  return if pending_transactions.empty?

  transactions = collect_batched_transactions
  payload = @serializers.transactions.build_all(transactions)

  begin
    @adapter.post('/v1/transactions', payload)
  rescue StandardError => e
    # Only StandardError is rescued: catching ::Exception would also swallow
    # SystemExit, Interrupt/SignalException and NoMemoryError, which must be
    # allowed to propagate.
    fatal 'Failed posting: %s', e.inspect
    # Array() guards against an exception whose backtrace is nil.
    debug Array(e.backtrace).join("\n")
    nil
  end
end
# Drains `pending_transactions` into an array without blocking.
#
# The batch-size bound is checked BEFORE a transaction is popped, so a
# transaction is only removed from the queue when it is going to be added to
# the batch. (Popping first and checking afterwards would silently discard
# the transaction popped once the batch is full.) The bound is inclusive, so
# a batch holds at most `config.max_queue_size + 1` transactions.
#
# Collection also stops when the queue yields a falsy value.
#
# @return [Array] the collected transactions, possibly empty
def collect_batched_transactions
  batch = []

  begin
    while batch.length <= config.max_queue_size &&
          (transaction = pending_transactions.pop(true))
      batch << transaction
    end
  rescue ThreadError
    # Non-blocking pop raised: the queue is empty, so the batch is complete.
  end

  batch
end
# Stores the worker's collaborators and builds the serializers it uses.
#
# @param config [Object] kept in @config and handed to both serializers
# @param messages [Object] kept in @messages
# @param pending_transactions [Object] kept in @pending_transactions
# @param adapter [Object] kept in @adapter
def initialize(config, messages, pending_transactions, adapter)
  @config = config
  @messages = messages
  @pending_transactions = pending_transactions
  @adapter = adapter

  transactions_serializer = Serializers::Transactions.new(config)
  errors_serializer = Serializers::Errors.new(config)

  # Anonymous struct exposing the two serializers as #transactions / #errors.
  serializer_set = Struct.new(:transactions, :errors)
  @serializers = serializer_set.new(transactions_serializer, errors_serializer)
end
# Serializes the error carried by `msg` and posts it to '/v1/errors'.
# Exceptions raised by the serializer or the adapter are not rescued here.
#
# @param msg [#error] message whose `error` is sent
# @return [Object] whatever `@adapter.post` returns
def post_error(msg)
  errors = [msg.error]

  @adapter.post('/v1/errors', @serializers.errors.build_all(errors))
end
# Main worker loop. Builds and executes the timer task, keeping the result
# of `execute` in @timer_task, then pops messages until `messages.pop`
# returns a falsy value:
#
# * ErrorMsg - posts the error
# * FlushMsg - collects and sends pending transactions
# * StopMsg  - collects and sends pending transactions, then calls #stop!
#
# Messages of any other type are ignored.
#
# @return [nil]
def run_forever
  @timer_task = build_timer_task.execute

  while (message = messages.pop)
    case message
    when ErrorMsg then post_error(message)
    when FlushMsg then collect_and_send_transactions
    when StopMsg
      # empty collected transactions before exiting
      collect_and_send_transactions
      stop!
    end
  end
end
# Shuts down the timer task, if one is set, and then terminates the
# current thread via Thread.exit.
def stop!
  @timer_task.shutdown if @timer_task

  Thread.exit
end