class Dalli::Protocol::Base
##
# Base class for a single Memcached server, containing logic common to all
# protocols. Contains logic for managing connection state to the server and
# value handling.
##
# Boolean-style check used by clients of this class to find out whether this
# server can be used right now.
#
# The answer comes from #ensure_connected!, so asking may trigger a connection
# attempt as a side effect.
#
# Returns whatever #ensure_connected! returns, or false when it raised a
# Dalli::NetworkError.
def alive?
  ensure_connected!
rescue Dalli::NetworkError
  # A failed connection attempt surfaces as a NetworkError. Callers of this
  # predicate only want a yes/no answer, so translate it into "not alive".
  false
end
# Tells whether the given options request caching of nil values.
#
# opts - the options object to inspect. Anything that is not a Hash is
#        treated as carrying no options.
#
# Returns true only when opts is a Hash whose :cache_nils entry is truthy;
# false in every other case.
def cache_nils?(opts)
  if opts.is_a?(Hash) && opts[:cache_nils]
    true
  else
    false
  end
end
# Brings the connection up. In order:
#
# 1. asks the connection manager to establish the connection,
# 2. stores the result of #version in @version,
# 3. calls up!.
#
# Returns the result of up!.
def connect
  @connection_manager.establish_connection
  @version = version
  up!
end
# Non-blocking read and processing of any pipeline responses that are already
# available. Used during interleaved pipelined gets to prevent buffer
# deadlock.
#
# results - Hash that receives one entry per parsed response that has a key,
#           in the form key => [value, cas]. It is mutated in place.
#
# Returns nil. SystemCallError and Dalli::NetworkError raised while draining
# are swallowed here.
def drain_pipeline_responses(results)
  # Nothing to drain without a connection, or when a zero-timeout poll of the
  # socket reports no readable data.
  return unless connected? && sock.wait_readable(0)

  # Pull whatever is available into the response buffer.
  response_buffer.read

  # Consume every complete response currently sitting in the buffer.
  loop do
    status, cas, key, value = response_buffer.process_single_getk_response
    break if status.nil? # no further complete response buffered

    next if key.nil?

    results[key] = [value, cas]
  end
rescue SystemCallError, Dalli::NetworkError
  # Ignore errors during drain - they'll be handled in fetch_responses
  nil
end
# Makes sure there is a usable connection, connecting if necessary.
#
# The socket connection to the underlying server is initialized as a side
# effect of this call (via #connect). Both this method and #connect need to
# live in this class so that auth can be done as required.
#
# Within this file it is called from #verify_state and #alive?.
#
# Returns true when already connected, false when the server is down and
# reconnect_down_server? does not permit a reconnect, otherwise the value of
# connected? after the connection attempt.
def ensure_connected!
  if connected?
    true
  elsif reconnect_down_server?
    connect # This call needs to be in this class so we can do auth
    connected?
  else
    false
  end
end
# Wraps up a pipelined get. #pipeline_next_responses calls this once it sees
# the response to the noop that ends the pipeline.
#
# Clears the response buffer and tells the connection manager the request is
# finished.
#
# Always returns true, so the caller can chain it (`finish_pipeline && break`).
def finish_pipeline
  response_buffer.clear
  @connection_manager.finish_request!
  true # to simplify response
end
# Builds the server object from its configuration.
#
# attribs        - server description handed to ServerConfigParser.parse,
#                  which yields hostname, port, socket type, weight and any
#                  user credentials.
# client_options - Hash of client options (default: {}). It is merged with
#                  the parsed credentials to form @options; its :raw entry
#                  selects raw mode.
#
# In raw mode a StringMarshaller is used for values, otherwise a
# ValueMarshaller. Credentials found by the parser are passed to
# #warn_uri_credentials before anything else is set up.
def initialize(attribs, client_options = {})
  hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
  warn_uri_credentials(user_creds)

  @options = client_options.merge(user_creds)
  @raw_mode = client_options[:raw]

  marshaller_class = @raw_mode ? StringMarshaller : ValueMarshaller
  @value_marshaller = marshaller_class.new(@options)

  @connection_manager = ConnectionManager.new(hostname, port, socket_type, @options)
end
# Locking hook; does nothing in this class and returns nil.
# NOTE(review): presumably overridden where thread safety is required - confirm.
def lock!
  # intentionally empty
end
# Logs, at error level, that a value could not be marshalled.
#
# key - the cache key the failing value was meant for (interpolated into the
#       message).
# err - the exception that was raised; only its message is logged.
#
# Emits two log lines: the specific failure, then a general explanation.
def log_marshal_err(key, err)
  [
    "Marshalling error for key '#{key}': #{err.message}",
    'You are trying to cache a Ruby object which cannot be serialized to memcached.'
  ].each { |line| Dalli.logger.error(line) }
