class Mongo::Server::ConnectionPool

@since 2.0.0, largely rewritten in 2.9.0
Represents a connection pool for server connections.

def self.finalize(available_connections, pending_connections, populator)

Returns:
  • (Proc) - The Finalizer.

Parameters:
  • populator (Populator) -- The populator.
  • pending_connections (List) -- The pending connections.
  • available_connections (List) -- The available connections.
# Builds the finalizer that releases the pool's resources when the pool
# object is garbage collected.
#
# The returned proc works only on the collections and the populator that
# are passed in. Checked out connections are deliberately left alone: they
# are expected to be closed when they are garbage collected themselves.
#
# @param [ Enumerable ] available_connections The available connections.
# @param [ Enumerable ] pending_connections The pending connections.
# @param [ Populator ] populator The populator.
#
# @return [ Proc ] The finalizer.
def self.finalize(available_connections, pending_connections, populator)
  # Disconnects every connection in the collection, then empties it.
  drain = lambda do |connections|
    connections.each { |conn| conn.disconnect!(reason: :pool_closed) }
    connections.clear
  end

  # Must stay a proc (not a lambda): finalizers are invoked with the
  # object id of the collected object as an argument.
  proc do
    populator.stop!
    drain.call(available_connections)
    drain.call(pending_connections)
  end
end

def available_count

Other tags:
    Since: - 2.9.0

Returns:
  • (Integer) - Number of available connections.
# Number of connections currently sitting in the pool ready to be
# checked out.
#
# @return [ Integer ] Number of available connections.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def available_count
  raise_if_closed!
  # The collection is shared between threads, so read it under the lock.
  @lock.synchronize { @available_connections.length }
end

def check_in(connection)

Other tags:
    Since: - 2.9.0

Parameters:
  • connection (Mongo::Server::Connection) -- The connection.
# Returns a previously checked out connection to the pool.
#
# Depending on the state of the pool and of the connection, the connection
# is either made available again, or is disconnected/dropped.
#
# @param [ Mongo::Server::Connection ] connection The connection.
#
# @raise [ ArgumentError ] If the connection belongs to another pool or is
#   not currently checked out from this pool.
def check_in(connection)
  @lock.synchronize do
    if connection.connection_pool != self
      raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})"
    end
    unless @checked_out_connections.include?(connection)
      raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})"
    end

    # Note: if an event handler raises, neither semaphore below is
    # signaled. Threads already waiting for a connection while the pool is
    # at max size may then time out; threads that start waiting after this
    # method has finished (with the exception) should be fine.
    @checked_out_connections.delete(connection)
    checked_in = Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self)
    publish_cmap_event(checked_in)

    if closed?
      connection.disconnect!(reason: :pool_closed)
      return
    end

    if connection.closed? || connection.generation != @generation
      # An already closed connection (for example after a network error)
      # needs no further work; a stale one still has to be disconnected.
      connection.disconnect!(reason: :stale) unless connection.closed?
      @populate_semaphore.signal
    else
      connection.record_checkin!
      @available_connections << connection
      # Exactly one connection became available, so wake up only one
      # waiting thread.
      @available_semaphore.signal
    end
  end
end

def check_out

Other tags:
    Since: - 2.9.0

Raises:
  • (Timeout::Error) - If the connection pool is at maximum size
  • (Error::PoolClosedError) - If the pool has been closed.

Returns:
  • (Mongo::Server::Connection) - The checked out connection.
