class Mongo::Server::ConnectionPool
@since 2.0.0, largely rewritten in 2.9.0
Represents a connection pool for server connections.
def self.finalize(available_connections, pending_connections, populator)
-
(Proc)- The Finalizer.
Parameters:
-
populator(Populator) -- The populator. -
pending_connections(List) -- The pending connections. -
available_connections(List) -- The available connections.
# Builds the finalizer proc for a connection pool.
#
# The returned proc only closes over the arguments given here (and the
# receiver of this class-level method), never over a pool instance.
#
# @param [ List<Mongo::Server::Connection> ] available_connections The
#   available connections.
# @param [ List<Mongo::Server::Connection> ] pending_connections The
#   pending connections.
# @param [ Populator ] populator The populator.
#
# @return [ Proc ] The finalizer.
def self.finalize(available_connections, pending_connections, populator)
  proc do
    populator.stop!

    # Disconnects every connection in the collection, then empties it.
    drain = lambda do |connections|
      connections.each { |conn| conn.disconnect!(reason: :pool_closed) }
      connections.clear
    end

    # The finalizer does not close checked out connections. Those would
    # have to be garbage collected on their own, which should close them.
    drain.call(available_connections)
    drain.call(pending_connections)
  end
end
def available_count
- Since: - 2.9.0
Returns:
-
(Integer)- Number of available connections.
# Number of connections currently available in the pool.
#
# @return [ Integer ] Number of available connections.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed
#   (via raise_if_closed!).
#
# @since 2.9.0
def available_count
  raise_if_closed!

  @lock.synchronize { @available_connections.length }
end
def check_in(connection)
- Since: - 2.9.0
Parameters:
-
connection(Mongo::Server::Connection) -- The connection.
# Checks a connection back into the pool.
#
# The connection must belong to this pool and must currently be checked
# out of it.
#
# @param [ Mongo::Server::Connection ] connection The connection.
#
# @raise [ ArgumentError ] If the connection was checked out by another
#   pool, or is not currently checked out by this pool.
#
# @since 2.9.0
def check_in(connection)
  @lock.synchronize do
    owner = connection.connection_pool
    unless owner == self
      raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{owner} (for #{self})"
    end

    unless @checked_out_connections.include?(connection)
      raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})"
    end

    # Note: if an event handler raises, no semaphore is signaled below.
    # Threads already waiting for a connection to free up while the pool
    # is at max size may then time out; threads that start waiting after
    # this method completes (with the exception) should be fine.
    @checked_out_connections.delete(connection)
    checked_in_event = Monitoring::Event::Cmap::ConnectionCheckedIn.new(
      @server.address, connection.id, self
    )
    publish_cmap_event(checked_in_event)

    if closed?
      connection.disconnect!(reason: :pool_closed)
      return
    end

    already_closed = connection.closed?
    if !already_closed && connection.generation == @generation
      connection.record_checkin!
      @available_connections << connection

      # Wake up only one thread waiting for an available connection,
      # since only one connection was checked in.
      @available_semaphore.signal
    else
      # A connection that is already closed (for example after a network
      # error) needs nothing more; a stale one is disconnected first.
      connection.disconnect!(reason: :stale) unless already_closed
      @populate_semaphore.signal
    end
  end
end
def check_out
- Since: - 2.9.0
Raises:
-
(Timeout::Error)- If the connection pool is at maximum size and no connection becomes available within the wait timeout. -
(Error::PoolClosedError)- If the pool has been closed.
Returns:
-
(Mongo::Server::Connection)- The checked out connection.
# Checks a connection out of the pool.
#
# Pops available connections until a usable one is found (stale and idle
# ones are disconnected), or creates a new connection while the pool is
# below max_size. Otherwise waits on the available semaphore until the
# wait timeout elapses. The chosen connection is connected before being
# returned.
#
# @return [ Mongo::Server::Connection ] The checked out connection.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
# @raise [ Error::ConnectionCheckOutTimeout ] If no connection could be
#   obtained within wait_timeout seconds.
#
# @since 2.9.0
def check_out
  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
  )

  if closed?
    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
        @server.address,
        Monitoring::Event::Cmap::ConnectionCheckOutFailed::POOL_CLOSED
      )
    )
    raise Error::PoolClosedError.new(@server.address, self)
  end

  # The deadline is tracked on the monotonic clock so that wall-clock
  # adjustments (NTP steps, manual changes) cannot shorten or extend the
  # time spent waiting for a connection.
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = monotonic_now.call + wait_timeout
  connection = nil

  # It seems that synchronize sets up its own loop, thus a simple break
  # is insufficient to break the outer loop.
  catch(:done) do
    loop do
      # The lock must be taken on each iteration rather than for the
      # method overall, otherwise other threads would not be able to
      # check in a connection while this thread is waiting for one.
      @lock.synchronize do
        until @available_connections.empty?
          connection = @available_connections.pop

          if connection.generation != generation
            # Stale connections should be disconnected in the clear
            # method, but if any were not, they are handled here.
            connection.disconnect!(reason: :stale)
            @populate_semaphore.signal
            next
          end

          # Idle time is compared against last_checkin, so it stays on
          # Time.now rather than the monotonic clock.
          if max_idle_time && connection.last_checkin &&
             Time.now - connection.last_checkin > max_idle_time
            connection.disconnect!(reason: :idle)
            @populate_semaphore.signal
            next
          end

          @pending_connections << connection
          throw(:done)
        end

        # Ruby does not allow a thread to lock a mutex which it already
        # holds, hence the unsynchronized size is used here.
        if unsynchronized_size < max_size
          connection = create_connection
          @pending_connections << connection
          throw(:done)
        end
      end

      wait = deadline - monotonic_now.call
      if wait <= 0
        publish_cmap_event(
          Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
            @server.address,
            Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT
          )
        )

        msg = @lock.synchronize do
          "Timed out attempting to check out a connection " +
            "from pool for #{@server.address} after #{wait_timeout} sec. " +
            "Connections in pool: #{@available_connections.length} available, " +
            "#{@checked_out_connections.length} checked out, " +
            "#{@pending_connections.length} pending"
        end

        raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address)
      end

      @available_semaphore.wait(wait)
    end
  end

  begin
    connect_connection(connection)
  rescue Exception
    # Handshake or authentication failed. Exception is rescued on
    # purpose: bookkeeping is undone and the error is always re-raised.
    @lock.synchronize do
      @pending_connections.delete(connection)
    end
    @populate_semaphore.signal

    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
        @server.address,
        Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR
      )
    )
    raise
  end

  @lock.synchronize do
    @checked_out_connections << connection
    @pending_connections.delete(connection)
  end

  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self)
  )
  connection
end
def clear(options = nil)
- Since: - 2.1.0
Returns:
-
(true)- true.
Options Hash:
(**options)-
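:stop_populator(true | false) -- Whether to stop the populator before clearing the pool. -
:lazy(true | false) -- If true, do not close any of the available connections; only the pool generation is incremented.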
# Clears the pool: increments the pool generation and, unless the :lazy
# option is given, disconnects all currently available connections.
#
# @param [ Hash, nil ] options The clear options.
#
# @option options [ true | false ] :lazy If true, available connections
#   are not disconnected here.
# @option options [ true | false ] :stop_populator Whether to stop the
#   populator before clearing.
#
# @return [ true ] true.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed
#   (via raise_if_closed!).
#
# @since 2.1.0
def clear(options = nil)
  raise_if_closed!

  opts = options || {}
  stop_populator if opts[:stop_populator]

  @lock.synchronize do
    @generation += 1
    publish_cmap_event(Monitoring::Event::Cmap::PoolCleared.new(@server.address))

    unless opts[:lazy]
      until @available_connections.empty?
        @available_connections.pop.disconnect!(reason: :stale)
        # Each removed connection is reported to the populate semaphore.
        @populate_semaphore.signal
      end
    end
  end

  true
end
def close(options = nil)
- Since: - 2.9.0
Returns:
-
(true)- Always true.
Options Hash:
(**options)-
:force(true | false) -- Also close all checked out connections.
# Closes the pool: stops the populator, disconnects all available
# connections and marks the pool as closed. Calling it on a pool that is
# already closed is a no-op.
#
# @param [ Hash, nil ] options The close options.
#
# @option options [ true | false ] :force Also close all checked out
#   connections.
#
# @return [ true ] Always true, including when the pool was already
#   closed.
#
# @since 2.9.0
def close(options = nil)
  # Honor the documented "always true" contract for repeated calls too.
  return true if closed?

  options ||= {}

  stop_populator

  @lock.synchronize do
    until @available_connections.empty?
      connection = @available_connections.pop
      connection.disconnect!(reason: :pool_closed)
    end

    if options[:force]
      until @checked_out_connections.empty?
        connection = @checked_out_connections.first
        connection.disconnect!(reason: :pool_closed)
        @checked_out_connections.delete(connection)
      end
    end

    # Mark the pool as closed before releasing the lock so that no
    # connections can be created, checked in, or checked out.
    @closed = true
  end

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolClosed.new(@server.address, self)
  )

  true
end
def close_idle_sockets
- Since: - 2.5.0
# Disconnects and removes available connections whose last check-in is
# more than max_idle_time seconds ago. Does nothing when the pool is
# closed or no max_idle_time is configured.
#
# @since 2.5.0
def close_idle_sockets
  return if closed?
  return unless max_idle_time

  @lock.synchronize do
    @available_connections.reject! do |connection|
      checked_in_at = connection.last_checkin
      # Connections with no recorded check-in are never treated as idle.
      next false unless checked_in_at && (Time.now - checked_in_at) > max_idle_time

      connection.disconnect!(reason: :idle)
      @populate_semaphore.signal
      true
    end

    nil
  end
end
def closed?
- Since: - 2.9.0
Returns:
-
(true | false)- Whether the pool is closed.
# Whether the pool has been closed.
#
# @return [ true | false ] Whether the pool is closed.
#
# @since 2.9.0
def closed?
  @closed ? true : false
end
def connect_connection(connection)
Attempts to connect (handshake and auth) the connection. If an error is raised, the connection is disconnected and the error is re-raised.
# Attempts to connect the given connection. If connecting raises, the
# connection is disconnected (reason :error) and the error is re-raised.
#
# @param [ Mongo::Server::Connection ] connection The connection to
#   connect.
#
# @return [ Object ] Whatever connection.connect! returns.
def connect_connection(connection)
  connection.connect!
rescue Exception
  # Exception is rescued deliberately: the connection is cleaned up for
  # any failure, and the original error is always propagated.
  connection.disconnect!(reason: :error)
  raise
end
def create_and_add_connection
-
(Mongo::Error)- An error encountered during connection connect
Returns:
-
(true | false)- True if a connection was created and added to the pool, false otherwise.
# Creates, connects and adds one connection to the pool, provided the
# pool is open and its size is below min_size.
#
# @return [ true | false ] True if a connection was created and added
#   to the available connections, false otherwise.
#
# @raise [ Exception ] Any error raised while connecting the connection;
#   the connection is removed from the pending set before re-raising.
def create_and_add_connection
  connection = @lock.synchronize do
    return false if closed? || unsynchronized_size >= min_size

    create_connection.tap { |created| @pending_connections << created }
  end

  # Connecting happens outside the lock; the connection stays in the
  # pending set meanwhile.
  begin
    connect_connection(connection)
  rescue Exception
    @lock.synchronize { @pending_connections.delete(connection) }
    raise
  end

  @lock.synchronize do
    @available_connections << connection
    @pending_connections.delete(connection)

    # Wake up one thread waiting for connections, since one was created.
    @available_semaphore.signal
  end

  true
end
def create_connection
# Instantiates a new connection for this pool's server. The connection
# receives the pool options merged with the pool generation and a
# reference to this pool. It is not connected here.
#
# @return [ Connection ] The new connection.
def create_connection
  connection_options = options.merge(generation: generation, connection_pool: self)
  Connection.new(@server, connection_options)
end
def initialize(server, options = {})
- Since: - 2.0.0, API changed in 2.9.0
Options Hash:
(**options)-
:max_idle_time(Float) -- The time, in seconds, -
:wait_queue_timeout(Float) -- Deprecated. -
:wait_timeout(Float) -- The time to wait, in -
:min_pool_size(Integer) -- Deprecated. -
:min_size(Integer) -- The minimum pool size. -
:max_pool_size(Integer) -- Deprecated. -
:max_size(Integer) -- The maximum pool size.
Parameters:
-
options(Hash) -- The connection pool options. -
server(Server) -- The server which this connection pool is for.
# Creates the new connection pool.
#
# Deprecated option names (:min_pool_size, :max_pool_size,
# :wait_queue_timeout) are folded into their current equivalents and
# removed from the stored options.
#
# @param [ Server ] server The server which this connection pool is for.
# @param [ Hash ] options The connection pool options.
#
# @option options [ Integer ] :max_size The maximum pool size.
# @option options [ Integer ] :max_pool_size Deprecated.
# @option options [ Integer ] :min_size The minimum pool size.
# @option options [ Integer ] :min_pool_size Deprecated.
# @option options [ Float ] :wait_timeout The time to wait, in seconds.
# @option options [ Float ] :wait_queue_timeout Deprecated.
# @option options [ Float ] :max_idle_time The time, in seconds.
#
# @raise [ ArgumentError ] If server is not a Server, if a deprecated
#   option disagrees with its replacement, or if min size exceeds max
#   size.
#
# @since 2.0.0, API changed in 2.9.0
def initialize(server, options = {})
  raise ArgumentError, 'First argument must be a Server instance' unless server.is_a?(Server)

  opts = options.dup

  # Each current option may also be given under its deprecated name, but
  # when both are present they have to agree.
  [
    [:min_size, :min_pool_size, 'Min size', 'min pool size'],
    [:max_size, :max_pool_size, 'Max size', 'max pool size'],
    [:wait_timeout, :wait_queue_timeout, 'Wait timeout', 'wait queue timeout'],
  ].each do |key, legacy_key, label, legacy_label|
    value = opts[key]
    legacy_value = opts[legacy_key]
    next unless value && legacy_value && value != legacy_value

    raise ArgumentError, "#{label} #{value} is not identical to #{legacy_label} #{legacy_value}"
  end

  opts[:min_size] ||= opts[:min_pool_size]
  opts.delete(:min_pool_size)
  opts[:max_size] ||= opts[:max_pool_size]
  opts.delete(:max_pool_size)

  smallest = opts[:min_size]
  largest = opts[:max_size]
  if smallest && largest && smallest > largest
    raise ArgumentError, "Cannot have min size #{smallest} exceed max size #{largest}"
  end

  legacy_wait = opts.delete(:wait_queue_timeout)
  opts[:wait_timeout] ||= legacy_wait if legacy_wait

  @server = server
  @options = opts.freeze

  @generation = 1
  @closed = false

  # A connection owned by this pool should be either in the
  # available connections array (which is used as a stack)
  # or in the checked out connections set.
  @available_connections = []
  @checked_out_connections = Set.new
  @pending_connections = Set.new

  # Mutex used for synchronizing access to @available_connections and
  # @checked_out_connections. The pool object is thread-safe, thus
  # all methods that retrieve or modify instance variables generally
  # must do so under this lock.
  @lock = Mutex.new

  # Signaled when a connection is added to @available_connections, to
  # wake up a thread waiting for an available connection when the pool
  # is at max size.
  @available_semaphore = Semaphore.new

  # Background thread responsible for maintaining the size of
  # the pool to at least min_size.
  @populator = Populator.new(self, opts)
  @populate_semaphore = Semaphore.new

  ObjectSpace.define_finalizer(
    self,
    self.class.finalize(@available_connections, @pending_connections, @populator)
  )

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolCreated.new(@server.address, opts, self)
  )

  @populator.run! if min_size > 0
end
def inspect
- Since: - 2.0.0
Returns:
-
(String)- The pool inspection.
Other tags:
- Example: Inspect the pool. -
# Get a pretty printed string inspection for the pool. For an open pool
# the current size and available count are included; a closed pool is
# reported as closed.
#
# @example Inspect the pool.
#   pool.inspect
#
# @return [ String ] The pool inspection.
#
# @since 2.0.0
def inspect
  pool_closed = closed?
  head = "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
    "wait_timeout=#{wait_timeout}"
  tail = pool_closed ? 'closed' : "current_size=#{size} available=#{available_count}"

  "#{head} #{tail}>"
end
def max_idle_time
- Since: - 2.9.0
Returns:
-
(Float | nil)- The max socket idle time in seconds.
# The maximum seconds a socket can remain idle since it has been checked
# in to the pool, if set. A truthy value is memoized after first read.
#
# @return [ Float | nil ] The max socket idle time in seconds.
#
# @since 2.9.0
def max_idle_time
  return @max_idle_time if @max_idle_time

  @max_idle_time = options[:max_idle_time]
end
def max_size
- Since: - 2.9.0
Returns:
-
(Integer)- The maximum size of the connection pool.
# Get the maximum size of the connection pool: the :max_size option when
# given, otherwise the larger of DEFAULT_MAX_SIZE and min_size. The
# result is memoized.
#
# @return [ Integer ] The maximum size of the connection pool.
#
# @since 2.9.0
def max_size
  return @max_size if @max_size

  configured = options[:max_size]
  @max_size = configured || [DEFAULT_MAX_SIZE, min_size].max
end
def min_size
- Since: - 2.9.0
Returns:
-
(Integer)- The minimum size of the connection pool.
# Get the minimum size of the connection pool: the :min_size option when
# given, otherwise DEFAULT_MIN_SIZE. The result is memoized.
#
# @return [ Integer ] The minimum size of the connection pool.
#
# @since 2.9.0
def min_size
  return @min_size if @min_size

  configured = options[:min_size]
  @min_size = configured || DEFAULT_MIN_SIZE
end
def populate
- Api: - private
Raises:
-
(Error::AuthError, Error)- The second socket-related error raised if a retry occurred, or the first error if it was not socket-related.
Returns:
-
(true | false)- Whether this method should be called again
# Tries to create and add one connection to the pool. The first
# socket-related failure is logged and the attempt is repeated once; any
# other error, or a second failure, is propagated after signaling the
# available semaphore.
#
# @return [ true | false ] False if the pool is closed, otherwise the
#   result of create_and_add_connection.
#
# @raise [ Error::AuthError, Error ] The error from the single attempt,
#   or from the second attempt if a retry happened.
#
# @api private
def populate
  return false if closed?

  attempts = 0
  begin
    attempts += 1
    create_and_add_connection
  rescue Error::SocketError, Error::SocketTimeoutError => e
    # Only the first socket failure is tolerated; a repeated one is
    # handed to the method-level rescue below.
    raise if attempts > 1

    log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.")
    retry
  end
rescue Error::AuthError, Error
  # Wake up one thread waiting for connections, since one could not
  # be created here, and can instead be created in flow.
  @available_semaphore.signal
  raise
end
def raise_if_closed!
- Since: - 2.9.0
Raises:
-
(Error::PoolClosedError)- If the pool has been closed.
# Asserts that the pool has not been closed.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
#
# @since 2.9.0
def raise_if_closed!
  return unless closed?

  raise Error::PoolClosedError.new(@server.address, self)
end
def size
- Since: - 2.9.0
Returns:
-
(Integer)- Size of the connection pool.
# Size of the connection pool, as computed by unsynchronized_size while
# holding the pool lock.
#
# @return [ Integer ] Size of the connection pool.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed
#   (via raise_if_closed!).
#
# @since 2.9.0
def size
  raise_if_closed!

  @lock.synchronize { unsynchronized_size }
end
def stop_populator
- Api: - private
# Stops the populator and disconnects every pending connection.
#
# @api private
def stop_populator
  @populator.stop!

  @lock.synchronize do
    # If stop_populator is called while populate is running, there may be
    # connections waiting to be connected, connections which have not yet
    # been moved to available_connections, or connections moved to
    # available_connections but not deleted from pending_connections.
    # These should be cleaned up.
    until @pending_connections.empty?
      pending = @pending_connections.first
      pending.disconnect!
      @pending_connections.delete(pending)
    end
  end
end
def summary
- Since: - 2.11.0
Other tags:
- Api: - experimental
Other tags:
- Note: - This method is experimental and subject to change.
# Builds a short description of the pool: total size, configured size
# range, and the number of checked out, available and pending
# connections, all read under the pool lock.
#
# @note This method is experimental and subject to change.
#
# @return [ String ] The pool summary.
#
# @api experimental
# @since 2.11.0
def summary
  @lock.synchronize do
    total = unsynchronized_size
    range = "#{min_size}-#{max_size}"
    used = @checked_out_connections.length
    avail = @available_connections.length
    pending = @pending_connections.length

    "#<ConnectionPool size=#{total} (#{range}) used=#{used} avail=#{avail} pending=#{pending}>"
  end
end
def unsynchronized_size
Returns the size of the connection pool without acquiring the lock.
This method should only be used by other pool methods when they are
already holding the lock, as Ruby does not allow a thread holding a lock to acquire it again.
# Returns the size of the connection pool without acquiring the lock:
# the number of available, checked out and pending connections combined.
# Intended for pool methods that already hold the lock.
#
# @return [ Integer ] Size of the connection pool.
def unsynchronized_size
  [@available_connections, @checked_out_connections, @pending_connections]
    .map(&:length)
    .reduce(0, :+)
end
def wait_timeout
- Since: - 2.9.0
Returns:
-
(Float)- The queue wait timeout.
# The time to wait, in seconds, for a connection to become available:
# the :wait_timeout option when given, otherwise DEFAULT_WAIT_TIMEOUT.
# The result is memoized.
#
# @return [ Float ] The queue wait timeout.
#
# @since 2.9.0
def wait_timeout
  return @wait_timeout if @wait_timeout

  configured = options[:wait_timeout]
  @wait_timeout = configured || DEFAULT_WAIT_TIMEOUT
end
def with_connection
- Since: - 2.0.0
Returns:
-
(Object)- The result of the block.
Other tags:
- Example: Execute with a connection. -
# Yields a checked out connection to the block and checks it back in
# afterwards, even if the block raises.
#
# @example Execute with a connection.
#   pool.with_connection do |connection|
#     connection.read
#   end
#
# @return [ Object ] The result of the block.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed
#   (via raise_if_closed!).
#
# @since 2.0.0
def with_connection
  raise_if_closed!

  connection = check_out
  yield(connection)
ensure
  # connection is still nil when the closed check or check_out raised.
  check_in(connection) if connection
end