class ElasticAPM::Transport::Base

# @api private

def add_filter(key, callback)

# Registers a filter callback under the given key by delegating to
# +@filters.add+.
#
# @param key the identifier the callback is stored under
# @param callback the object handed to +@filters.add+ (presumably a
#   callable; confirm against Filters)
# @return whatever +@filters.add+ returns
def add_filter(key, callback)
  @filters.add(key, callback)
end

def all_workers_alive?

# Tells whether every slot in the worker pool holds a live thread.
#
# An empty slot (nil) counts as "not alive". An empty pool yields true.
#
# @return [Boolean]
def all_workers_alive?
  workers.none? { |thread| thread.nil? || !thread.alive? }
end

def boot_worker

# Spawns one worker thread.
#
# Emits a debug line, then starts a new Thread. The Worker is built inside
# that thread from +config+, +queue+, +@serializers+ and +@filters+, and its
# +work_forever+ is invoked there.
#
# @return [Thread] the thread that was started
def boot_worker
  debug('%s: Booting worker...', pid_str)

  Thread.new do
    worker = Worker.new(config, queue, serializers: @serializers, filters: @filters)
    worker.work_forever
  end
end

def create_watcher

# Starts the watcher task and stores it in +@watcher+.
#
# The task is a Concurrent::TimerTask configured with
# WATCHER_EXECUTION_INTERVAL as its execution interval; its block calls
# +ensure_worker_count+.
#
# @return the task returned by +Concurrent::TimerTask.execute+
def create_watcher
  interval = WATCHER_EXECUTION_INTERVAL
  @watcher = Concurrent::TimerTask.execute(execution_interval: interval) do
    ensure_worker_count
  end
end

def ensure_worker_count

# Replaces missing or dead worker threads while holding +@worker_mutex+.
#
# Does nothing when every worker is already alive or when the transport is
# flagged as stopped. Otherwise each slot that is nil or holds a dead thread
# gets a freshly booted worker; live threads are kept as they are.
#
# @return [Array, nil] the worker array after replacement, or nil when
#   nothing had to be done
def ensure_worker_count
  @worker_mutex.synchronize do
    # `return` leaves the method; the mutex is still released by synchronize.
    return if all_workers_alive? || stopped.true?

    @workers.map! { |thread| thread&.alive? ? thread : boot_worker }
  end
end

def handle_forking!

# Re-establishes the watcher and the worker pool without a full stop/start
# cycle (judging by the name, intended to be called after the process has
# forked; confirm against callers).
#
# Steps, in order: shut down the current watcher, boot any workers that are
# missing or dead, then create a new watcher.
#
# @return the result of +create_watcher+
def handle_forking!
  # We can't just stop and start again because the StopMessage
  # will then be the first message processed when the transport is
  # restarted.
  stop_watcher
  ensure_worker_count
  create_watcher
end

def initialize(config)

# Builds the transport state from +config+. No threads are started here.
#
# @param config configuration object; this method reads +api_buffer_size+
#   and +pool_size+ from it and passes it to Serializers and Filters
def initialize(config)
  @config = config

  # Coordination primitives: the stop flag and the lock guarding @workers.
  @stopped = Concurrent::AtomicBoolean.new
  @worker_mutex = Mutex.new

  # Bounded queue shared with the workers.
  @queue = SizedQueue.new(config.api_buffer_size)

  @serializers = Serializers.new(config)
  @filters = Filters.new(config)

  # One slot per worker, all nil until workers are booted.
  @workers = Array.new(config.pool_size)
end

def pid_str

# Builds the process-id tag used as a prefix in this class's log messages.
#
# @return [String] e.g. "[PID:1234]"
def pid_str
  "[PID:#{Process.pid}]"
end

def send_stop_messages

# Enqueues one Worker::StopMessage per configured worker (+config.pool_size+).
#
# Pushes are non-blocking. If the queue is full the resulting ThreadError is
# rescued, a warning is logged, and no further stop messages are pushed.
#
# @return [Integer] the pool size when all messages were pushed; otherwise
#   whatever +warn+ returns
def send_stop_messages
  stop_count = config.pool_size
  stop_count.times do
    queue.push(Worker::StopMessage.new, true)
  end
rescue ThreadError
  warn 'Cannot push stop messages to worker queue as it is full'
end

def start

# Starts (or restarts) the transport: clears the stop flag, boots workers
# and creates the watcher.
#
# @return the result of +create_watcher+
def start
  debug('%s: Starting Transport', pid_str)

  # The flag must be cleared before anything else: after a previous stop it
  # is still set, and ensure_worker_count does nothing while it is.
  @stopped.make_false if @stopped.true?

  ensure_worker_count
  create_watcher
end

def stop

# Stops the transport.
#
# Sets the stop flag first, then shuts the watcher down, then stops the
# workers.
#
# @return the result of +stop_workers+
def stop
  debug('%s: Stopping Transport', pid_str)

  @stopped.make_true
  stop_watcher
  stop_workers
end

def stop_watcher

# Shuts the watcher task down, if there is one.
#
# @return nil when no watcher exists, otherwise the result of its +shutdown+
def stop_watcher
  task = watcher
  return if task.nil?

  task.shutdown
end

def stop_workers

# Asks all workers to stop and clears the pool.
#
# Stop messages are enqueued first. Then, under +@worker_mutex+, every
# non-nil worker thread is joined for up to WORKER_JOIN_TIMEOUT; a thread
# that has not finished by then is killed. Finally every slot is reset to
# nil.
#
# @return [Array] the worker array, now filled with nil
def stop_workers
  debug('%s: Stopping workers', pid_str)
  send_stop_messages

  @worker_mutex.synchronize do
    workers.each do |thread|
      # Skip empty slots and threads that finished within the timeout.
      next if thread.nil? || thread.join(WORKER_JOIN_TIMEOUT)

      debug('%s: Worker did not stop in %ds, killing...', pid_str, WORKER_JOIN_TIMEOUT)
      thread.kill
    end

    # Keep the array at its original size so a restart can refill the slots.
    @workers.fill(nil)
  end
end

def submit(resource)

# Hands a resource to the worker queue without blocking the caller.
#
# @param resource the object to enqueue
# @return [true] when the resource was enqueued
# @return [false] when the transport is flagged as stopped; the resource is
#   dropped
# @return [nil] when the queue is full or the push failed with a
#   StandardError; the failure is logged
#
# Only StandardError is rescued. Exceptions outside that hierarchy, such as
# Interrupt, SystemExit or NoMemoryError, are left to propagate so that
# submitting an event never swallows a signal or a process exit.
def submit(resource)
  if @stopped.true?
    warn '%s: Transport stopping, no new events accepted', pid_str
    debug 'Dropping: %s', resource.inspect
    return false
  end

  # Non-blocking push: a full queue raises ThreadError instead of waiting.
  queue.push(resource, true)
  true
rescue ThreadError
  throttled_queue_full_warning
  nil
rescue StandardError => e
  error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
  nil
end

def throttled_queue_full_warning

# Logs the "queue is full" warning through a memoised Util::Throttle.
#
# The throttle is created once (with argument 5, presumably a rate limit on
# how often the block may run; confirm against Util::Throttle) and reused on
# later calls.
#
# @return whatever the throttle's +call+ returns
def throttled_queue_full_warning
  @queue_full_log ||= Util::Throttle.new(5) do
    warn('%s: Queue is full (%i items), skipping…', pid_str, config.api_buffer_size)
  end

  @queue_full_log.call
end