class RedisClient
# Runs a single command with an explicit per-call timeout.
#
# The positional +command+ parts and +kwargs+ are turned into a command by
# the command builder, then sent through the middlewares to
# +connection.call(command, timeout)+.
#
# A ReadTimeoutError raised inside the connection block is captured and the
# block is left with +break+; the error is then re-raised once
# +ensure_connected+ has returned.
# NOTE(review): presumably this keeps a read timeout from being handled as a
# retryable connection failure by +ensure_connected+ -- confirm.
#
# Yields the result when a block is given, otherwise returns it.
def blocking_call(timeout, *command, **kwargs)
  command = @command_builder.generate(command, kwargs)
  error = nil

  result = ensure_connected do |connection|
    @middlewares.call(command, config) do
      connection.call(command, timeout)
    end
  rescue ReadTimeoutError => error
    break
  end

  if error
    raise error
  elsif block_given?
    yield result
  else
    result
  end
end
# Variant of a timed call that takes the command as a single +command+
# argument (no splat, no keyword arguments) and hands it to the command
# builder as-is.
#
# The command is sent through the middlewares to
# +connection.call(command, timeout)+. A ReadTimeoutError raised inside the
# connection block is captured, the block is left with +break+, and the error
# is re-raised after +ensure_connected+ has returned.
#
# Yields the result when a block is given, otherwise returns it.
def blocking_call_v(timeout, command)
  command = @command_builder.generate(command)
  error = nil

  result = ensure_connected do |connection|
    @middlewares.call(command, config) do
      connection.call(command, timeout)
    end
  rescue ReadTimeoutError => error
    break
  end

  if error
    raise error
  elsif block_given?
    yield result
  else
    result
  end
end
# Creates a Multi from the command builder, calls it with "MULTI", yields it
# so the caller can add commands, then calls it with "EXEC".
#
# Requires a block. Returns the Multi instance.
def build_transaction
  transaction = Multi.new(@command_builder)
  transaction.call("MULTI")
  yield transaction
  transaction.call("EXEC")
  transaction
end
# Builds a command from the positional parts and +kwargs+, then sends it
# through the middlewares to +connection.call(command, nil)+ (no per-call
# timeout) using the default +ensure_connected+ behaviour.
#
# Yields the result when a block is given, otherwise returns it.
def call(*command, **kwargs)
  command = @command_builder.generate(command, kwargs)

  result = ensure_connected do |connection|
    @middlewares.call(command, config) do
      connection.call(command, nil)
    end
  end

  if block_given?
    yield result
  else
    result
  end
end
# Same as a regular call, except the connection is obtained with
# +ensure_connected(retryable: false)+, so the command is not re-sent by the
# retry loop in +ensure_connected+.
#
# Yields the result when a block is given, otherwise returns it.
def call_once(*command, **kwargs)
  command = @command_builder.generate(command, kwargs)

  result = ensure_connected(retryable: false) do |connection|
    @middlewares.call(command, config) do
      connection.call(command, nil)
    end
  end

  if block_given?
    yield result
  else
    result
  end
end
# Non-retryable call taking the command as a single +command+ argument that
# is passed to the command builder as-is (no splat, no keyword arguments).
#
# Uses +ensure_connected(retryable: false)+.
# Yields the result when a block is given, otherwise returns it.
def call_once_v(command)
  command = @command_builder.generate(command)

  result = ensure_connected(retryable: false) do |connection|
    @middlewares.call(command, config) do
      connection.call(command, nil)
    end
  end

  if block_given?
    yield result
  else
    result
  end
end
# Call taking the command as a single +command+ argument that is passed to
# the command builder as-is (no splat, no keyword arguments), then sent
# through the middlewares to +connection.call(command, nil)+.
#
# Yields the result when a block is given, otherwise returns it.
def call_v(command)
  command = @command_builder.generate(command)

  result = ensure_connected do |connection|
    @middlewares.call(command, config) do
      connection.call(command, nil)
    end
  end

  if block_given?
    yield result
  else
    result
  end
end
# Closes the current raw connection, if there is one.
#
# Returns +self+ so calls can be chained. The +@raw_connection+ reference
# itself is left in place.
def close
  @raw_connection&.close
  self