# Checks a connection out of the pool.
#
# An available connection is reused when one exists that is neither stale
# (generation mismatch) nor idle for longer than max_idle_time. Otherwise a
# new connection is created as long as the pool is below max_size. When
# neither is possible the calling thread waits on the availability
# semaphore until the wait timeout elapses.
#
# The chosen connection is connected via connect_connection before it is
# returned, and is tracked as pending while that happens.
#
# @return [ Mongo::Server::Connection ] The checked out connection.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
# @raise [ Error::ConnectionCheckOutTimeout ] If no connection could be
#   obtained before the wait timeout elapsed.
def check_out
  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
  )
  # A closed pool fails the check out immediately; the failure is published
  # before the error is raised.
  if closed?
    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
        @server.address,
        Monitoring::Event::Cmap::ConnectionCheckOutFailed::POOL_CLOSED
      ),
    )
    raise Error::PoolClosedError.new(@server.address, self)
  end
  # Absolute point in time after which the check out is abandoned.
  deadline = Time.now + wait_timeout
  connection = nil
  # It seems that synchronize sets up its own loop, thus a simple break
  # is insufficient to break the outer loop
  catch(:done) do
    loop do
      # Lock must be taken on each iteration, rather for the method
      # overall, otherwise other threads will not be able to check in
      # a connection while this thread is waiting for one.
      @lock.synchronize do
        # Pop available connections until a usable one is found; unusable
        # ones are disconnected and the populate semaphore is signaled.
        until @available_connections.empty?
          connection = @available_connections.pop
          if connection.generation != generation
            # Stale connections should be disconnected in the clear
            # method, but if any don't, check again here
            connection.disconnect!(reason: :stale)
            @populate_semaphore.signal
            next
          end
          # Connections that have been idle for too long are discarded.
          # A connection without a last_checkin value is never treated as
          # idle here.
          if max_idle_time && connection.last_checkin &&
            Time.now - connection.last_checkin > max_idle_time
          then
            connection.disconnect!(reason: :idle)
            @populate_semaphore.signal
            next
          end
          # Usable connection found: track it as pending and leave the
          # retry loop.
          @pending_connections << connection
          throw(:done)
        end
        # Ruby does not allow a thread to lock a mutex which it already
        # holds.
        # (Hence unsynchronized_size rather than size.) Nothing was
        # available; create a new connection if the pool may still grow.
        if unsynchronized_size < max_size
          connection = create_connection
          @pending_connections << connection
          throw(:done)
        end
      end
      # The pool is at max size with nothing available: wait for a signal
      # on the availability semaphore, or give up once the deadline passes.
      wait = deadline - Time.now
      if wait <= 0
        publish_cmap_event(
          Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
            @server.address,
            Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT,
          ),
        )
        # The counts are read under the lock so that the message reports a
        # consistent snapshot of the pool.
        msg = @lock.synchronize do
          "Timed out attempting to check out a connection " +
            "from pool for #{@server.address} after #{wait_timeout} sec. " +
            "Connections in pool: #{@available_connections.length} available, " +
            "#{@checked_out_connections.length} checked out, " +
            "#{@pending_connections.length} pending"
        end
        raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address)
      end
      @available_semaphore.wait(wait)
    end
  end
  # Connecting happens outside of the lock.
  begin
    connect_connection(connection)
  rescue Exception
    # Handshake or authentication failed
    # Stop tracking the connection, signal the populate semaphore, publish
    # the failure and re-raise the original exception.
    @lock.synchronize do
      @pending_connections.delete(connection)
    end
    @populate_semaphore.signal
    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
        @server.address,
        Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR
      ),
    )
    raise
  end
  # The connection is ready: move it from pending to checked out.
  @lock.synchronize do
    @checked_out_connections << connection
    @pending_connections.delete(connection)
  end
  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self),
  )
  connection
end

def clear(options = nil)

Other tags:
    Since: - 2.1.0

Returns:
  • (true) - true.

Options Hash: (**options)
  • :stop_populator (true | false) -- Whether to stop the populator
    before clearing the pool.
  • :lazy (true | false) -- If true, do not close any of the available
    connections while clearing the pool.
# Invalidates all connections currently known to the pool by moving the
# pool to a new generation.
#
# Unless the :lazy option is set, the available connections are also
# disconnected right away.
#
# @param [ Hash | nil ] options The clear options.
#
# @option options [ true | false ] :stop_populator Whether to stop the
#   populator before clearing.
# @option options [ true | false ] :lazy If true, the available
#   connections are not closed by this call.
#
# @return [ true ] true.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def clear(options = nil)
  raise_if_closed!
  stop_populator if options && options[:stop_populator]

  @lock.synchronize do
    @generation += 1
    publish_cmap_event(Monitoring::Event::Cmap::PoolCleared.new(@server.address))

    # Lazy clearing only bumps the generation.
    next if options && options[:lazy]

    until @available_connections.empty?
      @available_connections.pop.disconnect!(reason: :stale)
      @populate_semaphore.signal
    end
  end

  true
