class Redis::Connection::Ruby
# Opens a socket for the given settings and wraps it in a new connection.
#
# The socket class is picked from +config+: UNIXSocket when the scheme is
# "unix", SSLSocket when the scheme is "rediss" or +:ssl+ is truthy, and
# TCPSocket otherwise. Each is opened through its +connect+ class method.
#
# @param config [Hash] reads :scheme, :ssl, :path, :host, :port,
#   :connect_timeout, :ssl_params, :read_timeout, :write_timeout and
#   :tcp_keepalive
# @return [Object] the new connection, with timeouts and TCP options applied
# @raise [ArgumentError] if +:ssl+ is truthy while the scheme is "unix"
def self.connect(config)
  sock =
    if config[:scheme] == "unix"
      raise ArgumentError, "SSL incompatible with unix sockets" if config[:ssl]

      UNIXSocket.connect(config[:path], config[:connect_timeout])
    elsif config[:scheme] == "rediss" || config[:ssl]
      SSLSocket.connect(config[:host], config[:port], config[:connect_timeout], config[:ssl_params])
    else
      TCPSocket.connect(config[:host], config[:port], config[:connect_timeout])
    end

  instance = new(sock)
  instance.timeout = config[:read_timeout]
  instance.write_timeout = config[:write_timeout]
  instance.set_tcp_keepalive(config[:tcp_keepalive])
  # TCP_NODELAY is only applied when the socket is a TCPSocket.
  instance.set_tcp_nodelay if sock.is_a?(TCPSocket)
  instance
end
# Reports whether this connection currently holds a socket.
#
# @return [Boolean] true while @sock is set, false when it is nil
def connected?
  !!@sock
end
# Closes the underlying socket, if there is one, and forgets it.
#
# Closing is best-effort: a StandardError raised while closing is swallowed
# on purpose, and @sock is reset to nil whether or not the close succeeded.
#
# @return [Object, nil] whatever the socket's +close+ returned, or nil when
#   there was no socket or closing failed
def disconnect
  # Safe navigation avoids raising (and then rescuing) NoMethodError when the
  # connection has already been dropped.
  @sock&.close
rescue StandardError
  nil
ensure
  @sock = nil
end
# Reads the payload of a bulk reply from the socket.
#
# @param line [String] the reply header after the type byte; its integer
#   value is the payload length in bytes
# @return [Object, nil] the payload passed through +encode+, or nil when the
#   announced length is -1
def format_bulk_reply(line)
  bulklen = line.to_i
  return if bulklen == -1

  reply = encode(@sock.read(bulklen))
  @sock.read(2) # Discard CRLF.
  reply
end
# Builds (but does not raise) a CommandError for an error reply.
#
# @param line [String] the error text as received
# @return [CommandError] error whose message is +line+ with surrounding
#   whitespace stripped
def format_error_reply(line)
  CommandError.new(line.strip)
end
# Converts an integer reply to an Integer.
#
# @param line [String] the numeric text as received
# @return [Integer] result of String#to_i on +line+
def format_integer_reply(line)
  line.to_i
end
# Reads the elements of a multi-bulk reply.
#
# @param line [String] the reply header after the type byte; its integer
#   value is the number of elements
# @return [Array, nil] one +read+ result per element, in order, or nil when
#   the announced count is -1
def format_multi_bulk_reply(line)
  n = line.to_i
  return if n == -1

  Array.new(n) { read }
end
# Dispatches a reply line to the formatter that matches its type marker.
#
# MINUS, PLUS, COLON, DOLLAR and ASTERISK are constants defined outside this
# method.
#
# @param reply_type [String] the marker taken from the start of the reply
# @param line [String] the rest of the reply line
# @return [Object] whatever the selected formatter returns
# @raise [ProtocolError] when +reply_type+ matches none of the known markers
def format_reply(reply_type, line)
  case reply_type
  when MINUS    then format_error_reply(line)
  when PLUS     then format_status_reply(line)
  when COLON    then format_integer_reply(line)
  when DOLLAR   then format_bulk_reply(line)
  when ASTERISK then format_multi_bulk_reply(line)
  else raise ProtocolError, reply_type
  end
end
# Returns a status reply with surrounding whitespace removed.
#
# @param line [String] the status text as received
# @return [String] +line+ stripped
def format_status_reply(line)
  line.strip