end
# Builds a Config with the receiver as +client_implementation+, forwarding
# every keyword argument.
# NOTE(review): since it passes +client_implementation: self+, this looks
# like a class-level factory rather than an instance method -- confirm.
def config(**kwargs)
  Config.new(client_implementation: self, **kwargs)
end
# Opens (or re-opens) the raw connection and sends the connection prelude.
#
# Steps:
# 1. Records the current pid from PIDCache in +@pid+.
# 2. If a raw connection already exists it is asked to +reconnect+;
#    otherwise a new one is created with +config.driver.new+ using the
#    connect/read/write timeouts. Both paths are wrapped in
#    +@middlewares.connect+.
# 3. Duplicates +config.connection_prelude+, appending
#    ["CLIENT", "SETNAME", id] when +id+ is set.
# 4. For sentinel configs, appends ["ROLE"], pipelines the prelude, and hands
#    the last reply to +config.check_role!+. Otherwise the prelude is
#    pipelined only when it is not empty.
#
# Error translation:
# * FailoverError / CannotConnectError get the config attached and are
#   re-raised.
# * Any other ConnectionError is re-raised as a CannotConnectError carrying
#   the original message and backtrace.
# * A CommandError whose message reports an unknown HELLO command is
#   re-raised as UnsupportedServer; other CommandErrors are re-raised as-is.
def connect
  @pid = PIDCache.pid

  if @raw_connection
    @middlewares.connect(config) do
      @raw_connection.reconnect
    end
  else
    @raw_connection = @middlewares.connect(config) do
      config.driver.new(
        config,
        connect_timeout: connect_timeout,
        read_timeout: read_timeout,
        write_timeout: write_timeout,
      )
    end
  end

  # Work on a copy so the prelude stored on the config is not mutated.
  prelude = config.connection_prelude.dup

  if id
    prelude << ["CLIENT", "SETNAME", id]
  end

  # The connection prelude is deliberately not sent to Middlewares
  # NOTE(review): the statement above is the original comment, but both
  # branches below do wrap the prelude in @middlewares.call_pipelined --
  # confirm which is intended.
  if config.sentinel?
    prelude << ["ROLE"]
    # Only the last reply (the one for ROLE) is needed for the role check.
    role, = @middlewares.call_pipelined(prelude, config) do
      @raw_connection.call_pipelined(prelude, nil).last
    end
    config.check_role!(role)
  else
    unless prelude.empty?
      @middlewares.call_pipelined(prelude, config) do
        @raw_connection.call_pipelined(prelude, nil)
      end
    end
  end
rescue FailoverError, CannotConnectError => error
  error._set_config(config)
  raise error
rescue ConnectionError => error
  connect_error = CannotConnectError.with_config(error.message, config)
  connect_error.set_backtrace(error.backtrace)
  raise connect_error