end

def close(options = nil)

Other tags:
    Since: - 2.9.0

Returns:
  • (true) - Always true.

Options Hash: (**options)
  • :force (true | false) -- Also close all checked out connections.
# Marks the pool closed and disconnects its connections.
#
# The populator is stopped first. All available connections are then
# disconnected; checked out connections are disconnected as well only
# when the :force option is given. Closing an already closed pool does
# nothing.
#
# @param [ Hash | nil ] options The close options.
#
# @option options [ true | false ] :force Also close all checked out
#   connections.
#
# @return [ true ] Always true.
def close(options = nil)
  # Already closed: nothing to do, but still honour the documented
  # "always true" return value (a bare return would yield nil).
  return true if closed?

  options ||= {}
  stop_populator

  @lock.synchronize do
    until @available_connections.empty?
      connection = @available_connections.pop
      connection.disconnect!(reason: :pool_closed)
    end

    if options[:force]
      until @checked_out_connections.empty?
        connection = @checked_out_connections.first
        connection.disconnect!(reason: :pool_closed)
        @checked_out_connections.delete(connection)
      end
    end

    # Mark the pool as closed before releasing the lock so that no
    # connections can be created, checked in, or checked out.
    @closed = true
  end

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolClosed.new(@server.address, self)
  )
  true
end

def close_idle_sockets

Other tags:
    Since: - 2.5.0
# Disconnects and removes available connections that have been idle for
# longer than max_idle_time.
#
# Does nothing when the pool is closed or when no max idle time is
# configured. Connections without a last check in time are kept.
def close_idle_sockets
  return if closed? || !max_idle_time

  @lock.synchronize do
    index = 0
    while index < @available_connections.length
      connection = @available_connections[index]
      checked_in_at = connection.last_checkin
      if checked_in_at && (Time.now - checked_in_at) > max_idle_time
        connection.disconnect!(reason: :idle)
        # Removing the element shifts the following ones down, so the
        # index is intentionally left where it is.
        @available_connections.delete_at(index)
        @populate_semaphore.signal
      else
        index += 1
      end
    end
  end
end

def closed?

Other tags:
    Since: - 2.9.0

Returns:
  • (true | false) - Whether the pool is closed.
# Whether the pool has been closed.
#
# @return [ true | false ] Whether the pool is closed.
def closed?
  # Normalise whatever @closed holds (including nil) to a strict boolean.
  @closed ? true : false
end

def connect_connection(connection)

Attempts to connect (handshake and auth) the connection. If an error is
encountered, closes the connection and raises the error.
# Attempts to connect the given connection. If anything is raised while
# connecting, the connection is disconnected and the error is re-raised.
#
# @param [ Mongo::Server::Connection ] connection The connection.
def connect_connection(connection)
  connection.connect!
rescue Exception
  # Exception (not just StandardError) is rescued on purpose: this is
  # cleanup only, and the original error is always re-raised.
  connection.disconnect!(reason: :error)
  raise
end

def create_and_add_connection

Raises:
  • (Mongo::Error) - An error encountered during connection connect

Returns:
  • (true | false) - True if a connection was created and added to the pool, false otherwise.
# Creates, connects and adds one connection to the pool, provided the pool
# is open and currently smaller than min_size.
#
# The connection is tracked as pending while it is being connected, which
# happens outside of the lock.
#
# @return [ true | false ] True if a connection was created and added to
#   the available connections, false if nothing was created.
#
# @raise [ Exception ] Whatever connect_connection raised; the connection
#   is removed from the pending set before the error is re-raised.
def create_and_add_connection
  connection = nil
  @lock.synchronize do
    # Nothing to do for a closed pool or one already at its minimum size.
    return false if closed? || unsynchronized_size >= min_size

    connection = create_connection
    @pending_connections << connection
  end

  begin
    connect_connection(connection)
  rescue Exception
    @lock.synchronize { @pending_connections.delete(connection) }
    raise
  end

  @lock.synchronize do
    @available_connections << connection
    @pending_connections.delete(connection)
    # One connection was created, so wake up one waiting thread.
    @available_semaphore.signal
  end
  true
