module EventMachine::Protocols::Memcache
def self.connect host = 'localhost', port = 11211
# Open a connection to a memcached server.
#
# host - server hostname (default 'localhost')
# port - server port (default 11211)
#
# Delegates to EM.connect, registering this module as the connection handler.
# host and port are passed a second time as extra handler arguments
# (presumably forwarded to #initialize -- confirm against EventMachine).
#
# Returns whatever EM.connect returns.
def self.connect(host = 'localhost', port = 11211)
  handler = self
  EM.connect(host, port, handler, host, port)
end
def connection_completed
- Private: -
# Private: connection hook that (re)initialises per-connection state.
#
# Resets the three pending-callback queues and the in-progress value map,
# marks the connection as established, then calls #succeed.
def connection_completed
  # One queue of pending callbacks per command family (get / set / delete).
  @get_cbs, @set_cbs, @del_cbs = [], [], []

  # Values collected for the get request currently being answered.
  @values = {}

  @connected = true
  @reconnecting = false

  # Kept last so all state above is in place before #succeed runs
  # (presumably it flushes queued Deferrable callbacks -- confirm).
  succeed
end
def delete key, expires = 0, &cb
cache.del(:b){ puts "deleted the value!" }
cache.del :a
Delete the value associated with a key
# Delete the value associated with a key.
#
# key     - cache key; converted with #to_s and whitespace replaced by '_',
#           the same normalisation #get applies, so a key written to the wire
#           here addresses the same entry that #get reads.
# expires - value sent as the second argument of the delete command (default 0)
# cb      - optional block; when given it is queued in @del_cbs to be called
#           once the server answers. Without a block the command is sent with
#           ' noreply' and nothing is queued.
#
# The command is wrapped in #callback, so when it is actually sent depends on
# how #callback schedules its block.
def delete(key, expires = 0, &cb)
  callback do
    # Whitespace inside a key would split it into extra protocol tokens.
    wire_key = key.to_s.gsub(/\s/, '_')
    suffix = cb ? '' : ' noreply'
    send_data "delete #{wire_key} #{expires}#{suffix}\r\n"
    @del_cbs << cb if cb
  end
end
def get *keys
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
cache.get(:a){ |v| p v }
Get the value associated with one or multiple keys
# Get the value associated with one or multiple keys.
#
# keys - one or more cache keys; each is converted with #to_s and has
#        whitespace replaced by '_' before being sent.
#
# Yields the values positionally, in the order the keys were given. A key
# missing from the response hash yields nil in its position.
#
# Raises ArgumentError when no block is given.
def get(*keys)
  raise ArgumentError unless block_given?

  callback do
    wire_keys = keys.map { |name| name.to_s.gsub(/\s/, '_') }
    send_data "get #{wire_keys.join(' ')}\r\n"

    # Queued alongside the normalised keys; invoked later with the hash of
    # values collected for this request.
    responder = proc do |values|
      yield(*wire_keys.map { |name| values[name] })
    end
    @get_cbs << [wire_keys, responder]
  end
end
def get_hash *keys
cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
Gets multiple values as a hash
# Gets multiple values as a hash.
#
# keys - the cache keys to fetch; they are passed through to #get unchanged.
#
# Yields a Hash mapping each key exactly as given by the caller (not the
# normalised wire form) to the value #get yielded in that position. When a
# key is repeated, the value from its first position is used.
#
# Raises ArgumentError when no block is given.
def get_hash(*keys)
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    # Single indexed pass instead of keys.index(k) per key, which rescanned
    # the key list for every entry.
    result = {}
    keys.each_with_index do |key, position|
      result[key] = values[position] unless result.key?(key)
    end
    yield result
  end
end
def initialize host, port = 11211
- Private: -
# Private: remember the server address.
#
# host - server hostname
# port - server port (default 11211)
#
# Both values are read again by #unbind when it schedules a reconnect.
def initialize(host, port = 11211)
  @host = host
  @port = port