end
# Reads the TCP keepalive settings currently applied to the socket.
#
# @return [Hash{Symbol => Integer}] :time (TCP_KEEPIDLE), :intvl
#   (TCP_KEEPINTVL) and :probes (TCP_KEEPCNT), each read via getsockopt at
#   the SOL_TCP level
def get_tcp_keepalive
  {
    time: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE).int,
    intvl: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL).int,
    probes: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT).int
  }
end
# Returns an empty hash without consulting the socket.
#
# NOTE(review): this file also contains a get_tcp_keepalive that reads the
# values via getsockopt; presumably this variant is the fallback for
# platforms without the TCP keepalive socket options -- confirm which
# definition is active.
#
# @return [Hash] always empty
def get_tcp_keepalive
  {}
end
# Wraps an already-opened socket.
#
# @param sock [Object] the socket that every other method of this
#   connection operates on
def initialize(sock)
  @sock = sock
end
# Reads one reply from the socket and formats it.
#
# The first character of the line is the reply-type marker; it is removed
# from the line and both parts are handed to +format_reply+.
#
# NOTE(review): assumes @sock.gets never returns nil (a nil line would raise
# NoMethodError on slice!) -- confirm against the socket implementation.
#
# @return [Object] whatever +format_reply+ returns for the reply
# @raise [TimeoutError] when the socket raises Errno::EAGAIN
# @raise [EOFError] when OpenSSL reports an unexpected EOF during SSL_read
# @raise [OpenSSL::SSL::SSLError] any other SSL error, re-raised unchanged
def read
  line = @sock.gets
  reply_type = line.slice!(0, 1)
  format_reply(reply_type, line)
rescue Errno::EAGAIN
  raise TimeoutError
rescue OpenSSL::SSL::SSLError => ssl_error
  # OpenSSL reports a peer that vanished mid-read as an SSLError; surface it
  # as the EOFError a plain socket would have produced.
  raise EOFError, ssl_error.message if ssl_error.message.match?(/SSL_read: unexpected eof while reading/i)

  raise
end
# Enables TCP keepalive on the socket with the given timings.
#
# Does nothing unless +keepalive+ is a Hash.
#
# @param keepalive [Hash, Object] when a Hash, its :time, :intvl and :probes
#   values are written to TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT
# @return [Object, nil] nil when +keepalive+ is not a Hash, otherwise the
#   result of the last setsockopt call
def set_tcp_keepalive(keepalive)
  return unless keepalive.is_a?(Hash)

  @sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time])
  @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl])
  @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes])
end
# No-op variant of set_tcp_keepalive: accepts the settings and ignores them.
#
# NOTE(review): this file also contains a set_tcp_keepalive that applies the
# settings via setsockopt; presumably this variant is the fallback for
# platforms without the keepalive socket options -- confirm.
#
# @param keepalive [Object] ignored
# @return [nil]
def set_tcp_keepalive(keepalive)
  nil
end
# Sets the TCP_NODELAY option to 1 on the socket (disables Nagle's
# algorithm, so small writes are sent without being coalesced).
#
# @return [Object] the result of the setsockopt call
def set_tcp_nodelay
  @sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
# No-op variant of set_tcp_nodelay: leaves the socket untouched.
#
# NOTE(review): this file also contains a set_tcp_nodelay that calls
# setsockopt; presumably this variant is the fallback for platforms without
# TCP_NODELAY -- confirm.
#
# @return [nil]
def set_tcp_nodelay
  nil
end
# Sets the timeout on the underlying socket.
#
# The value is forwarded only if the socket responds to +timeout=+;
# otherwise the call is silently ignored.
#
# @param timeout [Object] value forwarded unchanged to the socket
def timeout=(timeout)
  @sock.timeout = timeout if @sock.respond_to?(:timeout=)
end
# Serializes a command with +build_command+ and writes it to the socket.
#
# @param command [Object] passed unchanged to +build_command+
# @return [Object] the result of the socket's +write+
def write(command)
  @sock.write(build_command(command))
end
# Sets the write timeout on the underlying socket.
#
# Unlike +timeout=+, there is no respond_to? check here, so the socket must
# implement +write_timeout=+.
#
# @param timeout [Object] value forwarded unchanged to the socket
def write_timeout=(timeout)
  @sock.write_timeout = timeout
end