# frozen_string_literal: true
module SplitIoClient
module Engine
class Synchronizer
include SplitIoClient::Cache::Fetchers
include SplitIoClient::Cache::Senders
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
# Wires the synchronizer to the collaborators it drives.
#
# @param repositories [Hash] indexed with :splits, :segments, :impressions and :events
# @param config [Object] SDK configuration object, stored as-is
# @param params [Hash] indexed with :split_fetcher, :segment_fetcher, :impressions_api,
#   :imp_counter, :telemetry_synchronizer, :impressions_sender_adapter and :unique_keys_tracker
def initialize(repositories, config, params)
  @config = config

  # Storage collaborators.
  @splits_repository = repositories[:splits]
  @segments_repository = repositories[:segments]
  @impressions_repository = repositories[:impressions]
  @events_repository = repositories[:events]

  # Fetchers, senders and trackers.
  @split_fetcher = params[:split_fetcher]
  @segment_fetcher = params[:segment_fetcher]
  @impressions_api = params[:impressions_api]
  @impression_counter = params[:imp_counter]
  @telemetry_synchronizer = params[:telemetry_synchronizer]
  @impressions_sender_adapter = params[:impressions_sender_adapter]
  @unique_keys_tracker = params[:unique_keys_tracker]

  # One independent back-off per resource type, both built with the same arguments.
  # NOTE(review): the meaning of (10, 0, 60) is defined by Engine::BackOff, not visible here.
  backoff_args = [10, 0, 60]
  @splits_sync_backoff = Engine::BackOff.new(*backoff_args)
  @segments_sync_backoff = Engine::BackOff.new(*backoff_args)
end
# Synchronizes feature flags and segments once.
#
# @param asynchronous [Boolean] when truthy (the default) the work runs in a new
#   thread stored under @config.threads[:sync_all_thread]; otherwise it runs inline
# @return [Object] +true+ in asynchronous mode, otherwise whatever
#   sync_splits_and_segments returns
def sync_all(asynchronous = true)
  return sync_splits_and_segments unless asynchronous

  @config.threads[:sync_all_thread] = Thread.new { sync_splits_and_segments }
  true
end
# Starts the recording tasks.
#
# The impressions sender, events sender and telemetry sync task are started
# only when @config.consumer? is falsy; the impressions count sender and the
# unique keys tracker task are always started.
def start_periodic_data_recording
  posts_to_backend = !@config.consumer?

  impressions_sender if posts_to_backend
  events_sender if posts_to_backend
  start_telemetry_sync_task if posts_to_backend

  impressions_count_sender
  start_unique_keys_tracker_task
