class ElasticAPM::Worker

# @api private

def build_timer_task

# Builds the periodic timer that requests a flush.
#
# The task is only constructed here, not started. Each time its block runs,
# a new FlushMsg is pushed onto +messages+.
#
# @return [Concurrent::TimerTask] a task configured with
#   +config.flush_interval+ as its execution interval
def build_timer_task
  interval = config.flush_interval

  Concurrent::TimerTask.new(execution_interval: interval) { messages.push(FlushMsg.new) }
end

def collect_and_send_transactions

# Sends one batch of pending transactions to the transactions endpoint.
#
# Does nothing when +pending_transactions+ is empty. Otherwise a batch is
# taken via +collect_batched_transactions+, serialized, and posted to
# '/v1/transactions'.
#
# Failures raised while posting are logged and swallowed so that one failed
# request does not abort the caller. Only StandardError is rescued:
# process-level exceptions (Interrupt, SystemExit, NoMemoryError, ...) must
# propagate instead of being silently discarded.
#
# @return [Object, nil] whatever +@adapter.post+ returns, or nil when there
#   was nothing to send or posting failed
def collect_and_send_transactions
  return if pending_transactions.empty?

  transactions = collect_batched_transactions
  payload = @serializers.transactions.build_all(transactions)

  begin
    @adapter.post('/v1/transactions', payload)
  rescue StandardError => e
    fatal 'Failed posting: %s', e.inspect
    # backtrace can be nil for exceptions that never had one assigned
    debug Array(e.backtrace).join("\n")
    nil
  end
end

def collect_batched_transactions

# Drains up to +config.max_queue_size+ transactions from
# +pending_transactions+ without blocking.
#
# The size limit is checked BEFORE each pop. Popping first and checking
# afterwards would remove one extra transaction from the queue and discard
# it once the batch is full.
#
# Collection stops early when the queue is empty (the non-blocking pop
# raises ThreadError) or when a popped value is nil/false.
#
# @return [Array] the collected transactions, possibly empty
def collect_batched_transactions
  batch = []
  limit = config.max_queue_size

  begin
    while batch.length < limit
      transaction = pending_transactions.pop(true)
      break unless transaction

      batch << transaction
    end
  rescue ThreadError
    # queue empty: return what has been gathered so far
  end

  batch
end

def initialize(config, messages, pending_transactions, adapter)

# Stores the worker's collaborators and builds its serializers.
#
# @param config [Object] configuration; also handed to both serializers
# @param messages [Object] stored in +@messages+
# @param pending_transactions [Object] stored in +@pending_transactions+
# @param adapter [Object] stored in +@adapter+
def initialize(config, messages, pending_transactions, adapter)
  @config = config
  @messages = messages
  @pending_transactions = pending_transactions
  @adapter = adapter

  # Small anonymous struct so serializers are reachable by name
  # (+@serializers.transactions+ / +@serializers.errors+).
  serializer_set = Struct.new(:transactions, :errors)
  transaction_serializer = Serializers::Transactions.new(config)
  error_serializer = Serializers::Errors.new(config)
  @serializers = serializer_set.new(transaction_serializer, error_serializer)
end

def post_error(msg)

# Serializes the error carried by +msg+ and posts it to '/v1/errors'.
#
# Failures raised while posting are logged and swallowed, the same way
# failed transaction posts are handled in this class. Without this, an
# exception from the adapter would propagate out of the message loop that
# dispatches to this method.
#
# @param msg [#error] message whose +error+ is serialized and sent
# @return [Object, nil] whatever +@adapter.post+ returns, or nil when
#   posting failed
def post_error(msg)
  payload = @serializers.errors.build_all([msg.error])

  begin
    @adapter.post('/v1/errors', payload)
  rescue StandardError => e
    fatal 'Failed posting: %s', e.inspect
    # backtrace can be nil for exceptions that never had one assigned
    debug Array(e.backtrace).join("\n")
    nil
  end
end

def run_forever

# rubocop:disable Metrics/MethodLength
# Main loop: starts the flush timer, then dispatches every message popped
# from +messages+ until a nil/false value is popped.
#
# - ErrorMsg: the message is handed to +post_error+
# - FlushMsg: pending transactions are sent
# - StopMsg:  pending transactions are sent one last time, then +stop!+
#
# Messages of any other kind are ignored.
#
# @return [nil]
def run_forever
  @timer_task = build_timer_task.execute

  while (message = messages.pop)
    case message
    when ErrorMsg then post_error(message)
    when FlushMsg then collect_and_send_transactions
    when StopMsg
      # flush whatever has been collected before exiting
      collect_and_send_transactions
      stop!
    end
  end
end

def stop!

# Shuts down the flush timer, if one was started, and terminates the
# current thread. Code after the call to this method does not run.
def stop!
  @timer_task.shutdown if @timer_task

  Thread.exit
end