lib/redis/client.rb



class Redis
  class Client
    MINUS    = "-".freeze
    PLUS     = "+".freeze
    COLON    = ":".freeze
    DOLLAR   = "$".freeze
    ASTERISK = "*".freeze

    attr_accessor :db, :host, :port, :password, :logger
    attr :timeout

    def initialize(options = {})
      @host = options[:host] || "127.0.0.1"
      @port = (options[:port] || 6379).to_i
      @db = (options[:db] || 0).to_i
      @timeout = (options[:timeout] || 5).to_i
      @password = options[:password]
      @logger = options[:logger]
      @sock = nil
    end

    def connect
      connect_to(@host, @port)
      call(:auth, @password) if @password
      call(:select, @db) if @db != 0
      @sock
    end

    def id
      "redis://#{host}:#{port}/#{db}"
    end

    def call(*args)
      process(args) do
        read
      end
    end

    def call_loop(*args)
      process(args) do
        loop { yield(read) }
      end
    end

    def call_pipelined(commands)
      process(*commands) do
        Array.new(commands.size) { read }
      end
    end

    def call_without_timeout(*args)
      without_socket_timeout do
        call(*args)
      end
    rescue Errno::ECONNRESET
      retry
    end

    def process(*commands)
      logging(commands) do
        ensure_connected do
          @sock.write(join_commands(commands))
          yield if block_given?
        end
      end
    end

    def connected?
      !! @sock
    end

    def disconnect
      return unless connected?

      begin
        @sock.close
      rescue
      ensure
        @sock = nil
      end
    end

    def reconnect
      disconnect
      connect
    end

    def read
      # We read the first byte using read() mainly because gets() is
      # immune to raw socket timeouts.
      begin
        reply_type = @sock.read(1)
      rescue Errno::EAGAIN

        # We want to make sure it reconnects on the next command after the
        # timeout. Otherwise the server may reply in the meantime leaving
        # the protocol in a desync status.
        disconnect

        raise Errno::EAGAIN, "Timeout reading from the socket"
      end

      raise Errno::ECONNRESET, "Connection lost" unless reply_type

      format_reply(reply_type, @sock.gets)
    end

    def without_socket_timeout
      connect unless connected?

      begin
        self.timeout = 0
        yield
      ensure
        self.timeout = @timeout if connected?
      end
    end

  protected

    def build_command(name, *args)
      command = []
      command << "*#{args.size + 1}"
      command << "$#{string_size name}"
      command << name

      args.each do |arg|
        arg = arg.to_s
        command << "$#{string_size arg}"
        command << arg
      end

      command
    end

    def deprecated(old, new = nil, trace = caller[0])
      message = "The method #{old} is deprecated and will be removed in 2.0"
      message << " - use #{new} instead" if new
      Redis.deprecate(message, trace)
    end

    COMMAND_DELIMITER = "\r\n"

    def join_commands(commands)
      commands.map do |command|
        build_command(*command).join(COMMAND_DELIMITER) + COMMAND_DELIMITER
      end.join(COMMAND_DELIMITER) + COMMAND_DELIMITER
    end

    if "".respond_to?(:bytesize)
      def string_size(string)
        string.to_s.bytesize
      end
    else
      def string_size(string)
        string.to_s.size
      end
    end

    def format_reply(reply_type, line)
      case reply_type
      when MINUS    then format_error_reply(line)
      when PLUS     then format_status_reply(line)
      when COLON    then format_integer_reply(line)
      when DOLLAR   then format_bulk_reply(line)
      when ASTERISK then format_multi_bulk_reply(line)
      else raise ProtocolError.new(reply_type)
      end
    end

    def format_error_reply(line)
      raise "-" + line.strip
    end

    def format_status_reply(line)
      line.strip
    end

    def format_integer_reply(line)
      line.to_i
    end

    def format_bulk_reply(line)
      bulklen = line.to_i
      return if bulklen == -1
      reply = encode(@sock.read(bulklen))
      @sock.read(2) # Discard CRLF.
      reply
    end

    def format_multi_bulk_reply(line)
      n = line.to_i
      return if n == -1

      Array.new(n) { read }
    end

    def logging(commands)
      return yield unless @logger && @logger.debug?

      begin
        commands.each do |name, *args|
          @logger.debug("Redis >> #{name.to_s.upcase} #{args.join(" ")}")
        end

        t1 = Time.now
        yield
      ensure
        @logger.debug("Redis >> %0.2fms" % ((Time.now - t1) * 1000))
      end
    end

    def connect_to(host, port)
      with_timeout(@timeout) do
        @sock = TCPSocket.new(host, port)
      end

      @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

      # If the timeout is set we set the low level socket options in order
      # to make sure a blocking read will return after the specified number
      # of seconds. This hack is from memcached ruby client.
      self.timeout = @timeout

    rescue Errno::ECONNREFUSED
      raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
    end

    def timeout=(timeout)
      secs   = Integer(timeout)
      usecs  = Integer((timeout - secs) * 1_000_000)
      optval = [secs, usecs].pack("l_2")

      begin
        @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
        @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
      rescue Errno::ENOPROTOOPT
      end
    end

    def ensure_connected
      connect unless connected?

      begin
        yield
      rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF
        if reconnect
          yield
        else
          raise Errno::ECONNRESET
        end
      end
    end

    class ThreadSafe < self
      def initialize(*args)
        require "monitor"

        super(*args)
        @mutex = ::Monitor.new
      end

      def synchronize(&block)
        @mutex.synchronize(&block)
      end

      def ensure_connected(&block)
        synchronize { super }
      end
    end

    begin
      require "system_timer"

      def with_timeout(seconds, &block)
        SystemTimer.timeout_after(seconds, &block)
      end

    rescue LoadError
      warn "WARNING: using the built-in Timeout class which is known to have issues when used for opening connections. Install the SystemTimer gem if you want to make sure the Redis client will not hang." unless RUBY_VERSION >= "1.9" || RUBY_PLATFORM =~ /java/

      require "timeout"

      def with_timeout(seconds, &block)
        Timeout.timeout(seconds, &block)
      end
    end

    if defined?(Encoding)
      def encode(string)
        string.force_encoding(Encoding::default_external)
      end
    else
      def encode(string)
        string
      end
    end
  end
end