end

def create_connection

# Builds a new, not yet connected connection for this pool's server.
#
# The connection receives the pool options extended with the pool's
# current generation and a reference back to this pool.
#
# @return [ Connection ] The new connection.
def create_connection
  Connection.new(@server, options.merge(generation: generation, connection_pool: self))
end

def initialize(server, options = {})

Other tags:
    Since: - 2.0.0, API changed in 2.9.0

Options Hash: (**options)
  • :max_idle_time (Float) -- The time, in seconds,
  • :wait_queue_timeout (Float) -- Deprecated.
  • :wait_timeout (Float) -- The time to wait, in
  • :min_pool_size (Integer) -- Deprecated.
  • :min_size (Integer) -- The minimum pool size.
  • :max_pool_size (Integer) -- Deprecated.
  • :max_size (Integer) -- The maximum pool size.

Parameters:
  • options (Hash) -- The connection pool options.
  • server (Server) -- The server which this connection pool is for.
# Creates the connection pool for a server.
#
# The deprecated option names (:min_pool_size, :max_pool_size and
# :wait_queue_timeout) are folded into their current equivalents
# (:min_size, :max_size and :wait_timeout). Supplying both names of a pair
# with different values is an error.
#
# @param [ Server ] server The server which this connection pool is for.
# @param [ Hash ] options The connection pool options.
#
# @option options [ Integer ] :max_size The maximum pool size.
# @option options [ Integer ] :max_pool_size Deprecated. Use :max_size.
# @option options [ Integer ] :min_size The minimum pool size.
# @option options [ Integer ] :min_pool_size Deprecated. Use :min_size.
# @option options [ Float ] :wait_timeout The time to wait, in seconds,
#   for a connection when checking one out.
# @option options [ Float ] :wait_queue_timeout Deprecated. Use
#   :wait_timeout.
# @option options [ Float ] :max_idle_time The time, in seconds, after
#   which an available connection is considered idle.
#
# @raise [ ArgumentError ] If server is not a Server, if a deprecated
#   option conflicts with its replacement, or if the minimum size exceeds
#   the maximum size.
def initialize(server, options = {})
  unless server.is_a?(Server)
    raise ArgumentError, 'First argument must be a Server instance'
  end
  # Work on a copy so the caller's hash is neither modified nor frozen.
  options = options.dup
  if options[:min_size] && options[:min_pool_size] && options[:min_size] != options[:min_pool_size]
    raise ArgumentError, "Min size #{options[:min_size]} is not identical to min pool size #{options[:min_pool_size]}"
  end
  if options[:max_size] && options[:max_pool_size] && options[:max_size] != options[:max_pool_size]
    raise ArgumentError, "Max size #{options[:max_size]} is not identical to max pool size #{options[:max_pool_size]}"
  end
  if options[:wait_timeout] && options[:wait_queue_timeout] && options[:wait_timeout] != options[:wait_queue_timeout]
    raise ArgumentError, "Wait timeout #{options[:wait_timeout]} is not identical to wait queue timeout #{options[:wait_queue_timeout]}"
  end
  # Fold the deprecated option names into the current ones.
  options[:min_size] ||= options[:min_pool_size]
  options.delete(:min_pool_size)
  options[:max_size] ||= options[:max_pool_size]
  options.delete(:max_pool_size)
  if options[:min_size] && options[:max_size] &&
    options[:min_size] > options[:max_size]
  then
    raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
  end
  if options[:wait_queue_timeout]
    options[:wait_timeout] ||= options[:wait_queue_timeout]
  end
  options.delete(:wait_queue_timeout)
  @server = server
  @options = options.freeze
  @generation = 1
  @closed = false
  # A connection owned by this pool is in exactly one of: the available
  # connections array (which is used as a stack), the checked out
  # connections set, or - while it is being connected - the pending
  # connections set.
  @available_connections = []
  @checked_out_connections = Set.new
  @pending_connections = Set.new
  # Mutex used for synchronizing access to the connection collections
  # above. The pool object is thread-safe, thus all methods that retrieve
  # or modify instance variables generally must do so under this lock.
  @lock = Mutex.new
  # Semaphore signaled when a connection is added to
  # @available_connections (or when populating failed), to wake up one
  # thread waiting for an available connection when the pool is at max
  # size.
  @available_semaphore = Semaphore.new
  # Background thread responsible for maintaining the size of
  # the pool to at least min_size
  @populator = Populator.new(self, options)
  # Semaphore signaled whenever a connection leaves the pool.
  # NOTE(review): presumably the populator waits on this - confirm in
  # Populator.
  @populate_semaphore = Semaphore.new
  # The finalizer is handed the collections and the populator instead of
  # the pool itself.
  ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator))
  publish_cmap_event(
    Monitoring::Event::Cmap::PoolCreated.new(@server.address, options, self)
  )
  # The populator is only started when a minimum size must be maintained.
  @populator.run! if min_size > 0