end
def process_cmd line
- Private: -
# Private: handle one delimiter-terminated line of server output.
#
# line - the line just removed from the front of @buffer (terminator included)
#
# Recognised lines:
#   VALUE <key> <flags> <bytes> - takes <bytes> characters plus the trailing
#                                 "\r\n" off @buffer and stores the payload in
#                                 @values under <key>
#   Cend                        - hands @values to the oldest queued get
#                                 callback (if any), then resets @values
#   Cstored                     - calls the oldest queued set callback with true
#   Cdeleted                    - calls the oldest queued delete callback with true
#   Cunknown                    - calls the oldest queued delete callback with false
# Anything else is printed with Kernel#p.
#
# Raises ParserError when a VALUE header arrives before its payload is fully
# buffered.
def process_cmd(line)
  case line.strip
  when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
    key = $1
    length = Integer($3)

    # Payload plus its "\r\n" terminator must already be buffered.
    raise ParserError if @buffer.size < length + 2

    @values[key] = @buffer.slice!(0, length)
    @buffer.slice!(0, 2) # discard the terminator after the payload

  when Cend # END
    pending = @get_cbs.shift
    pending[1].call(@values) if pending # entry is [keys, callback]
    @values = {}

  when Cstored # STORED
    handler = @set_cbs.shift
    handler.call(true) if handler

  when Cdeleted # DELETED
    handler = @del_cbs.shift
    handler.call(true) if handler

  when Cunknown # NOT_FOUND
    handler = @del_cbs.shift
    handler.call(false) if handler

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end
def receive_data data
- Private: -
# Private: accumulate incoming bytes and dispatch every complete line.
#
# data - chunk of data just received
#
# Appends data to @buffer (created on first use), then repeatedly removes the
# text up to and including the next Cdelimiter and passes it to #process_cmd.
# If #process_cmd raises ParserError, the line is put back at the front of the
# buffer and processing stops until more data arrives.
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  while (boundary = @buffer.index(Cdelimiter))
    # +2 takes the two-character delimiter along with the line.
    line = @buffer.slice!(0, boundary + 2)
    begin
      process_cmd line
    rescue ParserError
      @buffer.insert(0, line)
      break
    end
  end
end
def send_cmd cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false
# Write a storage-style command header line.
#
# cmd     - command name (e.g. :set)
# key     - cache key, interpolated as given
# flags   - flags field (default 0)
# exptime - expiry field (default 0)
# bytes   - payload length field (default 0)
# noreply - when truthy, ' noreply' is appended (default false)
#
# Sends "<cmd> <key> <flags> <exptime> <bytes>[ noreply]\r\n" through
# #send_data and returns its result.
def send_cmd(cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false)
  suffix = noreply ? ' noreply' : ''
  header = "#{cmd} #{key} #{flags} #{exptime} #{bytes}#{suffix}\r\n"
  send_data header
end
def set key, val, exptime = 0, &cb
cache.set(:missing, 'abc'){ puts "stored the value!" }
cache.set :a, 'hello'
Set the value for a given key
# Set the value for a given key.
#
# key     - cache key; converted with #to_s and whitespace replaced by '_',
#           the same normalisation #get applies, so a value stored here can
#           be fetched back under the same caller-side key.
# val     - value to store; converted with #to_s
# exptime - expiry field sent in the command header (default 0)
# cb      - optional block; when given it is queued in @set_cbs to be called
#           once the server answers. Without a block the header is sent with
#           noreply and nothing is queued.
#
# The whole exchange is wrapped in #callback, so when it is actually sent
# depends on how #callback schedules its block.
def set(key, val, exptime = 0, &cb)
  callback do
    # Whitespace inside a key would add extra tokens to the header line.
    wire_key = key.to_s.gsub(/\s/, '_')
    payload = val.to_s

    # The length field counts bytes; #size is only a fallback for Strings
    # that do not respond to #bytesize.
    length = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size

    send_cmd :set, wire_key, 0, exptime, length, !cb
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end
def unbind
- Private: -
# Private: connection hook for a closed or failed connection.
#
# If the connection had been established, or a reconnect is already in
# progress, schedules a reconnect to @host/@port one second from now and
# flips the state flags. Otherwise no connection was ever made, which is
# treated as fatal.
#
# Raises RuntimeError ('Unable to connect to memcached server') when neither
# @connected nor @reconnecting is set.
def unbind
  raise 'Unable to connect to memcached server' unless @connected || @reconnecting

  EM.add_timer(1) { reconnect @host, @port }
  @connected = false
  @reconnecting = true
  # Presumably clears the Deferrable status so callbacks queue again until the
  # next #succeed -- confirm against EM::Deferrable.
  @deferred_status = nil
end