lib/elastic_apm/instrumenter.rb



# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# frozen_string_literal: true

require 'elastic_apm/trace_context'
require 'elastic_apm/child_durations'
require 'elastic_apm/span'
require 'elastic_apm/transaction'
require 'elastic_apm/span_helpers'

module ElasticAPM
  # @api private
  class Instrumenter
    TRANSACTION_KEY = :__elastic_instrumenter_transaction_key
    SPAN_KEY = :__elastic_instrumenter_spans_key

    include Logging

    # Accessor object for the in-flight transaction and the span stack.
    # Nothing is kept on the instance itself: every reader and writer goes
    # through +Thread.current[...]+ using TRANSACTION_KEY and SPAN_KEY.
    #
    # @api private
    class Current
      # Resets both stored values for the caller: the transaction becomes
      # nil and the span stack becomes a fresh empty array.
      def initialize
        self.transaction = nil
        self.spans = []
      end

      # @return [Object, nil] whatever is stored under TRANSACTION_KEY
      def transaction
        Thread.current[TRANSACTION_KEY]
      end

      # @param transaction [Object, nil] value to store under TRANSACTION_KEY
      def transaction=(transaction)
        Thread.current[TRANSACTION_KEY] = transaction
      end

      # @return [Array] the span stack; when nothing is stored yet an empty
      #   array is stored first and then returned
      def spans
        Thread.current[SPAN_KEY] ||= []
      end

      # @param spans [Array] replacement span stack stored under SPAN_KEY
      def spans=(spans)
        Thread.current[SPAN_KEY] = spans
      end
    end

    # Stores the collaborators and creates the Current accessor used for
    # the in-flight transaction and span stack.
    #
    # @param config [Object] kept in @config
    # @param metrics [Object] registry queried with +get+ when transaction
    #   and span metrics are updated
    # @param stacktrace_builder [Object] passed on to every Span built by
    #   #start_span
    # @param enqueue [Proc] block called with each ended transaction or span
    def initialize(config, metrics:, stacktrace_builder:, &enqueue)
      @config = config
      @metrics = metrics
      @stacktrace_builder = stacktrace_builder
      @enqueue = enqueue
      @current = Current.new
    end

    attr_reader :stacktrace_builder, :enqueue

    # Logs the start and re-registers the subscriber when one is present.
    #
    # @return [Object, nil] result of +register!+, or nil without subscriber
    def start
      debug 'Starting instrumenter'
      # A subscriber already exists when the instrumenter was stopped and
      # is being started again, so it has to be registered once more.
      @subscriber.register! unless @subscriber.nil?
    end

    # Logs the stop, drops the current transaction, empties the span stack
    # and unregisters the subscriber when one has been set.
    #
    # @return [Object, nil] result of +unregister!+, or nil without subscriber
    def stop
      debug 'Stopping instrumenter'

      self.current_transaction = nil
      current_spans.clear

      @subscriber.unregister! unless @subscriber.nil?
    end

    # Restarts the instrumenter by calling #stop followed by #start.
    # NOTE(review): presumably meant to run in a forked child process —
    # confirm against callers.
    def handle_forking!
      stop
      start
    end

    # Remembers the subscriber and registers it straight away.
    #
    # @param subscriber [#register!] object stored in @subscriber
    def subscriber=(subscriber)
      debug 'Registering ActiveSupport::Notifications subscriber'
      @subscriber = subscriber
      subscriber.register!
    end

    # transactions

    # @return [Object, nil] the transaction held by the Current accessor,
    #   or nil when none is set
    def current_transaction
      @current.transaction
    end

    # Replaces the transaction held by the Current accessor.
    #
    # @param transaction [Object, nil] new current transaction (nil clears it)
    def current_transaction=(transaction)
      @current.transaction = transaction
    end

    # Creates, starts and installs a new current transaction.
    #
    # The sampling decision and sample rate come from +trace_context+ when
    # one is given; otherwise they are derived from +config+.
    #
    # @param name [Object, nil] forwarded to Transaction.new
    # @param type [Object, nil] forwarded to Transaction.new
    # @param config [Object] must respond to +instrument?+ and
    #   +transaction_sample_rate+
    # @param context [Object, nil] forwarded to Transaction.new
    # @param trace_context [Object, nil] when present, must respond to
    #   +recorded?+ and +tracestate+
    # @return [Transaction, nil] the new transaction, or nil when
    #   +config.instrument?+ is falsy
    # @raise [ExistingTransactionError] when a transaction is already current
    def start_transaction(
      name = nil,
      type = nil,
      config:,
      context: nil,
      trace_context: nil
    )
      return nil unless config.instrument?

      existing = current_transaction
      if existing
        raise ExistingTransactionError,
          "Transactions may not be nested.\n" \
          "Already inside #{existing.inspect}"
      end

      sampled, sample_rate =
        if trace_context
          [trace_context.recorded?, trace_context.tracestate.sample_rate]
        else
          [random_sample?(config), config.transaction_sample_rate]
        end

      started = Transaction.new(
        name,
        type,
        context: context,
        trace_context: trace_context,
        sampled: sampled,
        sample_rate: sample_rate,
        config: config
      )
      started.start

      self.current_transaction = started
    end

    # Ends the current transaction, if there is one: clears it as current,
    # marks it done with +result+, passes it to the enqueue callable and
    # updates the transaction metrics.
    #
    # @param result [Object, nil] forwarded to +Transaction#done+
    # @return [Object, nil] the ended transaction, or nil when none was
    #   current
    def end_transaction(result = nil)
      finished = current_transaction
      return nil unless finished

      self.current_transaction = nil

      finished.done result
      enqueue.call finished
      update_transaction_metrics(finished)

      finished
    end

    # spans

    # @return [Array] the span stack held by the Current accessor
    def current_spans
      @current.spans
    end

    # @return [Object, nil] the most recently pushed span (last element of
    #   the span stack), or nil when the stack is empty
    def current_span
      current_spans.last
    end

    # rubocop:disable Metrics/CyclomaticComplexity
    # rubocop:disable Metrics/PerceivedComplexity
    # rubocop:disable Metrics/ParameterLists
    # Builds a span, pushes it on the span stack and starts it.
    #
    # The owning transaction is taken from +parent+ (a Span's transaction,
    # or the Transaction itself) and falls back to the current transaction.
    # No span is created when there is no transaction, when the transaction
    # is not sampled, or when +inc_started_spans!+ returns a falsy value.
    #
    # @param name [Object] span name
    # @param type [Object, nil] span type
    # @param subtype [Object, nil] span subtype
    # @param action [Object, nil] span action
    # @param backtrace [Object, nil] kept on the span as
    #   +original_backtrace+ only when the transaction has a truthy
    #   +span_frames_min_duration+
    # @param context [Object, nil] span context
    # @param trace_context [Object, nil] forwarded to Span.new
    # @param parent [Span, Transaction, nil] explicit parent; defaults to
    #   the current span, then the current transaction
    # @param sync [Object, nil] forwarded to Span.new
    # @return [Object, nil] the result of +Span#start+, or nil when no span
    #   was created
    def start_span(
      name,
      type = nil,
      subtype: nil,
      action: nil,
      backtrace: nil,
      context: nil,
      trace_context: nil,
      parent: nil,
      sync: nil
    )
      owner =
        if parent.is_a?(Span)
          parent.transaction
        elsif parent.is_a?(Transaction)
          parent
        else
          current_transaction
        end

      # Order matters: the started-span counter is only touched for an
      # existing, sampled transaction.
      return unless owner && owner.sampled? && owner.inc_started_spans!

      parent ||= (current_span || current_transaction)

      new_span = Span.new(
        name: name,
        subtype: subtype,
        action: action,
        transaction: owner,
        parent: parent,
        trace_context: trace_context,
        type: type,
        context: context,
        stacktrace_builder: stacktrace_builder,
        sync: sync
      )

      keep_backtrace = backtrace && owner.span_frames_min_duration
      new_span.original_backtrace = backtrace if keep_backtrace

      current_spans.push new_span

      new_span.start
    end
    # rubocop:enable Metrics/ParameterLists
    # rubocop:enable Metrics/CyclomaticComplexity
    # rubocop:enable Metrics/PerceivedComplexity

    # Pops the innermost span off the span stack, marks it done, passes it
    # to the enqueue callable and updates the span metrics.
    #
    # @return [Object, nil] the ended span, or nil when the stack was empty
    def end_span
      finished = current_spans.pop
      return unless finished

      finished.done
      enqueue.call finished
      update_span_metrics(finished)

      finished
    end

    # metadata

    # Stores a label on the current transaction's context. Does nothing
    # when there is no current transaction.
    #
    # @param key [#to_s] label name; every +.+, +"+ and +*+ is replaced by
    #   +_+ before the name is turned into a Symbol
    # @param value [Object] label value
    # @return [Object, nil] +value+, or nil when no transaction is current
    def set_label(key, value)
      transaction = current_transaction
      return unless transaction

      sanitized_key = key.to_s.tr('."*', '_').to_sym
      transaction.context.labels[sanitized_key] = value
    end

    # Merges +context+ into the current transaction's custom context in
    # place. Does nothing when there is no current transaction.
    #
    # @param context [Hash] entries to merge; existing keys are overwritten
    # @return [Hash, nil] the merged custom context, or nil when no
    #   transaction is current
    def set_custom_context(context)
      transaction = current_transaction
      return unless transaction

      custom = transaction.context.custom
      custom.merge!(context)
    end

    # Hands +user+ to the current transaction. Does nothing when there is
    # no current transaction.
    #
    # @param user [Object] forwarded to +Transaction#set_user+
    # @return [Object, nil] result of +set_user+, or nil when no
    #   transaction is current
    def set_user(user)
      transaction = current_transaction
      return unless transaction

      transaction.set_user(user)
    end

    # @return [String] short description containing only the inspected
    #   current transaction
    def inspect
      tx = current_transaction.inspect
      "<ElasticAPM::Instrumenter current_transaction=#{tx}>"
    end

    private

    # Decides at random whether a new transaction is sampled.
    #
    # +Kernel#rand+ without arguments yields a float in [0, 1.0). The
    # comparison is strict so that a sample rate of 0 can never sample
    # (an inclusive comparison would when +rand+ returns exactly 0.0),
    # while a rate of 1 still always samples.
    #
    # @param config [Object] must respond to +transaction_sample_rate+
    # @return [Boolean] true when the transaction should be sampled
    def random_sample?(config)
      rand < config.transaction_sample_rate
    end

    # Records metrics for an ended transaction.
    #
    # Nothing is recorded unless +transaction.collect_metrics?+ is truthy.
    # The duration timer and counter in the +:transaction+ set are always
    # updated; the +:breakdown+ set is only updated for sampled
    # transactions whose +breakdown_metrics+ is truthy.
    #
    # @param transaction [Object] the ended transaction
    # @return [Object, nil] result of the last counter increment, or nil
    #   when a guard returned early
    def update_transaction_metrics(transaction)
      return unless transaction.collect_metrics?

      transaction_tags = {
        'transaction.name': transaction.name,
        'transaction.type': transaction.type
      }

      duration_timer = @metrics.get(:transaction).timer(
        :'transaction.duration.sum.us',
        tags: transaction_tags, reset_on_collect: true
      )
      duration_timer.update(transaction.duration)

      duration_counter = @metrics.get(:transaction).counter(
        :'transaction.duration.count',
        tags: transaction_tags, reset_on_collect: true
      )
      duration_counter.inc!

      return unless transaction.sampled? && transaction.breakdown_metrics

      breakdown_counter = @metrics.get(:breakdown).counter(
        :'transaction.breakdown.count',
        tags: transaction_tags, reset_on_collect: true
      )
      breakdown_counter.inc!

      # The transaction's own self time is reported as a span of type "app".
      app_tags = transaction_tags.merge('span.type': 'app')

      self_time_timer = @metrics.get(:breakdown).timer(
        :'span.self_time.sum.us',
        tags: app_tags, reset_on_collect: true
      )
      self_time_timer.update(transaction.self_time)

      self_time_counter = @metrics.get(:breakdown).counter(
        :'span.self_time.count',
        tags: app_tags, reset_on_collect: true
      )
      self_time_counter.inc!
    end

    # Records self-time metrics for an ended span in the +:breakdown+ set.
    # Nothing is recorded unless the span's transaction answers truthy to
    # +collect_metrics?+.
    #
    # @param span [Object] the ended span
    # @return [Object, nil] result of the counter increment, or nil when
    #   metrics are not collected
    def update_span_metrics(span)
      transaction = span.transaction
      return unless transaction.collect_metrics?

      span_tags = {
        'span.type': span.type,
        'transaction.name': transaction.name,
        'transaction.type': transaction.type
      }

      # The subtype tag is only present for spans that have a subtype.
      subtype = span.subtype
      span_tags[:'span.subtype'] = subtype if subtype

      self_time_timer = @metrics.get(:breakdown).timer(
        :'span.self_time.sum.us',
        tags: span_tags, reset_on_collect: true
      )
      self_time_timer.update(span.self_time)

      self_time_counter = @metrics.get(:breakdown).counter(
        :'span.self_time.count',
        tags: span_tags, reset_on_collect: true
      )
      self_time_counter.inc!
    end
  end
end