class ElasticAPM::CentralConfig
# @api private
# Applies a set of remotely supplied options on top of the local config.
#
# The value each option had before its first remote modification is kept in
# @modified_options, so that an option which later disappears from the
# remote config can be put back to that value.
#
# NOTE: +update+ is mutated in place; reverted options are added to it
# before it is handed to @config.replace_options.
#
# @param update [Hash] option name => new value (keys must respond to
#   +to_sym+; presumably the String keys of the parsed JSON body)
# @return the result of @config.replace_options
def assign(update)
  # For each updated option, store the original value, unless already
  # stored. The check is on key presence rather than truthiness (`||=`),
  # otherwise an original of +nil+ or +false+ would be overwritten by the
  # already-modified value on the next update and could never be restored.
  update.each_key do |key|
    next if @modified_options.key?(key)

    @modified_options[key] = config.get(key.to_sym)&.value
  end

  # If the new update doesn't set a previously modified option, revert it
  # to the original. Iterate over a snapshot of the keys because entries
  # are deleted from the hash inside the loop.
  @modified_options.keys.each do |key|
    next if update.key?(key)

    update[key] = @modified_options.delete(key)
  end

  @config.replace_options(update)
end
# Kicks off an asynchronous fetch of the remote config and attaches the
# follow-up handlers.
#
# #fetch_config runs inside a Concurrent::Promise; its result is handed to
# #handle_success and a failure is handed to #handle_error. The resulting
# promise chain is stored in @promise.
#
# @return the promise chain assigned to @promise
def fetch_and_apply_config
  fetching = Concurrent::Promise.execute { fetch_config }
  applying = fetching.on_success { |response| handle_success(response) }

  @promise = applying.rescue { |failure| handle_error(failure) }
end
# Performs the config request and classifies the response by HTTP status.
#
# @return the response for 2xx and 3xx statuses; nil for any status outside
#   200..599 (no branch matches)
# @raise [ClientError] for 4xx statuses, with the response as argument
# @raise [ServerError] for 5xx statuses, with the response as argument
def fetch_config
  response = perform_request

  case response.status
  when 200..299, 300..399 then response
  when 400..499 then raise ClientError, response
  when 500..599 then raise ServerError, response
  end
end
# Handles a failed config fetch: logs the failure, calls #assign with an
# empty update (which reverts every remotely modified option) and schedules
# the next fetch.
#
# @param error [Object] the failure; its +response+ is used when it
#   responds to that method
# @return the result of #schedule_next_fetch
def handle_error(error)
  # For tests, WebMock failures don't have real responses
  response = error.respond_to?(:response) ? error.response : nil
  failure_body = response&.body

  debug(
    'Failed fetching config: %s, trying again in %d seconds',
    failure_body,
    DEFAULT_MAX_AGE
  )

  assign({})
  schedule_next_fetch(response)
end
# Restarts central config polling by calling #stop and then #start.
# Judging by the name, this is meant to be called after the process has
# forked (confirm against the caller).
#
# @return the result of #start
def handle_forking!
  stop
  start
end
# Applies a successfully fetched remote config and schedules the next fetch.
#
# * Remembers the response's +Etag+ header (if any) in @etag.
# * On 304 nothing is applied.
# * Otherwise a non-empty body is parsed as JSON and passed to #assign.
#
# Errors raised while applying (e.g. malformed JSON) are logged and
# swallowed; in that case the next fetch is NOT scheduled by this method.
#
# @param resp [#headers, #status, #body] the HTTP response
# @return [true] on success; on failure, the result of the final +debug+ call
def handle_success(resp)
  etag = resp.headers['Etag']
  @etag = etag if etag

  if resp.status == 304
    debug 'Received 304 Not Modified'
  else
    if resp.body && !resp.body.empty?
      update = JSON.parse(resp.body.to_s)
      assign(update)
    end

    # +update+ is nil when the body was empty. #assign may have added
    # reverted options to it, so they are logged as well.
    if update&.any?
      info 'Updated config from Kibana'
      debug 'Modified: %s', update.inspect
      debug 'Modified original options: %s', @modified_options.inspect
    end
  end

  schedule_next_fetch(resp)

  true
rescue StandardError => e
  # StandardError rather than Exception: signals, SystemExit and
  # NoMemoryError must not be swallowed here.
  error 'Failed to apply remote config, %s', e.inspect
  # Double quotes so the frames are joined by a real newline.
  debug { e.backtrace.join("\n") }
end
# Request headers for the config fetch: the stored etag is sent as
# +If-None-Match+.
#
# @return [Hash] a single-entry hash keyed by the Symbol :'If-None-Match'
def headers
  { :'If-None-Match' => @etag }
end
# Sets up the initial state: no remotely modified options, an HTTP
# connection built from +config+, and a placeholder etag.
#
# @param config the agent configuration; also passed to
#   Transport::Connection::Http.new
def initialize(config)
  # Initial etag value sent as If-None-Match before any response was seen.
  @etag = 1
  @modified_options = {}
  @config = config
  @http = Transport::Connection::Http.new(config)
end
# Issues the GET request for the remote config.
#
# @return the result of @http.get for #server_url with #headers
def perform_request
  url = server_url
  request_headers = headers

  @http.get(url, headers: request_headers)
end
# Schedules the next config fetch.
#
# The delay is the max age from the response's +Cache-Control+ header when
# there is one, otherwise DEFAULT_MAX_AGE, and is never less than 5 seconds.
#
# @param resp [#headers, nil] the response whose headers decide the delay
# @return the scheduled task, also stored in @scheduled_task
def schedule_next_fetch(resp = nil)
  response_headers = resp&.headers
  cache_control = response_headers && response_headers['Cache-Control']

  seconds = cache_control && CacheControl.new(cache_control).max_age
  # Also covers a Cache-Control header that yields no max age (assuming
  # CacheControl#max_age returns nil then - confirm), which would otherwise
  # fail on the comparison below.
  seconds ||= DEFAULT_MAX_AGE

  # Enforce a floor of 5 seconds between fetches.
  if seconds < 5
    debug "Next fetch is too low (#{seconds}s) - increasing to default"
    seconds = 5
  end

  @scheduled_task =
    Concurrent::ScheduledTask.execute(seconds) { fetch_and_apply_config }
end
# URL of the central config endpoint, built once and memoized in
# @server_url.
#
# The configured server URL is extended with the agents config path and
# the CGI-escaped service name and environment (an empty string when no
# environment is configured).
#
# @return [String] the endpoint URL
def server_url
  return @server_url if @server_url

  base = config.server_url
  service_name = CGI.escape(config.service_name)
  environment = CGI.escape(config.environment || '')

  @server_url =
    base + '/config/v1/agents' \
           "?service.name=#{service_name}" \
           "&service.environment=#{environment}"
end
# Starts fetching the remote config, but only when central config is
# enabled in the agent configuration.
#
# @return nil when central config is disabled, otherwise the result of
#   #fetch_and_apply_config
def start
  if config.central_config?
    debug 'Starting CentralConfig'
    fetch_and_apply_config
  end
end
# Stops polling by cancelling the scheduled fetch, if one exists.
#
# @return the result of cancelling the task, or nil when nothing was
#   scheduled
def stop
  debug 'Stopping CentralConfig'

  pending = @scheduled_task
  pending.cancel unless pending.nil?
end