end

def inspect

Other tags:
    Since: - 2.0.0

Returns:
  • (String) - The pool inspection.

Other tags:
    Example: Inspect the pool. -
# Get a pretty printed string inspection for the pool.
#
# A closed pool reports only its configuration. An open pool also reports
# its current size and the number of available connections.
#
# @return [ String ] The pool inspection.
def inspect
  pool_closed = closed?
  head = "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " \
    "wait_timeout=#{wait_timeout}"
  # size and available_count raise on a closed pool, so they are only
  # queried while the pool is open.
  state = pool_closed ? 'closed' : "current_size=#{size} available=#{available_count}"
  "#{head} #{state}>"
end

def max_idle_time

Other tags:
    Since: - 2.9.0

Returns:
  • (Float | nil) - The max socket idle time in seconds.
# The maximum time, in seconds, that a connection may sit idle in the
# pool, as configured through the :max_idle_time option.
#
# @return [ Float | nil ] The max socket idle time in seconds.
def max_idle_time
  # A nil result is not cached, so the option is looked up again on the
  # next call.
  @max_idle_time ||= begin
    configured = options[:max_idle_time]
    configured
  end
end

def max_size

Other tags:
    Since: - 2.9.0

Returns:
  • (Integer) - The maximum size of the connection pool.
# The maximum size of the connection pool.
#
# Uses the :max_size option when present. Otherwise falls back to the
# larger of DEFAULT_MAX_SIZE and the minimum size, so the maximum is never
# below the minimum.
#
# @return [ Integer ] The maximum size of the connection pool.
def max_size
  @max_size ||= begin
    configured = options[:max_size]
    configured || [DEFAULT_MAX_SIZE, min_size].max
  end
end

def min_size

Other tags:
    Since: - 2.9.0

Returns:
  • (Integer) - The minimum size of the connection pool.
# The minimum size of the connection pool, taken from the :min_size
# option or DEFAULT_MIN_SIZE when the option is not set.
#
# @return [ Integer ] The minimum size of the connection pool.
def min_size
  @min_size ||= begin
    configured = options[:min_size]
    configured || DEFAULT_MIN_SIZE
  end
end

def populate

Other tags:
    Api: - private

Raises:
  • (Error::AuthError, Error) - The second socket-related error raised if a retry

Returns:
  • (true | false) - Whether this method should be called again to create more connections.
# Creates and adds one connection to the pool if the pool is below its
# minimum size, retrying once after a socket-related error.
#
# @return [ true | false ] The result of create_and_add_connection: true
#   when a connection was added, false when nothing was created or the
#   pool is closed.
#
# @raise [ Error::AuthError, Error ] The second socket-related error if
#   the retry failed as well, or the first error that is not
#   socket-related.
#
# @api private
def populate
  return false if closed?

  retried = false
  begin
    create_and_add_connection
  rescue Error::SocketError, Error::SocketTimeoutError => e
    # Only the first socket-related failure is tolerated.
    raise if retried

    retried = true
    log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.")
    retry
  end
