class Redis::Client

def _parse_driver(driver)

# Turns a driver specification into a driver object.
#
# A Symbol or String is treated as a driver name: its file is loaded (first
# relative to this file under "connection/", then from the load path under
# "redis/connection/") and the capitalized name is looked up in Connection.
# Anything else, including nil, is handed back unchanged.
#
# @param driver [Symbol, String, Object, nil] driver name or ready-made driver
# @return [Object, nil] the constant found in Connection, or the argument itself
# @raise [RuntimeError] when neither require succeeds
def _parse_driver(driver)
  name = driver.is_a?(Symbol) ? driver.to_s : driver
  return name unless name.is_a?(String)

  begin
    require_relative "connection/#{name}"
  rescue LoadError, NameError
    # Not shipped next to this file; fall back to the load path.
    begin
      require "redis/connection/#{name}"
    rescue LoadError, NameError => error
      raise "Cannot load driver #{name.inspect}: #{error.message}"
    end
  end

  Connection.const_get(name.capitalize)
end

def _parse_options(options)

# Normalizes user-supplied connection options into the hash the client uses.
#
# Works on a copy of +options+; the argument itself is not modified. A hash
# that already carries a truthy :_parsed flag is returned as-is.
#
# Precedence, highest first: explicit non-nil options, values extracted from
# the URL (options[:url], else the default :url), then DEFAULTS.
#
# @param options [Hash] raw options; a String key is honoured for every key
#   that exists in DEFAULTS
# @return [Hash] normalized options with :_parsed set to true
# @raise [ArgumentError] if the URL scheme is not "unix", "redis" or "rediss"
# @raise [RuntimeError] if a :tcp_keepalive Hash holds a non-Integer :time,
#   :intvl or :probes
def _parse_options(options)
  return options if options[:_parsed]

  defaults = DEFAULTS.dup
  options = options.dup

  defaults.each_key do |key|
    # Defaults may be callables; evaluate them now.
    default = defaults[key]
    defaults[key] = default.call if default.respond_to?(:call)

    # Accept "key" as well as :key, but only for known option names.
    string_key = key.to_s
    options[key] = options[string_key] if options.key?(string_key)
  end

  url = options[:url]
  url = defaults[:url] if url.nil?

  # Whatever the URL provides replaces the defaults, not explicit options.
  if url
    require "uri"

    uri = URI(url)
    scheme = uri.scheme
    if scheme == "unix"
      defaults[:path] = uri.path
    elsif scheme == "redis" || scheme == "rediss"
      user = uri.user
      secret = uri.password
      defaults[:scheme] = scheme
      # URI keeps the brackets around IPv6 literals; strip them.
      defaults[:host] = uri.host.sub(/\A\[(.*)\]\z/, '\1') if uri.host
      defaults[:port] = uri.port if uri.port
      defaults[:username] = CGI.unescape(user) if user && !user.empty?
      defaults[:password] = CGI.unescape(secret) if secret && !secret.empty?
      # The path without its leading slash is the database index.
      defaults[:db] = uri.path[1..-1].to_i if uri.path
      defaults[:role] = :master
    else
      raise ArgumentError, "invalid uri scheme '#{scheme}'"
    end

    defaults[:ssl] = true if scheme == "rediss"
  end

  # A missing or nil option falls back to its default.
  defaults.each do |key, value|
    options[key] = value if options[key].nil?
  end

  if options[:path]
    # Unix socket: host and port are meaningless.
    options[:scheme] = "unix"
    %i[host port].each { |key| options.delete(key) }
  else
    # TCP socket
    options[:host] = options[:host].to_s
    options[:port] = options[:port].to_i
  end

  # :timeout is the fallback for each specific timeout that is unset.
  timeout_keys = %i[connect_timeout read_timeout write_timeout]
  if options.key?(:timeout)
    timeout_keys.each { |key| options[key] ||= options[:timeout] }
  end
  timeout_keys.each { |key| options[key] = Float(options[key]) }

  {
    reconnect_attempts: :to_i,
    reconnect_delay: :to_f,
    reconnect_delay_max: :to_f,
    db: :to_i
  }.each do |key, coercion|
    options[key] = options[key].public_send(coercion)
  end

  options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last

  keepalive = options[:tcp_keepalive]
  case keepalive
  when Hash
    %i[time intvl probes].each do |key|
      next if keepalive[key].is_a?(Integer)

      raise "Expected the #{key.inspect} key in :tcp_keepalive to be an Integer"
    end
  when Integer
    # Expand a plain number of seconds into explicit settings; values below 5
    # are left as they are.
    expanded =
      if keepalive >= 60
        { time: keepalive - 20, intvl: 10, probes: 2 }
      elsif keepalive >= 30
        { time: keepalive - 10, intvl: 5, probes: 2 }
      elsif keepalive >= 5
        { time: keepalive - 2, intvl: 2, probes: 1 }
      end
    options[:tcp_keepalive] = expanded if expanded
  end

  options[:_parsed] = true
  options
