lib/posthog/client.rb



# frozen_string_literal: true

require 'time'

require 'posthog/defaults'
require 'posthog/logging'
require 'posthog/utils'
require 'posthog/send_worker'
require 'posthog/noop_worker'
require 'posthog/feature_flags'
require 'posthog/send_feature_flags_options'

module PostHog
  class Client
    include PostHog::Utils
    include PostHog::Logging

    # @param [Hash] opts
    # @option opts [String] :api_key Your project's api_key
    # @option opts [String] :personal_api_key Your personal API key
    # @option opts [FixNum] :max_queue_size Maximum number of calls to be
    #   remain queued. Defaults to 10_000.
    # @option opts [Bool] :test_mode +true+ if messages should remain
    #   queued for testing. Defaults to +false+.
    # @option opts [Proc] :on_error Handles error calls from the API.
    # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://app.posthog.com`
    # @option opts [Integer] :feature_flags_polling_interval How often to poll for feature flag definition changes.
    #   Measured in seconds, defaults to 30.
    # @option opts [Integer] :feature_flag_request_timeout_seconds How long to wait for feature flag evaluation.
    #   Measured in seconds, defaults to 3.
    # @option opts [Proc] :before_send A block that receives the event hash and should return either a modified hash
    #   to be sent to PostHog or nil to prevent the event from being sent. e.g. `before_send: ->(event) { event }`
    def initialize(opts = {})
      symbolize_keys!(opts)

      opts[:host] ||= 'https://app.posthog.com'

      @queue = Queue.new
      @api_key = opts[:api_key]
      @max_queue_size = opts[:max_queue_size] || Defaults::Queue::MAX_SIZE
      @worker_mutex = Mutex.new
      @worker = if opts[:test_mode]
                  NoopWorker.new(@queue)
                else
                  SendWorker.new(@queue, @api_key, opts)
                end
      @worker_thread = nil
      @feature_flags_poller = nil
      @personal_api_key = opts[:personal_api_key]

      check_api_key!

      @feature_flags_poller =
        FeatureFlagsPoller.new(
          opts[:feature_flags_polling_interval],
          opts[:personal_api_key],
          @api_key,
          opts[:host],
          opts[:feature_flag_request_timeout_seconds] || Defaults::FeatureFlags::FLAG_REQUEST_TIMEOUT_SECONDS,
          opts[:on_error]
        )

      @distinct_id_has_sent_flag_calls = SizeLimitedHash.new(Defaults::MAX_HASH_SIZE) do |hash, key|
        hash[key] = []
      end

      @before_send = opts[:before_send]
    end

    # Synchronously waits until the worker has cleared the queue.
    #
    # Use only for scripts which are not long-running, and will specifically
    # exit
    def flush
      while !@queue.empty? || @worker.is_requesting?
        ensure_worker_running
        sleep(0.1)
      end
    end

    # Clears the queue without waiting.
    #
    # Use only in test mode
    def clear
      @queue.clear
    end

    # @!macro common_attrs
    #   @option attrs [String] :message_id ID that uniquely
    #     identifies a message across the API. (optional)
    #   @option attrs [Time] :timestamp When the event occurred (optional)
    #   @option attrs [String] :distinct_id The ID for this user in your database

    # Captures an event
    #
    # @param [Hash] attrs
    #
    # @option attrs [String] :event Event name
    # @option attrs [Hash] :properties Event properties (optional)
    # @option attrs [Bool, Hash, SendFeatureFlagsOptions] :send_feature_flags
    #   Whether to send feature flags with this event, or configuration for feature flag evaluation (optional)
    # @option attrs [String] :uuid ID that uniquely identifies an event;
    #                             events in PostHog are deduplicated by the
    #                             combination of teamId, timestamp date,
    #                             event name, distinct id, and UUID
    # @macro common_attrs
    def capture(attrs)
      symbolize_keys! attrs

      send_feature_flags_param = attrs[:send_feature_flags]
      if send_feature_flags_param
        # Handle different types of send_feature_flags parameter
        case send_feature_flags_param
        when true
          # Backward compatibility: simple boolean
          feature_variants = @feature_flags_poller.get_feature_variants(attrs[:distinct_id], attrs[:groups] || {})
        when Hash
          # Hash with options
          options = SendFeatureFlagsOptions.from_hash(send_feature_flags_param)
          feature_variants = @feature_flags_poller.get_feature_variants(
            attrs[:distinct_id],
            attrs[:groups] || {},
            options ? options.person_properties : {},
            options ? options.group_properties : {},
            options ? options.only_evaluate_locally : false
          )
        when SendFeatureFlagsOptions
          # SendFeatureFlagsOptions object
          feature_variants = @feature_flags_poller.get_feature_variants(
            attrs[:distinct_id],
            attrs[:groups] || {},
            send_feature_flags_param.person_properties,
            send_feature_flags_param.group_properties,
            send_feature_flags_param.only_evaluate_locally || false
          )
        else
          # Invalid type, treat as false
          feature_variants = nil
        end

        attrs[:feature_variants] = feature_variants if feature_variants
      end

      enqueue(FieldParser.parse_for_capture(attrs))
    end

    # Identifies a user
    #
    # @param [Hash] attrs
    #
    # @option attrs [Hash] :properties User properties (optional)
    # @macro common_attrs
    def identify(attrs)
      symbolize_keys! attrs
      enqueue(FieldParser.parse_for_identify(attrs))
    end

    # Identifies a group
    #
    # @param [Hash] attrs
    #
    # @option attrs [String] :group_type Group type
    # @option attrs [String] :group_key Group key
    # @option attrs [Hash] :properties Group properties (optional)
    # @option attrs [String] :distinct_id Distinct ID (optional)
    # @macro common_attrs
    def group_identify(attrs)
      symbolize_keys! attrs
      enqueue(FieldParser.parse_for_group_identify(attrs))
    end

    # Aliases a user from one id to another
    #
    # @param [Hash] attrs
    #
    # @option attrs [String] :alias The alias to give the distinct id
    # @macro common_attrs
    def alias(attrs)
      symbolize_keys! attrs
      enqueue(FieldParser.parse_for_alias(attrs))
    end

    # @return [Hash] pops the last message from the queue
    def dequeue_last_message
      @queue.pop
    end

    # @return [Fixnum] number of messages in the queue
    def queued_messages
      @queue.length
    end

    # TODO: In future version, rename to `feature_flag_enabled?`
    def is_feature_enabled( # rubocop:disable Naming/PredicateName
      flag_key,
      distinct_id,
      groups: {},
      person_properties: {},
      group_properties: {},
      only_evaluate_locally: false,
      send_feature_flag_events: true
    )
      response = get_feature_flag(
        flag_key,
        distinct_id,
        groups: groups,
        person_properties: person_properties,
        group_properties: group_properties,
        only_evaluate_locally: only_evaluate_locally,
        send_feature_flag_events: send_feature_flag_events
      )
      return nil if response.nil?

      !!response
    end

    # @param [String] flag_key The unique flag key of the feature flag
    # @return [String] The decrypted value of the feature flag payload
    def get_remote_config_payload(flag_key)
      @feature_flags_poller.get_remote_config_payload(flag_key)
    end

    # Returns whether the given feature flag is enabled for the given user or not
    #
    # @param [String] key The key of the feature flag
    # @param [String] distinct_id The distinct id of the user
    # @param [Hash] groups
    # @param [Hash] person_properties key-value pairs of properties to associate with the user.
    # @param [Hash] group_properties
    #
    # @return [String, nil] The value of the feature flag
    #
    # The provided properties are used to calculate feature flags locally, if possible.
    #
    # `groups` are a mapping from group type to group key. So, if you have a group type of "organization"
    # and a group key of "5",
    # you would pass groups={"organization": "5"}.
    # `group_properties` take the format: { group_type_name: { group_properties } }
    # So, for example, if you have the group type "organization" and the group key "5", with the properties name,
    # and employee count, you'll send these as:
    # ```ruby
    #     group_properties: {"organization": {"name": "PostHog", "employees": 11}}
    # ```
    def get_feature_flag(
      key,
      distinct_id,
      groups: {},
      person_properties: {},
      group_properties: {},
      only_evaluate_locally: false,
      send_feature_flag_events: true
    )
      person_properties, group_properties = add_local_person_and_group_properties(
        distinct_id,
        groups,
        person_properties,
        group_properties
      )
      feature_flag_response, flag_was_locally_evaluated, request_id = @feature_flags_poller.get_feature_flag(
        key,
        distinct_id,
        groups,
        person_properties,
        group_properties,
        only_evaluate_locally
      )

      feature_flag_reported_key = "#{key}_#{feature_flag_response}"
      if !@distinct_id_has_sent_flag_calls[distinct_id].include?(feature_flag_reported_key) && send_feature_flag_events
        capture(
          {
            distinct_id: distinct_id,
            event: '$feature_flag_called',
            properties: {
              '$feature_flag' => key,
              '$feature_flag_response' => feature_flag_response,
              'locally_evaluated' => flag_was_locally_evaluated
            }.merge(request_id ? { '$feature_flag_request_id' => request_id } : {}),
            groups: groups
          }
        )
        @distinct_id_has_sent_flag_calls[distinct_id] << feature_flag_reported_key
      end
      feature_flag_response
    end

    # Returns all flags for a given user
    #
    # @param [String] distinct_id The distinct id of the user
    # @param [Hash] groups
    # @param [Hash] person_properties key-value pairs of properties to associate with the user.
    # @param [Hash] group_properties
    #
    # @return [Hash] String (not symbol) key value pairs of flag and their values
    def get_all_flags(
      distinct_id,
      groups: {},
      person_properties: {},
      group_properties: {},
      only_evaluate_locally: false
    )
      person_properties, group_properties = add_local_person_and_group_properties(distinct_id, groups,
                                                                                  person_properties, group_properties)
      @feature_flags_poller.get_all_flags(distinct_id, groups, person_properties, group_properties,
                                          only_evaluate_locally)
    end

    # Returns payload for a given feature flag
    #
    # @param [String] key The key of the feature flag
    # @param [String] distinct_id The distinct id of the user
    # @option [String or boolean] match_value The value of the feature flag to be matched
    # @option [Hash] groups
    # @option [Hash] person_properties key-value pairs of properties to associate with the user.
    # @option [Hash] group_properties
    # @option [Boolean] only_evaluate_locally
    #
    def get_feature_flag_payload(
      key,
      distinct_id,
      match_value: nil,
      groups: {},
      person_properties: {},
      group_properties: {},
      only_evaluate_locally: false
    )
      person_properties, group_properties = add_local_person_and_group_properties(distinct_id, groups,
                                                                                  person_properties, group_properties)
      @feature_flags_poller.get_feature_flag_payload(key, distinct_id, match_value, groups, person_properties,
                                                     group_properties, only_evaluate_locally)
    end

    # Returns all flags and payloads for a given user
    #
    # @return [Hash] A hash with the following keys:
    #   featureFlags: A hash of feature flags
    #   featureFlagPayloads: A hash of feature flag payloads
    #
    # @param [String] distinct_id The distinct id of the user
    # @option [Hash] groups
    # @option [Hash] person_properties key-value pairs of properties to associate with the user.
    # @option [Hash] group_properties
    # @option [Boolean] only_evaluate_locally
    #
    def get_all_flags_and_payloads(
      distinct_id,
      groups: {},
      person_properties: {},
      group_properties: {},
      only_evaluate_locally: false
    )
      person_properties, group_properties = add_local_person_and_group_properties(
        distinct_id, groups, person_properties, group_properties
      )
      response = @feature_flags_poller.get_all_flags_and_payloads(
        distinct_id, groups, person_properties, group_properties, only_evaluate_locally
      )

      response.delete(:requestId) # remove internal information.
      response
    end

    def reload_feature_flags
      unless @personal_api_key
        logger.error(
          'You need to specify a personal_api_key to locally evaluate feature flags'
        )
        return
      end
      @feature_flags_poller.load_feature_flags(true)
    end

    def shutdown
      @feature_flags_poller.shutdown_poller
      flush
    end

    private

    # before_send should run immediately before the event is sent to the queue.
    # @param [Object] action The event to be sent to PostHog
    # @return [null, Object, nil] The processed event or nil if the event should not be sent
    def process_before_send(action)
      return action if action.nil? || action.empty?
      return action unless @before_send

      begin
        processed_action = @before_send.call(action)

        if processed_action.nil?
          logger.warn("Event #{action[:event]} was rejected in beforeSend function")
        elsif processed_action.empty?
          logger.warn("Event #{action[:event]} has no properties after beforeSend function, this is likely an error")
        end

        processed_action
      rescue StandardError => e
        logger.error("Error in beforeSend function - using original event: #{e.message}")
        action
      end
    end

    # private: Enqueues the action.
    #
    # returns Boolean of whether the item was added to the queue.
    def enqueue(action)
      action = process_before_send(action)
      return false if action.nil? || action.empty?

      # add our request id for tracing purposes
      action[:messageId] ||= uid

      if @queue.length < @max_queue_size
        @queue << action
        ensure_worker_running

        true
      else
        logger.warn(
          'Queue is full, dropping events. The :max_queue_size ' \
          'configuration parameter can be increased to prevent this from ' \
          'happening.'
        )
        false
      end
    end

    # private: Checks that the api_key is properly initialized
    def check_api_key!
      raise ArgumentError, 'API key must be initialized' if @api_key.nil?
    end

    def ensure_worker_running
      return if worker_running?

      @worker_mutex.synchronize do
        return if worker_running?

        @worker_thread = Thread.new { @worker.run }
      end
    end

    def worker_running?
      @worker_thread&.alive?
    end

    def add_local_person_and_group_properties(distinct_id, groups, person_properties, group_properties)
      groups ||= {}
      person_properties ||= {}
      group_properties ||= {}

      symbolize_keys! groups
      symbolize_keys! person_properties
      symbolize_keys! group_properties

      group_properties.each_value do |value|
        symbolize_keys! value
      end

      all_person_properties = { distinct_id: distinct_id }.merge(person_properties)

      all_group_properties = {}
      groups&.each do |group_name, group_key|
        all_group_properties[group_name] = {
          '$group_key': group_key
        }.merge((group_properties && group_properties[group_name]) || {})
      end

      [all_person_properties, all_group_properties]
    end
  end
end