class ElasticAPM::Transport::Base

@api private

def add_filter(key, callback)

def add_filter(key, callback)
  @filters.add(key, callback)
end

def boot_worker

rubocop:disable Metrics/MethodLength
def boot_worker
  worker = Worker.new(
    config,
    queue,
    serializers: @serializers,
    filters: @filters
  )
  @workers.push worker
  @pool.post do
    worker.work_forever
    @workers.delete(worker)
  end
end

def ensure_worker_count

def ensure_worker_count
  missing = config.pool_size - @workers.length
  return unless missing > 0
  info 'Booting %i workers', missing
  missing.times { boot_worker }
end

def initialize(config)

def initialize(config)
  @config = config
  @queue = SizedQueue.new(config.api_buffer_size)
  @pool = Concurrent::FixedThreadPool.new(config.pool_size)
  @workers = []
  @serializers = Serializers.new(config)
  @filters = Filters.new(config)
end

def send_stop_messages

def send_stop_messages
  @workers.each { queue.push(Worker::StopMessage.new, true) }
rescue ThreadError
  warn 'Cannot push stop messages to worker queue as it is full'
end

def start

def start
  ensure_worker_count
end

def stop

def stop
  stop_workers
end

def stop_workers

def stop_workers
  return unless @pool.running?
  debug 'Stopping workers'
  send_stop_messages
  debug 'Shutting down pool'
  @pool.shutdown
  return if @pool.wait_for_termination(5)
  warn "Worker pool didn't close in 5 secs, killing ..."
  @pool.kill
end

def submit(resource)

def submit(resource)
  queue.push(resource, true)
  ensure_worker_count
rescue ThreadError
  warn 'Queue is full (%i items), skipping…', config.api_buffer_size
  nil
end