end

def call(command)

# Sends a single command and returns its reply.
#
# @param command [Array] command name followed by its arguments
# @yield [reply] optional post-processing of the reply; skipped when the
#   reply is the literal 'QUEUED'
# @return [Object] the block's result when the block ran, else the raw reply
# @raise [CommandError] when the reply itself is a CommandError
def call(command)
  reply = process([command]) { read }
  raise reply if reply.is_a?(CommandError)
  return reply unless block_given? && reply != 'QUEUED'

  yield reply
end

def call_loop(command, timeout = 0)

# Sends one command and then keeps reading replies, yielding each of them,
# until a CommandError reply arrives or the caller's block breaks out.
#
# @param command [Array] command name followed by its arguments
# @param timeout [Numeric] socket timeout applied while looping
# @yield [reply] every reply that is not a CommandError
# @return [Object] whatever the wrapped calls return when the loop ends
#   without an error; a `break value` in the caller's block ends this method
#   with that value
# @raise [CommandError] the error reply that stopped the loop
def call_loop(command, timeout = 0)
  failure = nil

  outcome = with_socket_timeout(timeout) do
    process([command]) do
      loop do
        reply = read
        unless reply.is_a?(CommandError)
          yield reply
          next
        end

        # Remember the error and leave the loop; it is raised further down.
        failure = reply
        break
      end
    end
  end

  raise failure if failure

  outcome
end

def call_pipeline(pipeline)

# Executes a pipeline and lets it post-process the raw replies.
#
# @param pipeline [#futures, #with_reconnect?, #finish, #db, #shutdown?]
# @return [Array, Object, nil] [] for an empty pipeline; otherwise the value
#   of pipeline.finish; nil when the connection dropped and the pipeline
#   reports shutdown?
# @raise [ConnectionError] when the connection dropped and the pipeline does
#   not report shutdown?
def call_pipeline(pipeline)
  return [] if pipeline.futures.empty?

  with_reconnect(pipeline.with_reconnect?) do
    begin
      finished = pipeline.finish(call_pipelined(pipeline))
      # Adopt the database the pipeline ended up on, if it tracked one.
      self.db = pipeline.db if pipeline.db
      finished
    rescue ConnectionError => e
      # Assume the pipeline was sent in one piece, but execution of
      # SHUTDOWN kept the replies of the commands before it from coming back.
      raise e unless pipeline.shutdown?

      return nil
    end
  end
end

def call_pipelined(pipeline)

# Writes every command of the pipeline, then reads one reply per entry of
# pipeline.timeouts.
#
# Reconnection is only tolerated until the first reply has been read: once a
# reply is in, a retry would resend the whole pipeline and re-run commands
# that already succeeded. @reconnect is therefore switched off after each
# read and put back to its previous value on the way out.
#
# @param pipeline [#futures, #commands, #timeouts]
# @return [Array] raw replies, one slot per command
# @raise [CommandError] the first error reply, after all replies were read
def call_pipelined(pipeline)
  return [] if pipeline.futures.empty?

  commands = pipeline.commands
  replies = Array.new(commands.size)
  saved_reconnect = @reconnect

  begin
    first_error = nil

    process(commands) do
      pipeline.timeouts.each_with_index do |timeout, index|
        reply = timeout ? with_socket_timeout(timeout) { read } : read
        replies[index] = reply
        @reconnect = false
        first_error ||= reply if reply.is_a?(CommandError)
      end
    end

    raise first_error if first_error
  ensure
    @reconnect = saved_reconnect
  end

  replies
end

def call_with_timeout(command, extra_timeout, &blk)

# Runs a command with a temporarily adjusted socket timeout.
#
# An +extra_timeout+ of 0 sets the socket timeout to 0; any other value is
# added on top of the regular #timeout. The whole call is started over each
# time it fails with ConnectionError.
#
# @param command [Array] command name followed by its arguments
# @param extra_timeout [Numeric] additional seconds, or 0
# @param blk [Proc] forwarded to #call
# @return [Object] the value returned by #call
def call_with_timeout(command, extra_timeout, &blk)
  socket_timeout = extra_timeout == 0 ? 0 : timeout + extra_timeout
  with_socket_timeout(socket_timeout) { call(command, &blk) }