end
# Invokes +call+ on the split fetcher and then on the segment fetcher.
# Returns whatever the segment fetcher's +call+ returns.
# NOTE(review): presumably each +call+ starts that fetcher's periodic polling
# thread (stop_periodic_fetch stops a "splits" and a "segments" thread) —
# confirm against the fetcher classes.
def start_periodic_fetch
@split_fetcher.call
@segment_fetcher.call
end
# Asks the split fetcher to stop its splits thread and the segment fetcher to
# stop its segments thread, in that order.
# Returns whatever stop_segments_thread returns.
def stop_periodic_fetch
@split_fetcher.stop_splits_thread
@segment_fetcher.stop_segments_thread
end
# Refreshes feature flags on demand until the stored change number reaches
# +target_change_number+.
#
# Does nothing when the repository is already at or past the target. Otherwise
# it runs two phases:
# 1. up to @config.on_demand_fetch_max_retries attempts with a fixed delay and
#    +till: nil+;
# 2. if that fails, up to ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES attempts with
#    back-off and +till+ set to the target (the log messages call this
#    "bypassing the CDN").
# After a successful phase, any segment names reported by the fetch are passed
# to the segment fetcher. Any StandardError is handed to
# @config.log_found_exception instead of propagating.
#
# @param target_change_number [Integer] change number the repository must reach
def fetch_splits(target_change_number)
  return if target_change_number <= @splits_repository.get_change_number.to_i

  options = { cache_control_headers: true, till: nil }
  outcome = attempt_splits_sync(target_change_number, options,
                                @config.on_demand_fetch_max_retries,
                                @config.on_demand_fetch_retry_delay_seconds,
                                false)
  used = @config.on_demand_fetch_max_retries - outcome[:remaining_attempts]

  if outcome[:success]
    new_segments = outcome[:segment_names]
    @segment_fetcher.fetch_segments_if_not_exists(new_segments, true) unless new_segments.empty?
    @config.logger.debug("Refresh completed in #{used} attempts.") if @config.debug_enabled
    return
  end

  # Second phase: same options hash, now pinned to the target change number.
  options[:till] = target_change_number
  outcome = attempt_splits_sync(target_change_number, options,
                                ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES, nil, true)
  used = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - outcome[:remaining_attempts]

  if outcome[:success]
    new_segments = outcome[:segment_names]
    @segment_fetcher.fetch_segments_if_not_exists(new_segments, true) unless new_segments.empty?
    summary = "Refresh completed bypassing the CDN in #{used} attempts."
  else
    summary = "No changes fetched after #{used} attempts with CDN bypassed."
  end
  @config.logger.debug(summary) if @config.debug_enabled
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end
# Refreshes a single segment on demand until its stored change number reaches
# +target_change_number+.
#
# Does nothing when the repository is already at or past the target. Otherwise
# it runs two phases:
# 1. up to @config.on_demand_fetch_max_retries attempts with a fixed delay and
#    +till: nil+;
# 2. if that fails, up to ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES attempts with
#    back-off and +till+ set to the target (the log messages call this
#    "bypassing the CDN").
# Any StandardError is handed to @config.log_found_exception instead of
# propagating.
#
# @param name [String] segment name
# @param target_change_number [Integer] change number the segment must reach
def fetch_segment(name, target_change_number)
  return if target_change_number <= @segments_repository.get_change_number(name).to_i

  fetch_options = { cache_control_headers: true, till: nil }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                @config.on_demand_fetch_max_retries,
                                @config.on_demand_fetch_retry_delay_seconds,
                                false)

  attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed in #{attempts} attempts.") if @config.debug_enabled
    return
  end

  fetch_options = { cache_control_headers: true, till: target_change_number }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
                                nil,
                                true)

  # This phase was budgeted with ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES, so the
  # attempts used must be measured against that budget, not the configured one.
  attempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed bypassing the CDN in #{attempts} attempts.") if @config.debug_enabled
  else
    @config.logger.debug("No changes fetched for segment #{name} after #{attempts} attempts with CDN bypassed.") if @config.debug_enabled
  end
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end
private
# Fetches one segment repeatedly until its stored change number reaches
# +target_cn+ or the attempt budget runs out.
#
# @param name [String] segment name
# @param target_cn [Integer] change number to reach
# @param fetch_options [Hash] forwarded untouched to the segment fetcher
# @param max_retries [Integer] attempt budget (at least one fetch is always made)
# @param retry_delay_seconds [Numeric, nil] pause between attempts when
#   +with_backoff+ is falsy
# @param with_backoff [Boolean] take the pause from @segments_sync_backoff instead
# @return [Hash] see sync_result; :segment_names is left at its default
def attempt_segment_sync(name, target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
  attempts_left = max_retries
  @segments_sync_backoff.reset

  loop do
    attempts_left -= 1
    @segment_fetcher.fetch_segment(name, fetch_options)

    caught_up = target_cn <= @segments_repository.get_change_number(name).to_i
    return sync_result(caught_up, attempts_left) if caught_up || attempts_left <= 0

    sleep(with_backoff ? @segments_sync_backoff.interval : retry_delay_seconds)
  end
end
# Fetches feature flags repeatedly until the stored change number reaches
# +target_cn+ or the attempt budget runs out.
#
# @param target_cn [Integer] change number to reach
# @param fetch_options [Hash] forwarded untouched to the split fetcher
# @param max_retries [Integer] attempt budget (at least one fetch is always made)
# @param retry_delay_seconds [Numeric, nil] pause between attempts when
#   +with_backoff+ is falsy
# @param with_backoff [Boolean] take the pause from @splits_sync_backoff instead
# @return [Hash] see sync_result; :segment_names comes from the last fetch only
def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
  remaining_attempts = max_retries
  @splits_sync_backoff.reset

  loop do
    remaining_attempts -= 1
    result = @split_fetcher.fetch_splits(fetch_options)

    # Coerce with to_i, as fetch_splits and attempt_segment_sync already do, so
    # a nil or String change number does not make the comparison raise.
    current_cn = @splits_repository.get_change_number.to_i
    return sync_result(true, remaining_attempts, result[:segment_names]) if target_cn <= current_cn
    return sync_result(false, remaining_attempts, result[:segment_names]) if remaining_attempts <= 0

    delay = with_backoff ? @splits_sync_backoff.interval : retry_delay_seconds
    sleep(delay)
  end
end
# Starts thread which loops constantly and sends impressions to the Split API.
# Nothing is started (and nil is returned) when the impressions mode is :none.
def impressions_sender
  return if @config.impressions_mode == :none

  ImpressionsSender.new(@impressions_repository, @config, @impressions_api).call
end
# Starts thread which loops constantly and sends events to the Split API.
# Builds an EventsSender from the events repository and the config, invokes
# +call+ on it and returns that result.
def events_sender
EventsSender.new(@events_repository, @config).call
end
# Starts thread which loops constantly and sends impressions count to the Split API.
# Nothing is started (and nil is returned) when the impressions mode is :debug.
def impressions_count_sender
  return if @config.impressions_mode == :debug

  ImpressionsCountSender.new(@config, @impression_counter, @impressions_sender_adapter).call
end
# Builds a Telemetry::SyncTask from the config and the telemetry synchronizer,
# invokes +call+ on it and returns that result.
# NOTE(review): presumably +call+ launches a periodic background task — confirm
# in Telemetry::SyncTask.
def start_telemetry_sync_task
Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call
end
# Invokes +call+ on the unique keys tracker supplied at construction time and
# returns that result.
def start_unique_keys_tracker_task
@unique_keys_tracker.call
end
# Packs the outcome of a sync attempt loop into a Hash.
#
# @param success [Boolean] whether the target change number was reached
# @param remaining_attempts [Integer] attempts left in the budget when the loop ended
# @param segment_names [Object, nil] segment names reported by the fetch, if any
# @return [Hash] with keys :success, :remaining_attempts and :segment_names
def sync_result(success, remaining_attempts, segment_names = nil)
  {
    success: success,
    remaining_attempts: remaining_attempts,
    segment_names: segment_names
  }
end
# Fetches feature flags and, only if that fetch reports success, all segments.
#
# @return [Object] the falsy :success value of the flags fetch, or otherwise
#   whatever the segment fetcher's fetch_segments returns
def sync_splits_and_segments
  @config.logger.debug('Synchronizing feature flags and segments ...') if @config.debug_enabled

  splits_synced = @split_fetcher.fetch_splits[:success]
  return splits_synced unless splits_synced

  @segment_fetcher.fetch_segments
end
end
end
end