end
# Logs an unexpected (non-Dalli) exception at error level.
#
# err - the exception to report.
#
# Emits one line with the exception class and message, followed by a second
# line with the backtrace joined by "\n\t". An exception that carries no
# backtrace (Exception#backtrace returns nil, e.g. when it was never raised)
# only produces the first line; previously that case raised NoMethodError
# from inside the logger and masked the error being reported.
def log_unexpected_err(err)
  Dalli.logger.error "Unexpected exception during Dalli request: #{err.class.name}: #{err.message}"

  backtrace = err.backtrace
  Dalli.logger.error backtrace.join("\n\t") if backtrace
end
# Abort the current pipelined get. Generally used to signal an external
# timeout during a pipelined get.
#
# Clears the response buffer and tells the connection manager to abort the
# request. If a connection is still open, the connection manager is then asked
# to fail the request with 'External timeout'; the Dalli::NetworkError this
# raises is swallowed.
#
# Returns true when not connected or when the NetworkError was swallowed;
# otherwise returns whatever error_on_request! returns.
def pipeline_abort
  response_buffer.clear
  @connection_manager.abort_request!
  return true unless connected?

  # Closes the connection, which ensures that our connection
  # is in a clean state for future requests
  @connection_manager.error_on_request!('External timeout')
rescue Dalli::NetworkError
  # Fully qualified, matching the other rescues in this class, so the constant
  # resolves regardless of how the enclosing class is lexically nested.
  true
end
# Reports whether the pipelined get has finished, i.e. the response buffer is
# no longer in progress.
#
# Returns the negation of response_buffer.in_progress?.
def pipeline_complete?
  still_reading = response_buffer.in_progress?
  !still_reading
end
# Attempt to receive and parse as many key/value pairs as possible from this
# server. After #pipeline_response_setup, this should be invoked repeatedly
# whenever this server's socket is readable until #pipeline_complete?.
#
# block - optional. When given, it is called with (key, value, cas) for each
#         response that carries a key, and no result Hash is built up.
#
# Returns a Hash of { key => [value, cas] } when no block is given. With a
# block the return value is an empty Hash.
#
# SystemCallError, the TIMEOUT_ERRORS and SSL_ERRORS classes, and EOFError are
# handed to the connection manager's error_on_request!.
def pipeline_next_responses(&block)
  reconnect_on_pipeline_complete!
  collected = nil

  response_buffer.read

  # status is nil as soon as the buffer holds no complete response.
  status, cas, key, value = response_buffer.process_single_getk_response
  until status.nil?
    if key.nil?
      # An ok status without a key is the response to the noop that ends the
      # pipeline: finish up and stop reading.
      finish_pipeline && break if status
    elsif block
      # Keyed response, streamed straight to the caller.
      yield key, value, cas
    else
      # Keyed response, accumulated into the result Hash (created lazily).
      collected ||= {}
      collected[key] = [value, cas]
    end

    # Get the next response from the buffer
    status, cas, key, value = response_buffer.process_single_getk_response
  end

  collected || {}
rescue SystemCallError, *TIMEOUT_ERRORS, *SSL_ERRORS, EOFError => e
  @connection_manager.error_on_request!(e)
end
# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
#
# Steps: verify the pipelined state for :getkq, write the noop, then make sure
# the response buffer is ready.
#
# Returns the result of response_buffer.ensure_ready.
def pipeline_response_setup
  verify_pipelined_state(:getkq)
  write_noop

  # ensure_ready rather than reset: data buffered while draining during an
  # interleaved pipelined get must be preserved.
  response_buffer.ensure_ready
end
# Sends a quiet get request for every key in a single write. Responses are
# not read here.
#
# keys - collection of keys; one quiet_get_request is generated per key, in
#        iteration order.
#
# Returns the result of write.
def pipelined_get(keys)
  # Drop stale data left over from an interrupted operation. clear (not reset)
  # keeps pipeline_complete? true, which is the expected state before
  # pipeline_response_setup is called.
  response_buffer.clear

  payload = +''
  keys.each { |key| payload << quiet_get_request(key) }

  # Could send noop here instead of in pipeline_response_setup
  write(payload)
end
# For large batches, interleave writing requests with draining responses.
# This prevents socket buffer deadlock when sending many keys.
#
# keys       - collection of keys to request.
# chunk_size - number of keys sent per write (passed to each_slice).
# results    - Hash handed to #drain_pipeline_responses after every chunk so
#              responses that have already arrived are stored right away.
def pipelined_get_interleaved(keys, chunk_size, results)
  # The response buffer must be usable while we are still sending.
  response_buffer.ensure_ready

  keys.each_slice(chunk_size) do |chunk|
    # One write per chunk of quiet get requests.
    payload = chunk.each_with_object(+'') do |key, buffer|
      buffer << quiet_get_request(key)
    end
    write(payload)
    @connection_manager.flush

    # Pick up whatever the server has answered so far.
    drain_pipeline_responses(results)
  end
