lib/dalli/protocol/base.rb



# frozen_string_literal: true

require 'forwardable'
require 'socket'
require 'timeout'

module Dalli
  module Protocol
    ##
    # Base class for a single Memcached server, containing logic common to all
    # protocols.  Contains logic for managing connection state to the server and value
    # handling.
    ##
    class Base
      extend Forwardable

      attr_accessor :weight, :options

      def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
      def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
                     :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error

      def initialize(attribs, client_options = {})
        hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
        warn_uri_credentials(user_creds)
        @options = client_options.merge(user_creds)
        @raw_mode = client_options[:raw]
        @value_marshaller = @raw_mode ? StringMarshaller.new(@options) : ValueMarshaller.new(@options)
        @connection_manager = ConnectionManager.new(hostname, port, socket_type, @options)
      end

      # Returns true if client is in raw mode (no serialization/compression).
      # In raw mode, we can skip requesting bitflags from the server.
      def raw_mode?
        @raw_mode
      end

      # Chokepoint method for error handling and ensuring liveness
      def request(opkey, *args)
        verify_state(opkey)

        begin
          @connection_manager.start_request!
          response = send(opkey, *args)

          # pipelined_get/pipelined_get_interleaved emit query but don't read the response(s)
          @connection_manager.finish_request! unless %i[pipelined_get pipelined_get_interleaved].include?(opkey)

          response
        rescue Dalli::MarshalError => e
          log_marshal_err(args.first, e)
          raise
        rescue Dalli::DalliError
          raise
        rescue StandardError => e
          log_unexpected_err(e)
          close
          raise
        end
      end

      ##
      # Boolean method used by clients of this class to determine if this
      # particular memcached instance is available for use.
      def alive?
        ensure_connected!
      rescue Dalli::NetworkError
        # ensure_connected! raises a NetworkError if connection fails.  We
        # want to capture that error and convert it to a boolean value here.
        false
      end

      def lock!; end

      def unlock!; end

      # Start reading key/value pairs from this connection. This is usually called
      # after a series of GETKQ commands. A NOOP is sent, and the server begins
      # flushing responses for kv pairs that were found.
      #
      # Returns nothing.
      def pipeline_response_setup
        verify_pipelined_state(:getkq)
        write_noop
        # Use ensure_ready instead of reset to preserve any data already buffered
        # during interleaved pipelined get draining
        response_buffer.ensure_ready
      end

      # Attempt to receive and parse as many key/value pairs as possible
      # from this server. After #pipeline_response_setup, this should be invoked
      # repeatedly whenever this server's socket is readable until
      # #pipeline_complete?.
      #
      # When a block is given, yields (key, value, cas) for each response,
      # avoiding intermediate Hash allocation. Returns nil.
      # Without a block, returns a Hash of { key => [value, cas] }.
      # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
      def pipeline_next_responses(&block)
        reconnect_on_pipeline_complete!
        values = nil

        response_buffer.read

        status, cas, key, value = response_buffer.process_single_getk_response
        # status is not nil only if we have a full response to parse
        # in the buffer
        until status.nil?
          # If the status is ok and key is nil, then this is the response
          # to the noop at the end of the pipeline
          finish_pipeline && break if status && key.nil?

          # If the status is ok and the key is not nil, then this is a
          # getkq response with a value that we want to set in the response hash
          unless key.nil?
            if block
              yield key, value, cas
            else
              values ||= {}
              values[key] = [value, cas]
            end
          end

          # Get the next response from the buffer
          status, cas, key, value = response_buffer.process_single_getk_response
        end

        values || {}
      rescue SystemCallError, *TIMEOUT_ERRORS, *SSL_ERRORS, EOFError => e
        @connection_manager.error_on_request!(e)
      end
      # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

      # Abort current pipelined get. Generally used to signal an external
      # timeout during pipelined get.  The underlying socket is
      # disconnected, and the exception is swallowed.
      #
      # Returns nothing.
      def pipeline_abort
        response_buffer.clear
        @connection_manager.abort_request!
        return true unless connected?

        # Closes the connection, which ensures that our connection
        # is in a clean state for future requests
        @connection_manager.error_on_request!('External timeout')
      rescue NetworkError
        true
      end

      # Did the last call to #pipeline_response_setup complete successfully?
      def pipeline_complete?
        !response_buffer.in_progress?
      end

      def quiet?
        Thread.current[::Dalli::QUIET]
      end
      alias multi? quiet?

      # NOTE: Additional public methods should be overridden in Dalli::Threadsafe

      private

      URI_CREDENTIAL_WARNING = 'Dalli 5.0 removed SASL authentication. ' \
                               'Credentials in memcached:// URIs are ignored.'
      private_constant :URI_CREDENTIAL_WARNING

      def warn_uri_credentials(user_creds)
        return if user_creds[:username].nil? && user_creds[:password].nil?

        Dalli.logger.warn(URI_CREDENTIAL_WARNING)
      end

      ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze
      private_constant :ALLOWED_QUIET_OPS

      def verify_allowed_quiet!(opkey)
        return if ALLOWED_QUIET_OPS.include?(opkey)

        raise Dalli::NotPermittedMultiOpError, "The operation #{opkey} is not allowed in a quiet block."
      end

      ##
      # Checks to see if we can execute the specified operation.  Checks
      # whether the connection is in use, and whether the command is allowed
      ##
      def verify_state(opkey)
        @connection_manager.confirm_ready!
        verify_allowed_quiet!(opkey) if quiet?

        # The ensure_connected call has the side effect of connecting the
        # underlying socket if it is not connected, or there's been a disconnect
        # because of timeout or other error.  Method raises an error
        # if it can't connect
        raise_down_error unless ensure_connected!
      end

      def verify_pipelined_state(_opkey)
        @connection_manager.confirm_in_progress!
        raise_down_error unless connected?
      end

      # The socket connection to the underlying server is initialized as a side
      # effect of this call.  In fact, this is the ONLY place where that
      # socket connection is initialized.
      #
      # Both this method and connect need to be in this class so we can do auth
      # as required
      #
      # Since this is invoked exclusively in verify_state!, we don't need to worry about
      # thread safety.  Using it elsewhere may require revisiting that assumption.
      def ensure_connected!
        return true if connected?
        return false unless reconnect_down_server?

        connect # This call needs to be in this class so we can do auth
        connected?
      end

      def cache_nils?(opts)
        return false unless opts.is_a?(Hash)

        opts[:cache_nils] ? true : false
      end

      def connect
        @connection_manager.establish_connection
        @version = version
        up!
      end

      def pipelined_get(keys)
        # Clear buffer to remove any stale data from interrupted operations.
        # Use clear (not reset) to keep pipeline_complete? = true, which is
        # the expected state before pipeline_response_setup is called.
        response_buffer.clear

        req = +''
        keys.each do |key|
          req << quiet_get_request(key)
        end
        # Could send noop here instead of in pipeline_response_setup
        write(req)
      end

      # For large batches, interleave writing requests with draining responses.
      # This prevents socket buffer deadlock when sending many keys.
      # Populates the provided results hash with any responses drained during send.
      def pipelined_get_interleaved(keys, chunk_size, results)
        # Initialize the response buffer for draining during send phase
        response_buffer.ensure_ready

        keys.each_slice(chunk_size) do |chunk|
          # Build and write this chunk of requests
          req = +''
          chunk.each do |key|
            req << quiet_get_request(key)
          end
          write(req)
          @connection_manager.flush

          # Drain any available responses directly into results hash
          drain_pipeline_responses(results)
        end
      end

      # Non-blocking read and processing of any available pipeline responses.
      # Used during interleaved pipelined gets to prevent buffer deadlock.
      # Populates the provided results hash directly to avoid allocation overhead.
      def drain_pipeline_responses(results)
        return unless connected?

        # Non-blocking check if socket has data available
        return unless sock.wait_readable(0)

        # Read available data without blocking
        response_buffer.read

        # Process any complete responses in the buffer
        loop do
          status, cas, key, value = response_buffer.process_single_getk_response
          break if status.nil? # No complete response available

          results[key] = [value, cas] unless key.nil?
        end
      rescue SystemCallError, Dalli::NetworkError
        # Ignore errors during drain - they'll be handled in fetch_responses
        nil
      end

      def response_buffer
        @response_buffer ||= ResponseBuffer.new(@connection_manager, response_processor)
      end

      # Called after the noop response is received at the end of a set
      # of pipelined gets
      def finish_pipeline
        response_buffer.clear
        @connection_manager.finish_request!

        true # to simplify response
      end

      def reconnect_on_pipeline_complete!
        @connection_manager.reconnect! 'pipelined get has completed' if pipeline_complete?
      end

      def log_marshal_err(key, err)
        Dalli.logger.error "Marshalling error for key '#{key}': #{err.message}"
        Dalli.logger.error 'You are trying to cache a Ruby object which cannot be serialized to memcached.'
      end

      def log_unexpected_err(err)
        Dalli.logger.error "Unexpected exception during Dalli request: #{err.class.name}: #{err.message}"
        Dalli.logger.error err.backtrace.join("\n\t")
      end
    end
  end
end