class ElasticAPM::Transport::Worker

@api private

def adapter

# Returns the object whose +new+ is used to build a worker's connection.
#
# The value is memoized in @adapter and falls back to Connection when nothing
# has been assigned yet.
#
# @return [Object] the stored adapter, or Connection by default
def adapter
  @adapter = Connection unless @adapter
  @adapter
end

def initialize(

# Stores the collaborators and opens a connection through the class-level
# adapter.
#
# @param config [Object] stored as @config and passed to the adapter's +new+
# @param queue [Object] stored as @queue
# @param serializers [Object] stored as @serializers
# @param filters [Object] stored as @filters
def initialize(config, queue, serializers:, filters:)
  @config = config
  @queue = queue

  @serializers = serializers
  @filters = filters

  # The connection is built last, from whatever adapter the class exposes.
  adapter = self.class.adapter
  @connection = adapter.new(config)
end

def process(resource)

# Converts +resource+ to JSON and writes it to the connection.
#
# Nothing is written when serialize_and_filter yields nil or false.
#
# @param resource [Object] the event handed to serialize_and_filter
# @return [Object, nil] the result of +connection.write+, or nil when the
#   event was not written
def process(resource)
  payload = serialize_and_filter(resource)
  connection.write(payload) if payload
end

def serialize_and_filter(resource)

# Serializes +resource+, applies the filters and encodes the result as JSON.
#
# @param resource [Object] the event to convert; +prepare_for_serialization!+
#   is called on it first when it responds to that method
# @return [String, nil] the JSON text, or nil when the filters return
#   Filters::SKIP or when any step raises (the failure is logged)
def serialize_and_filter(resource)
  if resource.respond_to?(:prepare_for_serialization!)
    resource.prepare_for_serialization!
  end
  serialized = serializers.serialize(resource)
  # A Filters::SKIP result means the event is dropped. The JSON below is built
  # from `serialized` itself, so filters presumably modify it in place.
  return nil if @filters.apply!(serialized) == Filters::SKIP
  JSON.fast_generate(serialized)
# NOTE(review): Exception also covers SystemExit and SignalException; confirm
# that swallowing those here is intended.
rescue Exception
  # Inspected values are passed as format arguments, like the other logging
  # calls in this file, so a literal "%" in them is never treated as a format
  # directive (which could raise from inside this handler).
  error 'Failed converting event to JSON: %s', resource.inspect
  # `serialized` is still nil here when the failure happened before
  # serialization finished.
  error '%s', serialized.inspect
  nil
end

def work_forever

# Pops messages off the queue and processes them until told to stop.
#
# The loop ends when a StopMessage is popped (the connection is flushed with
# :halt first) or when +queue.pop+ returns nil or false. Any exception that
# escapes the loop is logged and ends the method instead of propagating.
def work_forever
  while (msg = queue.pop)
    case msg
    when StopMessage
      debug 'Stopping worker [%s]', self
      connection.flush(:halt)
      break
    else
      process msg
    end
  end
# NOTE(review): Exception also covers SystemExit and SignalException; confirm
# that ending the worker quietly on those is intended.
rescue Exception => e
  warn 'Worker died with exception: %s', e.inspect
  # The backtrace is passed as a format argument so that a "%" in a file path
  # is not interpreted as a format directive by the logging helper.
  debug '%s', e.backtrace.join("\n")
end