lib/active_support/notifications/fanout.rb



# frozen_string_literal: true

require "mutex_m"
require "concurrent/map"
require "set"
require "active_support/core_ext/object/try"

module ActiveSupport
  module Notifications
    class InstrumentationSubscriberError < RuntimeError
      attr_reader :exceptions

      def initialize(exceptions)
        @exceptions = exceptions
        exception_class_names = exceptions.map { |e| e.class.name }
        super "Exception(s) occurred within instrumentation subscribers: #{exception_class_names.join(', ')}"
      end
    end

    module FanoutIteration # :nodoc:
      private
        def iterate_guarding_exceptions(collection)
          exceptions = nil

          collection.each do |s|
            yield s
          rescue Exception => e
            exceptions ||= []
            exceptions << e
          end

          if exceptions
            exceptions = exceptions.flat_map do |exception|
              exception.is_a?(InstrumentationSubscriberError) ? exception.exceptions : [exception]
            end
            if exceptions.size == 1
              raise exceptions.first
            else
              raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first
            end
          end

          collection
        end
    end

    # This is a default queue implementation that ships with Notifications.
    # It just pushes events to all registered log subscribers.
    #
    # This class is thread safe. All methods are reentrant.
    class Fanout
      include Mutex_m

      def initialize
        @string_subscribers = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { [] } }
        @other_subscribers = []
        @all_listeners_for = Concurrent::Map.new
        @groups_for = Concurrent::Map.new
        @silenceable_groups_for = Concurrent::Map.new
        super
      end

      def inspect # :nodoc:
        total_patterns = @string_subscribers.size + @other_subscribers.size
        "#<#{self.class} (#{total_patterns} patterns)>"
      end

      def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
        subscriber = Subscribers.new(pattern, callable || block, monotonic)
        synchronize do
          case pattern
          when String
            @string_subscribers[pattern] << subscriber
            clear_cache(pattern)
          when NilClass, Regexp
            @other_subscribers << subscriber
            clear_cache
          else
            raise ArgumentError,  "pattern must be specified as a String, Regexp or empty"
          end
        end
        subscriber
      end

      def unsubscribe(subscriber_or_name)
        synchronize do
          case subscriber_or_name
          when String
            @string_subscribers[subscriber_or_name].clear
            clear_cache(subscriber_or_name)
            @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
          else
            pattern = subscriber_or_name.try(:pattern)
            if String === pattern
              @string_subscribers[pattern].delete(subscriber_or_name)
              clear_cache(pattern)
            else
              @other_subscribers.delete(subscriber_or_name)
              clear_cache
            end
          end
        end
      end

      def clear_cache(key = nil) # :nodoc:
        if key
          @all_listeners_for.delete(key)
          @groups_for.delete(key)
          @silenceable_groups_for.delete(key)
        else
          @all_listeners_for.clear
          @groups_for.clear
          @silenceable_groups_for.clear
        end
      end

      class BaseGroup # :nodoc:
        include FanoutIteration

        def initialize(listeners, name, id, payload)
          @listeners = listeners
        end

        def each(&block)
          iterate_guarding_exceptions(@listeners, &block)
        end
      end

      class BaseTimeGroup < BaseGroup # :nodoc:
        def start(name, id, payload)
          @start_time = now
        end

        def finish(name, id, payload)
          stop_time = now
          each do |listener|
            listener.call(name, @start_time, stop_time, id, payload)
          end
        end
      end

      class MonotonicTimedGroup < BaseTimeGroup # :nodoc:
        private
          def now
            Process.clock_gettime(Process::CLOCK_MONOTONIC)
          end
      end

      class TimedGroup < BaseTimeGroup # :nodoc:
        private
          def now
            Time.now
          end
      end

      class EventedGroup < BaseGroup # :nodoc:
        def start(name, id, payload)
          each do |s|
            s.start(name, id, payload)
          end
        end

        def finish(name, id, payload)
          each do |s|
            s.finish(name, id, payload)
          end
        end
      end

      class EventObjectGroup < BaseGroup # :nodoc:
        def start(name, id, payload)
          @event = build_event(name, id, payload)
          @event.start!
        end

        def finish(name, id, payload)
          @event.payload = payload
          @event.finish!

          each do |s|
            s.call(@event)
          end
        end

        private
          def build_event(name, id, payload)
            ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
          end
      end

      def groups_for(name) # :nodoc:
        groups = @groups_for.compute_if_absent(name) do
          all_listeners_for(name).reject(&:silenceable).group_by(&:group_class).transform_values do |s|
            s.map(&:delegate)
          end
        end

        silenceable_groups = @silenceable_groups_for.compute_if_absent(name) do
          all_listeners_for(name).select(&:silenceable).group_by(&:group_class).transform_values do |s|
            s.map(&:delegate)
          end
        end

        unless silenceable_groups.empty?
          groups = groups.dup
          silenceable_groups.each do |group_class, subscriptions|
            active_subscriptions = subscriptions.reject { |s| s.silenced?(name) }
            unless active_subscriptions.empty?
              groups[group_class] = (groups[group_class] || []) + active_subscriptions
            end
          end
        end

        groups
      end

      # A +Handle+ is used to record the start and finish time of event.
      #
      # Both #start and #finish must each be called exactly once.
      #
      # Where possible, it's best to use the block form: ActiveSupport::Notifications.instrument.
      # +Handle+ is a low-level API intended for cases where the block form can't be used.
      #
      #   handle = ActiveSupport::Notifications.instrumenter.build_handle("my.event", {})
      #   begin
      #     handle.start
      #     # work to be instrumented
      #   ensure
      #     handle.finish
      #   end
      class Handle
        include FanoutIteration

        def initialize(notifier, name, id, payload) # :nodoc:
          @name = name
          @id = id
          @payload = payload
          @groups = notifier.groups_for(name).map do |group_klass, grouped_listeners|
            group_klass.new(grouped_listeners, name, id, payload)
          end
          @state = :initialized
        end

        def start
          ensure_state! :initialized
          @state = :started

          iterate_guarding_exceptions(@groups) do |group|
            group.start(@name, @id, @payload)
          end
        end

        def finish
          finish_with_values(@name, @id, @payload)
        end

        def finish_with_values(name, id, payload) # :nodoc:
          ensure_state! :started
          @state = :finished

          iterate_guarding_exceptions(@groups) do |group|
            group.finish(name, id, payload)
          end
        end

        private
          def ensure_state!(expected)
            if @state != expected
              raise ArgumentError, "expected state to be #{expected.inspect} but was #{@state.inspect}"
            end
          end
      end

      include FanoutIteration

      def build_handle(name, id, payload)
        Handle.new(self, name, id, payload)
      end

      def start(name, id, payload)
        handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
        handle = build_handle(name, id, payload)
        handle_stack << handle
        handle.start
      end

      def finish(name, id, payload, listeners = nil)
        handle_stack = IsolatedExecutionState[:_fanout_handle_stack]
        handle = handle_stack.pop
        handle.finish_with_values(name, id, payload)
      end

      def publish(name, *args)
        iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
      end

      def publish_event(event)
        iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
      end

      def all_listeners_for(name)
        # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
        @all_listeners_for[name] || synchronize do
          # use synchronisation when accessing @subscribers
          @all_listeners_for[name] ||=
            @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
        end
      end

      def listeners_for(name)
        all_listeners_for(name).reject { |s| s.silenced?(name) }
      end

      def listening?(name)
        all_listeners_for(name).any? { |s| !s.silenced?(name) }
      end

      # This is a sync queue, so there is no waiting.
      def wait
      end

      module Subscribers # :nodoc:
        def self.new(pattern, listener, monotonic)
          subscriber_class = monotonic ? MonotonicTimed : Timed

          if listener.respond_to?(:start) && listener.respond_to?(:finish)
            subscriber_class = Evented
          else
            # Doing this to detect a single argument block or callable
            # like `proc { |x| }` vs `proc { |*x| }`, `proc { |**x| }`,
            # or `proc { |x, **y| }`
            procish = listener.respond_to?(:parameters) ? listener : listener.method(:call)

            if procish.arity == 1 && procish.parameters.length == 1
              subscriber_class = EventObject
            end
          end

          subscriber_class.new(pattern, listener)
        end

        class Matcher # :nodoc:
          attr_reader :pattern, :exclusions

          def self.wrap(pattern)
            if String === pattern
              pattern
            elsif pattern.nil?
              AllMessages.new
            else
              new(pattern)
            end
          end

          def initialize(pattern)
            @pattern = pattern
            @exclusions = Set.new
          end

          def unsubscribe!(name)
            exclusions << -name if pattern === name
          end

          def ===(name)
            pattern === name && !exclusions.include?(name)
          end

          class AllMessages
            def ===(name)
              true
            end

            def unsubscribe!(*)
              false
            end
          end
        end

        class Evented # :nodoc:
          attr_reader :pattern, :delegate, :silenceable

          def initialize(pattern, delegate)
            @pattern = Matcher.wrap(pattern)
            @delegate = delegate
            @silenceable = delegate.respond_to?(:silenced?)
            @can_publish = delegate.respond_to?(:publish)
            @can_publish_event = delegate.respond_to?(:publish_event)
          end

          def group_class
            EventedGroup
          end

          def publish(name, *args)
            if @can_publish
              @delegate.publish name, *args
            end
          end

          def publish_event(event)
            if @can_publish_event
              @delegate.publish_event event
            else
              publish(event.name, event.time, event.end, event.transaction_id, event.payload)
            end
          end

          def silenced?(name)
            @silenceable && @delegate.silenced?(name)
          end

          def subscribed_to?(name)
            pattern === name
          end

          def unsubscribe!(name)
            pattern.unsubscribe!(name)
          end
        end

        class Timed < Evented # :nodoc:
          def group_class
            TimedGroup
          end

          def publish(name, *args)
            @delegate.call name, *args
          end
        end

        class MonotonicTimed < Timed # :nodoc:
          def group_class
            MonotonicTimedGroup
          end
        end

        class EventObject < Evented
          def group_class
            EventObjectGroup
          end

          def publish_event(event)
            @delegate.call event
          end
        end
      end
    end
  end
end