end
# Tells whether quiet mode is active for the current execution context.
#
# Returns whatever is stored in Thread.current under the ::Dalli::QUIET key
# (nil when nothing has been stored).
def quiet?
  current = Thread.current
  current[::Dalli::QUIET]
end
# Tells whether the client is in raw mode (no serialization/compression).
#
# Returns the value of @raw_mode, which #initialize takes from the :raw client
# option - so the result is truthy/falsy rather than strictly true/false.
def raw_mode?
  @raw_mode
end
# Asks the connection manager to reconnect when the pipelined get is already
# complete; does nothing while a pipeline is still in progress.
#
# Returns nil when no reconnect was requested, otherwise the result of
# reconnect!.
def reconnect_on_pipeline_complete!
  return unless pipeline_complete?

  @connection_manager.reconnect!('pipelined get has completed')
end
# Runs a single operation against this server, with state verification and
# error handling wrapped around it.
#
# opkey - Symbol naming the operation; it is dispatched with send.
# args  - arguments forwarded to that operation.
#
# Returns the operation's result.
#
# Error handling:
# * Dalli::MarshalError is logged (with the first argument as the key) and
#   re-raised.
# * Any other Dalli::DalliError is re-raised untouched.
# * Any other StandardError is logged, the connection is closed, and the
#   error is re-raised.
#
# Errors raised by verify_state are outside the rescue and propagate as-is.
def request(opkey, *args)
  verify_state(opkey)

  begin
    @connection_manager.start_request!
    result = send(opkey, *args)

    # pipelined_get/pipelined_get_interleaved emit the query but don't read
    # the response(s), so the request stays open for them.
    response_deferred = opkey == :pipelined_get || opkey == :pipelined_get_interleaved
    @connection_manager.finish_request! unless response_deferred

    result
  rescue Dalli::MarshalError => e
    log_marshal_err(args.first, e)
    raise
  rescue Dalli::DalliError
    raise
  rescue StandardError => e
    log_unexpected_err(e)
    close
    raise
  end
end
# Lazily builds and memoizes the ResponseBuffer for this server, wired to the
# connection manager and the response processor.
#
# Returns the same ResponseBuffer instance on every call.
def response_buffer
  return @response_buffer if @response_buffer

  @response_buffer = ResponseBuffer.new(@connection_manager, response_processor)
end
# Unlocking hook; does nothing in this class and returns nil.
# NOTE(review): presumably overridden where thread safety is required - confirm.
def unlock!
  # intentionally empty
end
# Guards against operations that may not be used inside a quiet block.
#
# opkey - Symbol naming the operation about to run.
#
# Returns nil when opkey is listed in ALLOWED_QUIET_OPS.
# Raises Dalli::NotPermittedMultiOpError otherwise.
def verify_allowed_quiet!(opkey)
  unless ALLOWED_QUIET_OPS.include?(opkey)
    raise Dalli::NotPermittedMultiOpError, "The operation #{opkey} is not allowed in a quiet block."
  end
end
# Checks that a pipelined request can continue: the connection manager must
# confirm a request is in progress, and the connection must still be up.
#
# _opkey - unused.
#
# Returns nil when connected; otherwise calls raise_down_error.
def verify_pipelined_state(_opkey)
  @connection_manager.confirm_in_progress!
  return if connected?

  raise_down_error
end
# Checks to see if we can execute the specified operation: whether the
# connection is ready for a new request, whether the operation is allowed in
# a quiet block (only checked when quiet mode is on), and whether a connection
# is available.
#
# opkey - Symbol naming the operation about to run.
#
# Returns nil when everything checks out; otherwise one of the checks raises.
def verify_state(opkey)
  @connection_manager.confirm_ready!
  verify_allowed_quiet!(opkey) if quiet?

  # ensure_connected! connects the underlying socket as a side effect when it
  # is not connected, or after a disconnect caused by a timeout or other
  # error. When no connection can be had, report the server as down.
  return if ensure_connected!

  raise_down_error
end
# Logs URI_CREDENTIAL_WARNING at warn level when the parsed server
# configuration carries credentials.
#
# user_creds - Hash with the :username and :password taken from the server
#              configuration; a credential counts as present unless it is nil.
#
# Returns nil when neither credential is present, otherwise the logger's
# return value.
def warn_uri_credentials(user_creds)
  if !user_creds[:username].nil? || !user_creds[:password].nil?
    Dalli.logger.warn(URI_CREDENTIAL_WARNING)
  end
end