rescue ConnectionError
  retry
end

def call_without_timeout(command, &blk)

# Runs a command via #call_with_timeout with an extra timeout of 0.
#
# @param command [Array] command name followed by its arguments
# @param block [Proc] forwarded unchanged
# @return [Object] the value returned by #call_with_timeout
def call_without_timeout(command, &block)
  call_with_timeout(command, 0, &block)
end

def connect

# Opens the connection and replays the per-connection setup commands:
# AUTH (when a password is set), READONLY (when @options[:readonly]),
# SELECT (when db is not 0) and CLIENT SETNAME (when @options[:id] is set),
# then lets the connector check this client.
#
# Records the current process id in @pid.
#
# @return [self]
# @raise [CommandError] when authentication or a setup command is rejected
def connect
  @pid = Process.pid
  # Don't try to reconnect when the connection is fresh
  with_reconnect(false) do
    establish_connection
    if password
      if username
        begin
          call [:auth, username, password]
        rescue CommandError => err # Likely on Redis < 6
          case err.message
          when /ERR wrong number of arguments for 'auth' command/
            # The two-argument form was rejected; authenticate with the
            # password alone.
            call [:auth, password]
          when /WRONGPASS invalid username-password pair/
            # The password may belong to the default user. Try it without
            # the username; if that fails too, report the ORIGINAL error.
            begin
              call [:auth, password]
            rescue CommandError
              raise err
            end
            ::Redis.deprecate!(
              "[redis-rb] The Redis connection was configured with username #{username.inspect}, but" \
              " the provided password was for the default user. This will start failing in redis-rb 5.0.0."
            )
          else
            # Any other authentication error is re-raised untouched.
            raise
          end
        end
      else
        call [:auth, password]
      end
    end
    call [:readonly] if @options[:readonly]
    call [:select, db] if db != 0
    call [:client, :setname, @options[:id]] if @options[:id]
    @connector.check(self)
  end
  self
end

def connect_timeout

# @return [Float] the :connect_timeout option (coerced with Float() when the
#   options were parsed)
def connect_timeout
  @options[:connect_timeout]
end

def connected?

# Tells whether a connection object exists and reports itself as connected.
#
# @return [Boolean] always exactly true or false
def connected?
  return false unless connection

  connection.connected? ? true : false
end

def db

# @return [Integer] the :db option, i.e. the database index this client uses
def db
  @options[:db]
end

def db=(db)

# Stores a new database index in the options. Only the option is updated;
# no command is sent from here.
#
# @param db [#to_i] database index, coerced with to_i
def db=(db)
  @options[:db] = db.to_i
end

def disconnect

# Closes the connection if there is a live one; otherwise does nothing.
#
# @return [Object, nil] the value of connection.disconnect, or nil when not
#   connected
def disconnect
  return unless connected?

  connection.disconnect
end

def driver

# @return [Object] the :driver option; #establish_connection calls
#   connect(@options) on it
def driver
  @options[:driver]
end

def ensure_connected

# Runs the given block with a usable connection, reconnecting and retrying
# on connection-level failures.
#
# Before anything else the connection is dropped when @pending_reads is
# positive, i.e. more commands were written than replies read.
#
# On BaseConnectionError the connection is closed and the whole block is
# retried while the attempt count is within @options[:reconnect_attempts]
# and @reconnect is truthy. The wait before a retry doubles on each attempt,
# starting at @options[:reconnect_delay] and capped at
# @options[:reconnect_delay_max].
#
# @yield the work that needs the connection
# @return [Object] the block's value
# @raise [InheritedError] when the connection was opened by another process
#   and :inherit_socket is not set
# @raise [BaseConnectionError] once retries are exhausted or disabled
def ensure_connected
  disconnect if @pending_reads > 0
  attempts = 0
  begin
    attempts += 1
    if connected?
      # @pid is recorded by #connect; a different pid means this process is
      # using a socket that another process opened.
      unless inherit_socket? || Process.pid == @pid
        raise InheritedError,
              "Tried to use a connection from a child process without reconnecting. " \
              "You need to reconnect to Redis after forking " \
              "or set :inherit_socket to true."
      end
    else
      connect
    end
    yield
  rescue BaseConnectionError
    disconnect
    if attempts <= @options[:reconnect_attempts] && @reconnect
      # Exponential backoff: delay * 2^(attempts - 1), capped at the maximum.
      sleep_t = [(@options[:reconnect_delay] * 2**(attempts - 1)),
                 @options[:reconnect_delay_max]].min
      Kernel.sleep(sleep_t)
      retry
    else
      raise
    end
  rescue Exception
    # Anything else (including non-StandardError exceptions) still closes the
    # connection before propagating, so the next use starts from a fresh one.
    disconnect
    raise
  end
