lib/splitclient-rb/sse/workers/splits_worker.rb



# frozen_string_literal: true

module SplitIoClient
  module SSE
    module Workers
      class SplitsWorker
        def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer,
                       segment_fetcher, rule_based_segment_repository)
          @synchronizer = synchronizer
          @config = config
          @feature_flags_repository = feature_flags_repository
          @queue = Queue.new
          @running = Concurrent::AtomicBoolean.new(false)
          @telemetry_runtime_producer = telemetry_runtime_producer
          @segment_fetcher = segment_fetcher
          @rule_based_segment_repository = rule_based_segment_repository
        end

        def start
          if @running.value
            @config.logger.debug('feature_flags_worker already running.')
            return
          end

          @running.make_true
          perform_thread
        end

        def stop
          unless @running.value
            @config.logger.debug('feature_flags_worker not running.')
            return
          end

          @running.make_false
          Helpers::ThreadHelper.stop(:split_update_worker, @config)
        end

        def add_to_queue(notification)
          @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}")
          @queue.push(notification)
        end

        private

        def perform_thread
          @config.threads[:split_update_worker] = Thread.new do
            @config.logger.debug('starting feature_flags_worker ...') if @config.debug_enabled
            perform
          end
        end

        def perform
          while (notification = @queue.pop)
            @config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}")
            case notification.data['type']
            when SSE::EventSource::EventTypes::SPLIT_UPDATE
              success = update_feature_flag(notification)
              @synchronizer.fetch_splits(notification.data['changeNumber'], 0) unless success
            when SSE::EventSource::EventTypes::RB_SEGMENT_UPDATE
              success = update_rule_based_segment(notification)
              @synchronizer.fetch_splits(0, notification.data['changeNumber']) unless success
            when SSE::EventSource::EventTypes::SPLIT_KILL
              kill_feature_flag(notification)
            end
          end
        end

        def update_feature_flag(notification)
          return true if @feature_flags_repository.get_change_number.to_i >= notification.data['changeNumber']
          return false unless !notification.data['d'].nil? && @feature_flags_repository.get_change_number == notification.data['pcn']

          new_split = update_feature_flag_repository(notification)
          fetch_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, 'IN_SEGMENT'), @feature_flags_repository)
          if fetch_rule_based_segments_if_not_exists(Helpers::Util.segment_names_by_object(new_split, 'IN_RULE_BASED_SEGMENT'),
                                                     notification.data['changeNumber'])
            return true
          end

          @telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)

          true
        rescue StandardError => e
          @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled

          false
        end

        def update_feature_flag_repository(notification)
          new_split = return_object_from_json(notification)
          SplitIoClient::Helpers::RepositoryHelper.update_feature_flag_repository(@feature_flags_repository, [new_split],
                                                                                  notification.data['changeNumber'], @config, false)
          new_split
        end

        def update_rule_based_segment(notification)
          return true if @rule_based_segment_repository.get_change_number.to_i >= notification.data['changeNumber']
          return false unless !notification.data['d'].nil? &&
                              @rule_based_segment_repository.get_change_number == notification.data['pcn']

          new_rb_segment = return_object_from_json(notification)
          SplitIoClient::Helpers::RepositoryHelper.update_rule_based_segment_repository(@rule_based_segment_repository,
                                                                                        [new_rb_segment],
                                                                                        notification.data['changeNumber'], @config)
          fetch_segments_if_not_exists(Helpers::Util.segment_names_in_rb_segment(new_rb_segment, 'IN_SEGMENT'),
                                       @rule_based_segment_repository)

          # @telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)

          true
        rescue StandardError => e
          @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled

          false
        end

        def kill_feature_flag(notification)
          return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

          @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}")
          @feature_flags_repository.kill(notification.data['changeNumber'],
                                         notification.data['splitName'],
                                         notification.data['defaultTreatment'])
          @synchronizer.fetch_splits(notification.data['changeNumber'], 0)
        end

        def return_object_from_json(notification)
          object_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])
          JSON.parse(object_json, symbolize_names: true)
        end

        def fetch_segments_if_not_exists(segment_names, object_repository)
          return if segment_names.nil?

          object_repository.set_segment_names(segment_names)
          @segment_fetcher.fetch_segments_if_not_exists(segment_names)
        end

        def fetch_rule_based_segments_if_not_exists(segment_names, change_number)
          return false if segment_names.nil? || segment_names.empty? || @rule_based_segment_repository.contains?(segment_names.to_a)

          @synchronizer.fetch_splits(0, change_number)

          true
        end
      end
    end
  end
end