class RedisClient::SentinelConfig
def check_role!(role)
def check_role!(role) if @role == :master unless role == "master" sleep SENTINEL_DELAY raise FailoverError, "Expected to connect to a master, but the server is a replica" end else unless role == "slave" sleep SENTINEL_DELAY raise FailoverError, "Expected to connect to a replica, but the server is a master" end end end
def config
def config @mutex.synchronize do @config ||= if @role == :master resolve_master else resolve_replica end end end
def each_sentinel
def each_sentinel last_error = nil @sentinel_configs.dup.each do |sentinel_config| sentinel_client = sentinel_client(sentinel_config) success = true begin yield sentinel_client rescue RedisClient::Error => error last_error = error success = false sleep SENTINEL_DELAY ensure if success @sentinel_configs.unshift(@sentinel_configs.delete(sentinel_config)) end end end raise last_error if last_error end
def host
def host config.host end
def initialize(name:, sentinels:, role: :master, **client_config)
def initialize(name:, sentinels:, role: :master, **client_config) unless %i(master replica slave).include?(role) raise ArgumentError, "Expected role to be either :master or :replica, got: #{role.inspect}" end @to_list_of_hash = @to_hash = nil @extra_config = {} if client_config[:protocol] == 2 @extra_config[:protocol] = client_config[:protocol] @to_list_of_hash = lambda do |may_be_a_list| if may_be_a_list.is_a?(Array) may_be_a_list.map { |l| l.each_slice(2).to_h } else may_be_a_list end end end @name = name @sentinel_configs = sentinels_to_configs(sentinels) @sentinels = {}.compare_by_identity @role = role @mutex = Mutex.new @config = nil client_config[:reconnect_attempts] ||= DEFAULT_RECONNECT_ATTEMPTS @client_config = client_config || {} super(**client_config) end
def path
def path nil end
def port
def port config.port end
def refresh_sentinels(sentinel_client)
def refresh_sentinels(sentinel_client) sentinel_response = sentinel_client.call("SENTINEL", "sentinels", @name, &@to_list_of_hash) sentinels = sentinel_response.map do |sentinel| { host: sentinel.fetch("ip"), port: Integer(sentinel.fetch("port")) } end new_sentinels = sentinels.select do |sentinel| @sentinel_configs.none? do |sentinel_config| sentinel_config.host == sentinel.fetch(:host) && sentinel_config.port == sentinel.fetch(:port) end end @sentinel_configs.concat sentinels_to_configs(new_sentinels) end
def reset
def reset @mutex.synchronize do @config = nil end end
def resolve_master
def resolve_master each_sentinel do |sentinel_client| host, port = sentinel_client.call("SENTINEL", "get-master-addr-by-name", @name) next unless host && port refresh_sentinels(sentinel_client) return Config.new(host: host, port: Integer(port), **@client_config) end rescue ConnectionError raise ConnectionError, "No sentinels available" else raise ConnectionError, "Couldn't locate a replica for role: #{@name}" end
def resolve_replica
def resolve_replica each_sentinel do |sentinel_client| replicas = sentinel_client.call("SENTINEL", "replicas", @name, &@to_list_of_hash) replicas.reject! do |r| flags = r["flags"].to_s.split(",") flags.include?("s_down") || flags.include?("o_down") end next if replicas.empty? replica = replicas.sample return Config.new(host: replica["ip"], port: Integer(replica["port"]), **@client_config) end rescue ConnectionError raise ConnectionError, "No sentinels available" else raise ConnectionError, "Couldn't locate a replica for role: #{@name}" end
def retry_connecting?(attempt, error)
def retry_connecting?(attempt, error) reset unless error.is_a?(TimeoutError) super end
def sentinel?
def sentinel? true end
def sentinel_client(sentinel_config)
def sentinel_client(sentinel_config) @sentinels[sentinel_config] ||= sentinel_config.new_client end
def sentinels
def sentinels @mutex.synchronize do @sentinel_configs.dup end end
def sentinels_to_configs(sentinels)
def sentinels_to_configs(sentinels) sentinels.map do |sentinel| case sentinel when String Config.new(**@extra_config, url: sentinel) else Config.new(**@extra_config, **sentinel) end end end