end

def establish_connection

# Asks the connector where to connect, stores that host (and port, when one
# is given) in the options, opens the connection through the driver and
# resets the pending-read counter.
#
# @return [Integer] 0, the freshly reset @pending_reads
# @raise [CannotConnectError] for timeouts, socket errors and the listed
#   Errno failures; the message names the location and the error class
def establish_connection
  target = @connector.resolve.dup
  @options[:host] = target[:host]
  @options[:port] = Integer(target[:port]) if target.include?(:port)

  @connection = @options[:driver].connect(@options)
  @pending_reads = 0
rescue TimeoutError, SocketError,
       Errno::EADDRNOTAVAIL, Errno::ECONNREFUSED,
       Errno::EHOSTDOWN, Errno::EHOSTUNREACH,
       Errno::ENETUNREACH, Errno::ENOENT,
       Errno::ETIMEDOUT, Errno::EINVAL => error
  raise CannotConnectError, "Error connecting to Redis on #{location} (#{error.class})"
end

def host

# @return [String, nil] the :host option; #establish_connection overwrites it
#   with the host returned by the connector
def host
  @options[:host]
end

def id

# Identifies this client: the explicit :id option when one is set, otherwise
# a URL-like string "<scheme>://<location>/<db>", where the scheme reads
# "rediss" whenever the :ssl option is truthy.
#
# @return [Object, String]
def id
  explicit = @options[:id]
  return explicit if explicit

  prefix = @options[:ssl] ? 'rediss' : @options[:scheme]
  "#{prefix}://#{location}/#{db}"
end

def inherit_socket?

# Whether a connection opened by another process may be used as-is (see the
# pid check in #ensure_connected).
#
# @return [Object] the raw :inherit_socket option, used for its truthiness
def inherit_socket?
  @options[:inherit_socket]
end

def initialize(options = {})

# Sets up a client without connecting.
#
# The connector is chosen as follows:
# * non-nil :sentinels option -> Connector::Sentinel
# * otherwise, a :connector option responding to #new -> that class; the key
#   is removed from the hash that was passed in
# * otherwise -> Connector
#
# @param options [Hash] raw options, normalized through #_parse_options
def initialize(options = {})
  @options = _parse_options(options)
  @reconnect = true
  @logger = @options[:logger]
  @connection = nil
  @command_map = {}
  @pending_reads = 0
  @connector =
    if @options[:sentinels].nil?
      if options.include?(:connector) && options[:connector].respond_to?(:new)
        options.delete(:connector).new(@options)
      else
        Connector.new(@options)
      end
    else
      Connector::Sentinel.new(@options)
    end
end

def io

# Runs the block and translates low-level I/O failures:
# * TimeoutError is replaced by a TimeoutError with the message
#   "Connection timed out", keeping the original backtrace
# * reset/pipe/abort/bad-descriptor/invalid-argument errors and EOFError
#   become ConnectionError, "Connection lost (<short class name>)"
#
# @yield the I/O operation
# @return [Object] the block's value
def io
  yield
rescue TimeoutError => timed_out
  # Swap in a clearer message without losing the original stack.
  replacement = TimeoutError.new("Connection timed out")
  replacement.set_backtrace(timed_out.backtrace)
  raise replacement
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL, EOFError => lost
  short_name = lost.class.name.split("::").last
  raise ConnectionError, "Connection lost (#{short_name})"
end

def location

# Where this client connects to: the socket path when one is configured,
# otherwise "host:port".
#
# @return [String]
def location
  socket_path = path
  socket_path ? socket_path : "#{host}:#{port}"
end

def logging(commands)

# Wraps the block with debug logging: one line per command before the block
# runs and one "call_time" line afterwards (also when the block raises).
# Without a logger, or when the logger is not at debug level, the block is
# simply executed.
#
# The elapsed time is taken from the monotonic clock, so wall-clock
# adjustments while the block runs cannot produce negative or inflated
# durations.
#
# @param commands [Array<Array>] each entry is [name, *args]
# @yield the work to time
# @return [Object] the block's value
def logging(commands)
  return yield unless @logger&.debug?

  begin
    commands.each do |name, *args|
      logged_args = args.map do |a|
        if a.respond_to?(:inspect) then a.inspect
        elsif a.respond_to?(:to_s) then a.to_s
        else
          # handle poorly-behaved descendants of BasicObject
          # NOTE(review): BasicObject itself defines no respond_to?, so this
          # branch is only reached by objects that answer respond_to?
          # themselves — confirm whether bare BasicObject args must be handled.
          klass = a.instance_exec { (class << self; self end).superclass }
          "\#<#{klass}:#{a.__id__}>"
        end
      end
      @logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}")
    end

    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    # `started` is still nil when logging a command failed before the block ran.
    if started
      elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000
      @logger.debug("[Redis] call_time=%0.2f ms" % elapsed_ms)
    end
  end
