class RedisClient
Experimental RBS support (using type sampling data from the type_fusion
project).
# sig/redis_client.rbs class RedisClient def raw_connection: () -> untyped def sscan: (String key, *Array[] args, **Hash kwargs, ) -> untyped end
def blocking_call(timeout, *command, **kwargs)
def blocking_call(timeout, *command, **kwargs) command = @command_builder.generate(command, kwargs) error = nil result = ensure_connected do |connection| @middlewares.call(command, config) do connection.call(command, timeout) end rescue ReadTimeoutError => error break end if error raise error elsif block_given? yield result else result end end
def blocking_call_v(timeout, command)
def blocking_call_v(timeout, command) command = @command_builder.generate(command) error = nil result = ensure_connected do |connection| @middlewares.call(command, config) do connection.call(command, timeout) end rescue ReadTimeoutError => error break end if error raise error elsif block_given? yield result else result end end
def build_transaction
def build_transaction transaction = Multi.new(@command_builder) transaction.call("MULTI") yield transaction transaction.call("EXEC") transaction end
def call(*command, **kwargs)
def call(*command, **kwargs) command = @command_builder.generate(command, kwargs) result = ensure_connected do |connection| @middlewares.call(command, config) do connection.call(command, nil) end end if block_given? yield result else result end end
def call_once(*command, **kwargs)
def call_once(*command, **kwargs) command = @command_builder.generate(command, kwargs) result = ensure_connected(retryable: false) do |connection| @middlewares.call(command, config) do connection.call(command, nil) end end if block_given? yield result else result end end
def call_once_v(command)
def call_once_v(command) command = @command_builder.generate(command) result = ensure_connected(retryable: false) do |connection| @middlewares.call(command, config) do connection.call(command, nil) end end if block_given? yield result else result end end
def call_v(command)
def call_v(command) command = @command_builder.generate(command) result = ensure_connected do |connection| @middlewares.call(command, config) do connection.call(command, nil) end end if block_given? yield result else result end end
def close
def close @raw_connection&.close self end
def config(**kwargs)
def config(**kwargs) Config.new(client_implementation: self, **kwargs) end
def connect
def connect @pid = PIDCache.pid if @raw_connection @middlewares.connect(config) do @raw_connection.reconnect end else @raw_connection = @middlewares.connect(config) do config.driver.new( config, connect_timeout: connect_timeout, read_timeout: read_timeout, write_timeout: write_timeout, ) end end prelude = config.connection_prelude.dup if id prelude << ["CLIENT", "SETNAME", id] end # The connection prelude is deliberately not sent to Middlewares if config.sentinel? prelude << ["ROLE"] role, = @middlewares.call_pipelined(prelude, config) do @raw_connection.call_pipelined(prelude, nil).last end config.check_role!(role) else unless prelude.empty? @middlewares.call_pipelined(prelude, config) do @raw_connection.call_pipelined(prelude, nil) end end end rescue FailoverError, CannotConnectError raise rescue ConnectionError => error raise CannotConnectError, error.message, error.backtrace rescue CommandError => error if error.message.match?(/ERR unknown command ['`]HELLO['`]/) raise UnsupportedServer, "redis-client requires Redis 6+ with HELLO command available (#{config.server_url})" else raise end end
def connected?
def connected? @raw_connection&.connected? end
def default_driver
def default_driver unless @default_driver @driver_definitions.each_key do |name| if @default_driver = driver(name) break end rescue LoadError end end @default_driver end
def default_driver=(name)
def default_driver=(name) @default_driver = driver(name) end
def driver(name)
def driver(name) return name if name.is_a?(Class) name = name.to_sym unless @driver_definitions.key?(name) raise ArgumentError, "Unknown driver #{name.inspect}, expected one of: `#{@driver_definitions.keys.inspect}`" end @drivers[name] ||= @driver_definitions[name]&.call end
def ensure_connected(retryable: true)
def ensure_connected(retryable: true) close if !config.inherit_socket && @pid != PIDCache.pid if @disable_reconnection if block_given? yield @raw_connection else @raw_connection end elsif retryable tries = 0 connection = nil begin connection = raw_connection if block_given? yield connection else connection end rescue ConnectionError, ProtocolError => error close if !@disable_reconnection && config.retry_connecting?(tries, error) tries += 1 retry else raise end end else previous_disable_reconnection = @disable_reconnection connection = ensure_connected begin @disable_reconnection = true yield connection rescue ConnectionError, ProtocolError close raise ensure @disable_reconnection = previous_disable_reconnection end end end
def hscan(key, *args, **kwargs, &block)
def hscan(key, *args, **kwargs, &block) unless block_given? return to_enum(__callee__, key, *args, **kwargs) end args = @command_builder.generate(["HSCAN", key, 0] + args, kwargs) scan_pairs(2, args, &block) end
def initialize(config, **)
def initialize(config, **) super @middlewares = config.middlewares_stack.new(self) @raw_connection = nil @disable_reconnection = false end
def inspect
def inspect id_string = " id=#{id}" if id "#<#{self.class.name} #{config.server_url}#{id_string}>" end
def multi(watch: nil, &block)
def multi(watch: nil, &block) transaction = nil results = if watch # WATCH is stateful, so we can't reconnect if it's used, the whole transaction # has to be redone. ensure_connected(retryable: false) do |connection| call("WATCH", *watch) begin if transaction = build_transaction(&block) commands = transaction._commands results = @middlewares.call_pipelined(commands, config) do connection.call_pipelined(commands, nil) end.last else call("UNWATCH") [] end rescue call("UNWATCH") if connected? && watch raise end end else transaction = build_transaction(&block) if transaction._empty? [] else ensure_connected(retryable: transaction._retryable?) do |connection| commands = transaction._commands @middlewares.call_pipelined(commands, config) do connection.call_pipelined(commands, nil) end.last end end end if transaction transaction._coerce!(results) else results end end
def new(arg = nil, **kwargs)
def new(arg = nil, **kwargs) if arg.is_a?(Config::Common) super else super(config(**(arg || {}), **kwargs)) end end
def pipelined
def pipelined pipeline = Pipeline.new(@command_builder) yield pipeline if pipeline._size == 0 [] else results = ensure_connected(retryable: pipeline._retryable?) do |connection| commands = pipeline._commands @middlewares.call_pipelined(commands, config) do connection.call_pipelined(commands, pipeline._timeouts) end end pipeline._coerce!(results) end end
def pubsub
def pubsub sub = PubSub.new(ensure_connected, @command_builder) @raw_connection = nil sub end
def raw_connection
Experimental RBS support (using type sampling data from the type_fusion
project).
def raw_connection: () -> untyped
This signature was generated using 2 samples from 1 application.
def raw_connection if @raw_connection.nil? || !@raw_connection.revalidate connect end @raw_connection end
def read_timeout=(timeout)
def read_timeout=(timeout) super raw_connection.read_timeout = timeout if connected? end
def register(middleware)
def register(middleware) Middlewares.include(middleware) end
def register_driver(name, &block)
def register_driver(name, &block) @driver_definitions[name] = block end
def scan(*args, **kwargs, &block)
def scan(*args, **kwargs, &block) unless block_given? return to_enum(__callee__, *args, **kwargs) end args = @command_builder.generate(["SCAN", 0] + args, kwargs) scan_list(1, args, &block) end
def scan_list(cursor_index, command, &block)
def scan_list(cursor_index, command, &block) cursor = 0 while cursor != "0" command[cursor_index] = cursor cursor, elements = call(*command) elements.each(&block) end nil end
def scan_pairs(cursor_index, command)
def scan_pairs(cursor_index, command) cursor = 0 while cursor != "0" command[cursor_index] = cursor cursor, elements = call(*command) index = 0 size = elements.size while index < size yield elements[index], elements[index + 1] index += 2 end end nil end
def sentinel(**kwargs)
def sentinel(**kwargs) SentinelConfig.new(client_implementation: self, **kwargs) end
def size
def size 1 end
def sscan(key, *args, **kwargs, &block)
Experimental RBS support (using type sampling data from the type_fusion
project).
def sscan: (String key, * args, ** kwargs, ) -> untyped
This signature was generated using 2 samples from 1 application.
def sscan(key, *args, **kwargs, &block) unless block_given? return to_enum(__callee__, key, *args, **kwargs) end args = @command_builder.generate(["SSCAN", key, 0] + args, kwargs) scan_list(2, args, &block) end
def timeout=(timeout)
def timeout=(timeout) super raw_connection.read_timeout = raw_connection.write_timeout = timeout if connected? end
def with(_options = nil)
def with(_options = nil) yield self end
def write_timeout=(timeout)
def write_timeout=(timeout) super raw_connection.write_timeout = timeout if connected? end
def zscan(key, *args, **kwargs, &block)
def zscan(key, *args, **kwargs, &block) unless block_given? return to_enum(__callee__, key, *args, **kwargs) end args = @command_builder.generate(["ZSCAN", key, 0] + args, kwargs) scan_pairs(2, args, &block) end