class ElasticAPM::Transport::Base
@api private
def add_filter(key, callback)
    def add_filter(key, callback)
      @filters.add(key, callback)
    end
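A registered filter is applied to every payload before it is written to the API. A minimal usage sketch, assuming a default ElasticAPM::Config and a callback that receives the payload hash and returns it; the key and the field being stripped are purely illustrative:

    config = ElasticAPM::Config.new
    transport = ElasticAPM::Transport::Base.new(config)

    # Hypothetical filter: drop a custom field before the payload is sent
    transport.add_filter(:strip_custom_field, lambda do |payload|
      payload.delete(:custom_field) if payload.is_a?(Hash)
      payload
    end)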
def all_workers_alive?
    def all_workers_alive?
      !!workers.all? { |t| t&.alive? }
    end
def boot_worker
    def boot_worker
      debug '%s: Booting worker...', pid_str

      Thread.new do
        Worker.new(
          config,
          queue,
          serializers: @serializers,
          filters: @filters
        ).work_forever
      end
    end
def create_watcher
    def create_watcher
      @watcher = Concurrent::TimerTask.execute(
        execution_interval: WATCHER_EXECUTION_INTERVAL
      ) { ensure_worker_count }
    end
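Concurrent::TimerTask comes from the concurrent-ruby gem and runs its block on a background thread every execution_interval seconds. A standalone sketch of the same pattern, with an assumed interval of 5 seconds standing in for WATCHER_EXECUTION_INTERVAL:

    require 'concurrent'

    # Invokes the block roughly every 5 seconds until shut down
    watcher = Concurrent::TimerTask.execute(execution_interval: 5) do
      puts 'checking worker health...'
    end

    sleep 12
    watcher.shutdown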
def ensure_worker_count
    def ensure_worker_count
      @worker_mutex.synchronize do
        return if all_workers_alive?
        return if stopped.true?

        @workers.map! do |thread|
          next thread if thread&.alive?
          boot_worker
        end
      end
    end
def handle_forking!
    def handle_forking!
      # We can't just stop and start again because the StopMessage
      # will then be the first message processed when the transport is
      # restarted.
      stop_watcher
      ensure_worker_count
      create_watcher
    end
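handle_forking! exists because worker threads and the watcher do not survive Process.fork, so a forked child has to rebuild them without pushing a StopMessage through the shared queue. A hedged sketch of wiring this into a preforking server's after-fork hook; how the transport instance is obtained here is an assumption, not part of this class:

    # config/puma.rb (illustrative)
    on_worker_boot do
      # `transport` is assumed to be the running Transport::Base instance
      transport.handle_forking!
    end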
def initialize(config)
    def initialize(config)
      @config = config
      @queue = SizedQueue.new(config.api_buffer_size)

      @serializers = Serializers.new(config)
      @filters = Filters.new(config)

      @stopped = Concurrent::AtomicBoolean.new
      @workers = Array.new(config.pool_size)
      @worker_mutex = Mutex.new
    end
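The SizedQueue built here is what gives the transport back-pressure: a non-blocking push into a full SizedQueue raises ThreadError, which submit and send_stop_messages both rescue. A small standalone demonstration of that standard-library behaviour (the capacity of 2 is arbitrary):

    queue = SizedQueue.new(2)
    queue.push(:a, true)
    queue.push(:b, true)

    begin
      queue.push(:c, true) # non-blocking push into a full queue
    rescue ThreadError
      puts 'queue full, event dropped'
    end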
def pid_str
    def pid_str
      format('[PID:%s]', Process.pid)
    end
def send_stop_messages
    def send_stop_messages
      config.pool_size.times { queue.push(Worker::StopMessage.new, true) }
    rescue ThreadError
      warn 'Cannot push stop messages to worker queue as it is full'
    end
def start
    def start
      debug '%s: Starting Transport', pid_str

      # Set @stopped to false first, in case the transport is restarted;
      # ensure_worker_count requires @stopped to be false
      # ~estolfo
      @stopped.make_false unless @stopped.false?

      ensure_worker_count
      create_watcher
    end
def stop
    def stop
      debug '%s: Stopping Transport', pid_str

      @stopped.make_true

      stop_watcher
      stop_workers
    end
def stop_watcher
    def stop_watcher
      watcher&.shutdown
    end
def stop_workers
    def stop_workers
      debug '%s: Stopping workers', pid_str

      send_stop_messages

      @worker_mutex.synchronize do
        workers.each do |thread|
          next if thread.nil?
          # Thread#join returns nil if the thread is still alive after the timeout
          next if thread.join(WORKER_JOIN_TIMEOUT)

          debug(
            '%s: Worker did not stop in %ds, killing...',
            pid_str, WORKER_JOIN_TIMEOUT
          )
          thread.kill
        end

        # Maintain the @workers array size for when the transport is restarted
        @workers.fill(nil)
      end
    end
def submit(resource)
    def submit(resource)
      if @stopped.true?
        warn '%s: Transport stopping, no new events accepted', pid_str
        debug 'Dropping: %s', resource.inspect
        return false
      end

      queue.push(resource, true)

      true
    rescue ThreadError
      # Non-blocking push raised because the queue is full
      throttled_queue_full_warning
      nil
    rescue Exception => e
      error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
      nil
    end
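From the caller's perspective submit has three outcomes: false when the transport is stopping, true when the event was queued, and nil when it was dropped (queue full or another failure). A minimal usage sketch; the event object is hypothetical and assumed to be a serializable resource:

    config = ElasticAPM::Config.new
    transport = ElasticAPM::Transport::Base.new(config)
    transport.start

    event = { type: 'transaction' } # hypothetical serializable resource

    case transport.submit(event)
    when true  then :queued
    when false then :transport_stopping
    when nil   then :dropped
    end

    transport.stop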
def throttled_queue_full_warning
    def throttled_queue_full_warning
      (@queue_full_log ||= Util::Throttle.new(5) do
        warn(
          '%s: Queue is full (%i items), skipping…',
          pid_str, config.api_buffer_size
        )
      end).call
    end
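Util::Throttle keeps a flood of ThreadErrors from producing more than one warning every 5 seconds. Its implementation is not part of this listing; a minimal stand-in with the same call-side behaviour (an assumption, not the actual class) could look like:

    # Stand-in: run the block at most once per `interval` seconds
    class SimpleThrottle
      def initialize(interval, &block)
        @interval = interval
        @block = block
        @last_run = nil
      end

      def call
        now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return if @last_run && (now - @last_run) < @interval

        @last_run = now
        @block.call
      end
    end

    log_full = SimpleThrottle.new(5) { warn 'Queue is full, skipping...' }
    3.times { log_full.call } # warns only once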