lib/elastic_apm/transport/base.rb



# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# frozen_string_literal: true

require 'elastic_apm/metadata'
require 'elastic_apm/transport/user_agent'
require 'elastic_apm/transport/headers'
require 'elastic_apm/transport/connection'
require 'elastic_apm/transport/worker'
require 'elastic_apm/transport/serializers'
require 'elastic_apm/transport/filters'
require 'elastic_apm/transport/connection/http'

require 'elastic_apm/util/throttle'

module ElasticAPM
  module Transport
    # @api private
    class Base
      include Logging

      WATCHER_EXECUTION_INTERVAL = 5
      WORKER_JOIN_TIMEOUT = 5

      def initialize(config)
        @config = config
        @queue = SizedQueue.new(config.api_buffer_size)

        @serializers = Serializers.new(config)
        @filters = Filters.new(config)

        @stopped = Concurrent::AtomicBoolean.new
        @workers = Array.new(config.pool_size)

        @worker_mutex = Mutex.new
      end

      attr_reader :config, :queue, :filters, :workers, :watcher, :stopped

      def start
        debug '%s: Starting Transport', pid_str
        # Set @stopped to false first, in case transport is restarted;
        # ensure_worker_count requires @stopped to be false
        # ~estolfo
        @stopped.make_false unless @stopped.false?

        ensure_worker_count
        create_watcher
      end

      def stop
        debug '%s: Stopping Transport', pid_str

        @stopped.make_true

        stop_watcher
        stop_workers
      end

      def submit(resource)
        if @stopped.true?
          warn '%s: Transport stopping, no new events accepted', pid_str
          debug 'Dropping: %s', resource.inspect
          return false
        end

        queue.push(resource, true)

        true
      rescue ThreadError
        throttled_queue_full_warning
        nil
      rescue Exception => e
        error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
        nil
      end

      def add_filter(key, callback)
        @filters.add(key, callback)
      end

      def handle_forking!
        # We can't just stop and start again because the StopMessage
        # will then be the first message processed when the transport is
        # restarted.
        stop_watcher
        ensure_worker_count
        create_watcher
      end

      private

      def pid_str
        format('[PID:%s]', Process.pid)
      end

      def create_watcher
        @watcher = Concurrent::TimerTask.execute(
          execution_interval: WATCHER_EXECUTION_INTERVAL
        ) { ensure_worker_count }
      end

      def ensure_worker_count
        @worker_mutex.synchronize do
          return if all_workers_alive?
          return if stopped.true?

          @workers.map! do |thread|
            next thread if thread&.alive?

            boot_worker
          end
        end
      end

      def all_workers_alive?
        !!workers.all? { |t| t&.alive? }
      end

      def boot_worker
        debug '%s: Booting worker...', pid_str

        Thread.new do
          Worker.new(
            config, queue,
            serializers: @serializers,
            filters: @filters
          ).work_forever
        end
      end

      def stop_workers
        debug '%s: Stopping workers', pid_str

        send_stop_messages

        @worker_mutex.synchronize do
          workers.each do |thread|
            next if thread.nil?
            next if thread.join(WORKER_JOIN_TIMEOUT)

            debug(
              '%s: Worker did not stop in %ds, killing...',
              pid_str, WORKER_JOIN_TIMEOUT
            )
            thread.kill
          end

          # Maintain the @worker array size for when transport is restarted
          @workers.fill(nil)
        end
      end

      def send_stop_messages
        config.pool_size.times { queue.push(Worker::StopMessage.new, true) }
      rescue ThreadError
        warn 'Cannot push stop messages to worker queue as it is full'
      end

      def stop_watcher
        watcher&.shutdown
      end

      def throttled_queue_full_warning
        (@queue_full_log ||= Util::Throttle.new(5) do
          warn(
            '%s: Queue is full (%i items), skipping…',
            pid_str, config.api_buffer_size
          )
        end).call
      end
    end
  end
end