lib/redis/connection/synchrony.rb
require "redis/connection/command_helper" require "redis/connection/registry" require "em-synchrony" require "hiredis/reader" class Redis module Connection class RedisClient < EventMachine::Connection include EventMachine::Deferrable def post_init @req = nil @reader = ::Hiredis::Reader.new end def connection_completed succeed end def receive_data(data) @reader.feed(data) begin until (reply = @reader.gets) == false @req.succeed [:reply, reply] end rescue RuntimeError => err @req.fail [:error, ::Redis::ProtocolError.new(err.message)] end end def read @req = EventMachine::DefaultDeferrable.new EventMachine::Synchrony.sync @req end def send(data) callback { send_data data } end def unbind if @req @req.fail [:error, Errno::ECONNRESET] @req = nil else fail end end end class Synchrony include Redis::Connection::CommandHelper def initialize @timeout = 5_000_000 @state = :disconnected @connection = nil end def connected? @state == :connected end def timeout=(usecs) @timeout = usecs end def connect(host, port, timeout) conn = EventMachine.connect(host, port, RedisClient) do |c| c.pending_connect_timeout = [Float(timeout / 1_000_000), 0.1].max end setup_connect_callbacks(conn, Fiber.current) end def connect_unix(path, timeout) conn = EventMachine.connect_unix_domain(path, RedisClient) setup_connect_callbacks(conn, Fiber.current) end def disconnect @state = :disconnected @connection.close_connection @connection = nil end def write(command) @connection.send(build_command(*command).join(COMMAND_DELIMITER)) end def read type, payload = @connection.read if type == :reply payload elsif type == :error raise payload else raise "Unknown type #{type.inspect}" end end private def setup_connect_callbacks(conn, f) conn.callback do @connection = conn @state = :connected f.resume conn end conn.errback do @connection = conn f.resume :refused end r = Fiber.yield raise Errno::ECONNREFUSED if r == :refused r end end end end Redis::Connection.drivers << Redis::Connection::Synchrony