rescue CommandError => error
  if error.message.match?(/ERR unknown command ['`]HELLO['`]/)
    raise UnsupportedServer, "redis-client requires Redis 6+ with HELLO command available (#{config.server_url})"
  else
    raise
  end
end
# Returns the result of +revalidate+ on the current raw connection, or +nil+
# when there is no raw connection.
def connected?
  @raw_connection&.revalidate
end
# Returns the +db+ value from the config.
def db
  config.db
end
# Returns the memoized default driver, resolving it on first use.
#
# Resolution walks the registered driver names in order and keeps the first
# one for which +driver(name)+ returns a truthy value. A LoadError raised
# while resolving a name is swallowed and the next name is tried. Returns
# +nil+ when no driver could be resolved.
def default_driver
  unless @default_driver
    @driver_definitions.each_key do |name|
      if @default_driver = driver(name)
        break
      end
    rescue LoadError
      # This driver cannot be loaded here; fall through to the next one.
    end
  end

  @default_driver
end
# Sets the default driver to whatever +driver(name)+ resolves +name+ to.
def default_driver=(name)
  @default_driver = driver(name)
end
# Runs +block+ through +ensure_connected(retryable: false)+ and returns
# whatever that call returns.
def disable_reconnection(&block)
  ensure_connected(retryable: false, &block)
end
# Resolves a driver.
#
# * A Class is returned unchanged.
# * Anything else is converted with +to_sym+ and looked up among the
#   registered driver definitions; the definition is called at most once per
#   name and its result is cached in +@drivers+ (as long as it is truthy).
#
# Raises ArgumentError when the name is not registered.
def driver(name)
  return name if name.is_a?(Class)

  name = name.to_sym
  unless @driver_definitions.key?(name)
    raise ArgumentError, "Unknown driver #{name.inspect}, expected one of: `#{@driver_definitions.keys.inspect}`"
  end

  @drivers[name] ||= @driver_definitions[name]&.call
end
# Obtains a connection and, when a block is given, yields it.
#
# Before anything else the current connection is closed when the config does
# not inherit sockets and the recorded pid differs from PIDCache.pid.
#
# Three modes:
#
# * Reconnection currently disabled (+@disable_reconnection+): the existing
#   +@raw_connection+ is yielded (or returned when no block is given) with no
#   error handling.
# * +retryable: true+ (default): +raw_connection+ is yielded (or returned).
#   On ConnectionError / ProtocolError the connection is closed, then:
#     - a CircuitBreaker::OpenCircuitError re-raises the first error seen
#       during this call (which may be an earlier error from a previous try);
#     - otherwise the latest error becomes the one to raise, and the block is
#       retried while reconnection is not disabled and
#       +config.retry_connecting?(tries, error)+ allows it.
# * +retryable: false+: a connection is first obtained through the retryable
#   path (without a block), then the block runs with reconnection disabled.
#   ConnectionError / ProtocolError close the connection and propagate. The
#   previous +@disable_reconnection+ value is always restored. This mode
#   requires a block.
def ensure_connected(retryable: true)
  close if !config.inherit_socket && @pid != PIDCache.pid

  if @disable_reconnection
    if block_given?
      yield @raw_connection
    else
      @raw_connection
    end
  elsif retryable
    tries = 0
    connection = nil
    preferred_error = nil
    begin
      connection = raw_connection
      if block_given?
        yield connection
      else
        connection
      end
    rescue ConnectionError, ProtocolError => error
      preferred_error ||= error
      close

      if error.is_a?(CircuitBreaker::OpenCircuitError)
        # Surface the first error of this call rather than the open-circuit one.
        raise preferred_error
      else
        preferred_error = error
      end

      if !@disable_reconnection && config.retry_connecting?(tries, error)
        tries += 1
        retry
      else
        raise preferred_error
      end
    end
  else
    previous_disable_reconnection = @disable_reconnection
    # Connecting itself may still be retried; only the block below may not.
    connection = ensure_connected
    begin
      @disable_reconnection = true
      yield connection
    rescue ConnectionError, ProtocolError
      close
      raise
    ensure
      @disable_reconnection = previous_disable_reconnection
    end
  end
end
# Returns the configured host, or +nil+ when the config has a +path+ set.
def host
  config.host unless config.path
end
# Iterates HSCAN results for +key+, yielding two values per step via
# +scan_pairs+.
#
# Without a block, returns an Enumerator for the same call. With a block,
# builds the command ["HSCAN", key, 0, *args] (plus +kwargs+); index 2 is the
# cursor slot that +scan_pairs+ overwrites on each round trip.
def hscan(key, *args, **kwargs, &block)
  unless block_given?
    return to_enum(__callee__, key, *args, **kwargs)
  end

  args = @command_builder.generate(["HSCAN", key, 0] + args, kwargs)
  scan_pairs(2, args, &block)
end
# Returns the +id+ value from the config.
def id
  config.id
end
# Delegates to the parent initializer with the same arguments, then sets up
# client state: a middlewares instance built from
# +config.middlewares_stack+, no raw connection yet, and reconnection
# enabled.
def initialize(config, **)
  super
  @middlewares = config.middlewares_stack.new(self)
  @raw_connection = nil
  @disable_reconnection = false
end
def inspect
def inspect id_string = " id=#{id}" if id "#<#{self.class.name} #{config.server_url}#{id_string}>" end
# Asks the connection to measure its round-trip delay and returns the result.
#
# The measurement is reported to the middlewares as a ["PING"] command.
def measure_round_trip_delay
  ensure_connected do |connection|
    @middlewares.call(["PING"], config) do
      connection.measure_round_trip_delay
    end
  end
end
# Runs a transaction built by +block+ (see +build_transaction+) and returns
# its results after +transaction._coerce!+.
#
# With +watch+:
#   The whole sequence runs inside +ensure_connected(retryable: false)+.
#   "WATCH" is sent for the given keys, the transaction is built, and its
#   commands are pipelined; the last pipeline reply is the result. If
#   anything raises, "UNWATCH" is sent (when still connected) before the
#   error propagates.
#
# Without +watch+:
#   The transaction is built first. An empty transaction returns [] without
#   touching the connection; otherwise its commands are pipelined, retryable
#   according to +transaction._retryable?+, and the last pipeline reply is
#   the result.
def multi(watch: nil, &block)
  transaction = nil

  results = if watch
    # WATCH is stateful, so we can't reconnect if it's used, the whole transaction
    # has to be redone.
    ensure_connected(retryable: false) do |connection|
      call("WATCH", *watch)
      begin
        if transaction = build_transaction(&block)
          commands = transaction._commands
          results = @middlewares.call_pipelined(commands, config) do
            connection.call_pipelined(commands, nil)
          end.last
        else
          call("UNWATCH")
          []
        end
      rescue
        call("UNWATCH") if connected? && watch
        raise
      end
    end
  else
    transaction = build_transaction(&block)
    if transaction._empty?
      []
    else
      ensure_connected(retryable: transaction._retryable?) do |connection|
        commands = transaction._commands
        @middlewares.call_pipelined(commands, config) do
          connection.call_pipelined(commands, nil)
        end.last
      end
    end
  end

  if transaction
    transaction._coerce!(results)
  else
    results
  end
end
# Constructor wrapper accepting either a ready-made config or raw options.
#
# * When +arg+ is a Config::Common, it is passed to the parent +new+
#   unchanged (together with +kwargs+).
# * Otherwise +arg+ (a Hash of options, or nil) is merged with +kwargs+ into
#   a new config via +config(...)+, and that config is passed to the parent
#   +new+.
def new(arg = nil, **kwargs)
  if arg.is_a?(Config::Common)
    super
  else
    super(config(**(arg || {}), **kwargs))
  end
end
# Current reading of the monotonic clock (Process::CLOCK_MONOTONIC) in the
# default unit of Process.clock_gettime (float seconds).
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
# Current reading of the monotonic clock (Process::CLOCK_MONOTONIC) as float
# milliseconds.
def now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end
# Returns the +password+ value from the config.
def password
  config.password
end
# Returns the +path+ value from the config.
def path
  config.path
end
# Collects commands into a Pipeline via the given block, then sends them in
# one pipelined call.
#
# * An empty pipeline returns [] without touching the connection.
# * Otherwise the commands are sent with the pipeline's per-command timeouts,
#   retryable according to +pipeline._retryable?+, and the replies are
#   returned after +pipeline._coerce!+.
#
# +exception+ is forwarded to +connection.call_pipelined+.
# Requires a block.
def pipelined(exception: true)
  pipeline = Pipeline.new(@command_builder)
  yield pipeline

  if pipeline._size == 0
    []
  else
    results = ensure_connected(retryable: pipeline._retryable?) do |connection|
      commands = pipeline._commands
      @middlewares.call_pipelined(commands, config) do
        connection.call_pipelined(commands, pipeline._timeouts, exception: exception)
      end
    end

    pipeline._coerce!(results)
  end
end
# Returns the configured port, or +nil+ when the config has a +path+ set.
def port
  config.port unless config.path
end
# Creates a PubSub around a connection obtained from +ensure_connected+ and
# the command builder.
#
# The client's own +@raw_connection+ reference is then reset to nil, so this
# client no longer points at the connection handed to the PubSub object.
def pubsub
  sub = PubSub.new(ensure_connected, @command_builder)
  @raw_connection = nil
  sub
end
# Returns the raw connection, calling +connect+ first when there is none or
# when the existing one fails +revalidate+.
def raw_connection
  if @raw_connection.nil? || !@raw_connection.revalidate
    connect
  end

  @raw_connection
end
# Applies the parent implementation of the setter, then pushes the new read
# timeout to the live raw connection, if any.
def read_timeout=(timeout)
  super
  @raw_connection&.read_timeout = timeout
end
# Includes +middleware+ into Middlewares.
def register(middleware)
  Middlewares.include(middleware)
end
# Registers +block+ as the definition for driver +name+, replacing any
# previous definition stored under that key.
def register_driver(name, &block)
  @driver_definitions[name] = block
end
# Iterates SCAN results, yielding each element via +scan_list+.
#
# Without a block, returns an Enumerator for the same call. With a block,
# builds the command ["SCAN", 0, *args] (plus +kwargs+); index 1 is the
# cursor slot that +scan_list+ overwrites on each round trip.
def scan(*args, **kwargs, &block)
  unless block_given?
    return to_enum(__callee__, *args, **kwargs)
  end

  args = @command_builder.generate(["SCAN", 0] + args, kwargs)
  scan_list(1, args, &block)
end
# Drives a cursor-based scan whose replies are flat lists.
#
# +command+ is mutated in place: the slot at +cursor_index+ is overwritten
# with the current cursor before every call. Each reply is expected to be a
# [next_cursor, elements] pair; every element is passed to +block+. The loop
# stops when the returned cursor is the string "0".
#
# Always returns nil.
def scan_list(cursor_index, command, &block)
  cursor = 0
  while cursor != "0"
    command[cursor_index] = cursor
    cursor, elements = call(*command)
    elements.each(&block)
  end
  nil
end
# Drives a cursor-based scan whose replies are flat lists of alternating
# values, yielding them two at a time.
#
# +command+ is mutated in place: the slot at +cursor_index+ is overwritten
# with the current cursor before every call. Each reply is expected to be a
# [next_cursor, elements] pair. The loop stops when the returned cursor is
# the string "0".
#
# Requires a block. Always returns nil.
def scan_pairs(cursor_index, command)
  cursor = 0
  while cursor != "0"
    command[cursor_index] = cursor
    cursor, elements = call(*command)

    # Walk the flat list two entries at a time.
    index = 0
    size = elements.size
    while index < size
      yield elements[index], elements[index + 1]
      index += 2
    end
  end
  nil
end
# Builds a SentinelConfig with the receiver as +client_implementation+,
# forwarding every keyword argument.
# NOTE(review): since it passes +client_implementation: self+, this looks
# like a class-level factory rather than an instance method -- confirm.
def sentinel(**kwargs)
  SentinelConfig.new(client_implementation: self, **kwargs)
end
# Returns the +server_url+ value from the config.
def server_url
  config.server_url
end
# Always 1.
# NOTE(review): presumably mirrors a connection pool's +size+ so a single
# client can stand in for a pool -- confirm.
def size
  1
end
# Iterates SSCAN results for +key+, yielding each element via +scan_list+.
#
# Without a block, returns an Enumerator for the same call. With a block,
# builds the command ["SSCAN", key, 0, *args] (plus +kwargs+); index 2 is the
# cursor slot that +scan_list+ overwrites on each round trip.
def sscan(key, *args, **kwargs, &block)
  unless block_given?
    return to_enum(__callee__, key, *args, **kwargs)
  end

  args = @command_builder.generate(["SSCAN", key, 0] + args, kwargs)
  scan_list(2, args, &block)
end
# Returns the config's +read_timeout+.
def timeout
  config.read_timeout
end
# Applies the parent implementation of the setter, then pushes the new value
# to the live raw connection (if any) as both its read and write timeout.
def timeout=(timeout)
  super
  @raw_connection&.read_timeout = timeout
  @raw_connection&.write_timeout = timeout
end
# Returns the +username+ value from the config.
def username
  config.username
end
# Yields the receiver itself and returns the block's value. The optional
# argument is accepted and ignored.
# NOTE(review): presumably mirrors a connection pool's +with+ so a single
# client can stand in for a pool -- confirm.
def with(_options = nil)
  yield self
end
# Applies the parent implementation of the setter, then pushes the new write
# timeout to the live raw connection, if any.
def write_timeout=(timeout)
  super
  @raw_connection&.write_timeout = timeout
end
# Iterates ZSCAN results for +key+, yielding two values per step via
# +scan_pairs+.
#
# Without a block, returns an Enumerator for the same call. With a block,
# builds the command ["ZSCAN", key, 0, *args] (plus +kwargs+); index 2 is the
# cursor slot that +scan_pairs+ overwrites on each round trip.
def zscan(key, *args, **kwargs, &block)
  unless block_given?
    return to_enum(__callee__, key, *args, **kwargs)
  end

  args = @command_builder.generate(["ZSCAN", key, 0] + args, kwargs)
  scan_pairs(2, args, &block)
end