end

def password

# @return [String, nil] the :password option; #connect authenticates only
#   when it is set
def password
  @options[:password]
end

def path

# @return [String, nil] the :path option; when set, #location returns it
#   instead of "host:port"
def path
  @options[:path]
end

def port

# @return [Integer, nil] the :port option; #establish_connection overwrites
#   it when the connector returns a port
def port
  @options[:port]
end

def process(commands)

# Writes the given commands over an ensured connection, with logging, and
# then hands control to the block (normally to read the replies).
#
# A command whose name has an entry in command_map is sent under the mapped
# name; the caller's array is left untouched (a copy is renamed).
#
# @param commands [Array<Array>] each entry is [name, *args]
# @yield optional, runs after all commands were written
# @return [Object, nil] the block's value, or nil without a block
def process(commands)
  logging(commands) do
    ensure_connected do
      commands.each do |command|
        renamed = command_map[command.first]
        if renamed
          command = command.dup
          command[0] = renamed
        end

        write(command)
      end

      yield if block_given?
    end
  end
end

def read

# Reads one reply from the connection and counts it off @pending_reads.
# The counter is only decremented when the read succeeded.
#
# @return [Object] the reply
def read
  io do
    connection.read.tap { @pending_reads -= 1 }
  end
end

def read_timeout

# @return [Float] the :read_timeout option. #with_socket_timeout replaces it
#   for the duration of its block.
def read_timeout
  @options[:read_timeout]
end

def reconnect

# Closes the current connection (if any) and connects again.
#
# @return [self] the value returned by #connect
def reconnect
  disconnect
  connect
end

def scheme

# @return [String] the :scheme option; option parsing forces it to "unix"
#   when a socket path is configured
def scheme
  @options[:scheme]
end

def timeout

# The timeout used for socket reads; reads the same option as #read_timeout.
#
# @return [Float] the :read_timeout option
def timeout
  @options[:read_timeout]
end

def username

# @return [String, nil] the :username option; when set together with a
#   password, #connect sends both to AUTH
def username
  @options[:username]
end

def with_reconnect(val = true)

# Runs the block with @reconnect set to +val+ and puts the previous value
# back afterwards, also when the block raises.
#
# @param val [Boolean] whether reconnecting is allowed inside the block
# @yield the work to run under that setting
# @return [Object] the block's value
def with_reconnect(val = true)
  previous = @reconnect
  @reconnect = val
  yield
ensure
  @reconnect = previous
end

def with_socket_timeout(timeout)

# Runs the block with a temporary socket read timeout, connecting first if
# necessary.
#
# For the duration of the block both the live connection and the
# :read_timeout option carry +timeout+; the option is updated as well so a
# reconnect inside the block picks up the temporary value. Afterwards the
# option is restored first and the connection (if still connected) is then
# set back to the regular #timeout. Restoring in that order matters:
# #timeout reads :read_timeout, so the option has to hold the original value
# again before it is copied onto the connection.
#
# @param timeout [Numeric] temporary read timeout
# @yield the work to run under that timeout
# @return [Object] the block's value
def with_socket_timeout(timeout)
  connect unless connected?
  original = @options[:read_timeout]

  begin
    connection.timeout = timeout
    @options[:read_timeout] = timeout # for reconnection
    yield
  ensure
    @options[:read_timeout] = original
    connection.timeout = self.timeout if connected?
  end
end

def without_reconnect(&blk)

# Runs the block with reconnecting switched off (see #with_reconnect).
#
# @param block [Proc] the work to run
# @return [Object] the block's value
def without_reconnect(&block)
  with_reconnect(false, &block)
end

def without_socket_timeout(&blk)

# Runs the block with the socket timeout set to 0 (see #with_socket_timeout).
#
# @param block [Proc] the work to run
# @return [Object] the block's value
def without_socket_timeout(&block)
  with_socket_timeout(0, &block)
end

def write(command)

# Writes one command to the connection.
#
# @pending_reads is incremented BEFORE the write, so a write that fails
# still leaves the counter positive; #ensure_connected then drops the
# connection on the next use.
#
# @param command [Array] command name followed by its arguments
# @return [Object] the value of connection.write
def write(command)
  io do
    @pending_reads += 1
    connection.write(command)
  end
end