rescue Error::AuthError, Error
  # Wake up one thread waiting for connections: one could not be created
  # here, and can instead be created in flow.
  @available_semaphore.signal
  raise
end

def raise_if_closed!

Other tags:
    Since: - 2.9.0

Raises:
  • (Error::PoolClosedError) - If the pool has been closed.
# Guards operations that are not allowed on a closed pool.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def raise_if_closed!
  raise Error::PoolClosedError.new(@server.address, self) if closed?
end

def size

Other tags:
    Since: - 2.9.0

Returns:
  • (Integer) - Size of the connection pool.
# Size of the connection pool, computed under the pool lock.
#
# @return [ Integer ] Size of the connection pool.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def size
  raise_if_closed!
  @lock.synchronize { unsynchronized_size }
end

def stop_populator

Other tags:
    Api: - private
# Stops the populator and disconnects every connection that is still
# pending.
#
# @api private
def stop_populator
  @populator.stop!

  @lock.synchronize do
    # If the populator was stopped while populate was running, the pending
    # set may still hold connections that are waiting to be connected,
    # that have not yet been moved to the available connections, or that
    # were moved there but not yet removed from the pending set. Clean
    # those up one at a time.
    until @pending_connections.empty?
      leftover = @pending_connections.first
      leftover.disconnect!
      @pending_connections.delete(leftover)
    end
  end
end

def summary

Other tags:
    Since: - 2.11.0

Other tags:
    Api: - experimental

Other tags:
    Note: - This method is experimental and subject to change.
# One-line description of the pool: total size, configured size range and
# the number of checked out, available and pending connections. All
# counts are read under the pool lock.
#
# @note This method is experimental and subject to change.
#
# @return [ String ] The pool summary.
#
# @api experimental
def summary
  @lock.synchronize do
    format(
      '#<ConnectionPool size=%s (%s-%s) used=%s avail=%s pending=%s>',
      unsynchronized_size,
      min_size,
      max_size,
      @checked_out_connections.length,
      @available_connections.length,
      @pending_connections.length
    )
  end
end

def unsynchronized_size

Returns the size of the connection pool without acquiring the lock.
This method should only be used by other pool methods when they are
already holding the lock as Ruby does not allow a thread holding a
lock to acquire this lock again.
# Returns the size of the connection pool without acquiring the lock:
# the sum of the available, checked out and pending connections.
#
# Intended for pool methods that already hold the lock, since a thread
# cannot lock a mutex it already holds.
#
# @return [ Integer ] Size of the connection pool.
def unsynchronized_size
  tracked = [@available_connections, @checked_out_connections, @pending_connections]
  tracked.inject(0) { |total, connections| total + connections.length }
end

def wait_timeout

Other tags:
    Since: - 2.9.0

Returns:
  • (Float) - The queue wait timeout.
# The time to wait for a connection to become available, taken from the
# :wait_timeout option or DEFAULT_WAIT_TIMEOUT when the option is not set.
#
# @return [ Float ] The queue wait timeout.
def wait_timeout
  @wait_timeout ||= begin
    configured = options[:wait_timeout]
    configured || DEFAULT_WAIT_TIMEOUT
  end
end

def with_connection

Other tags:
    Since: - 2.0.0

Returns:
  • (Object) - The result of the block.

Other tags:
    Example: Execute with a connection. -
# Checks out a connection, yields it to the block and checks it back in
# afterwards, including when the block raises.
#
# @example Execute with a connection.
#   pool.with_connection do |connection|
#     connection.read
#   end
#
# @return [ Object ] The result of the block.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def with_connection
  raise_if_closed!
  connection = check_out
  yield connection
ensure
  # connection is still nil when the check out itself failed, in which
  # case there is nothing to return to the pool.
  check_in(connection) if connection
end