class Sequel::ThreadedConnectionPool
This is the default connection pool used by Sequel.
A connection pool allowing multi-threaded access to a pool of connections.
def _size
The total number of connections opened, either available or allocated.
def _size @allocated.length + @available_connections.length end
def acquire(thread)
This should return a connection is one is available within the timeout,
calling this.
is available. The calling code should NOT already have the mutex when
Assigns a connection to the supplied thread, if one
def acquire(thread) if conn = assign_connection(thread) return conn end timeout = @timeout timer = Sequel.start_timer if conn = acquire_available(thread, timeout) return conn end until conn = assign_connection(thread) elapsed = Sequel.elapsed_seconds_since(timer) # :nocov: raise_pool_timeout(elapsed) if elapsed > timeout # It's difficult to get to this point, it can only happen if there is a race condition # where a connection cannot be acquired even after the thread is signalled by the condition variable if conn = acquire_available(thread, timeout - elapsed) return conn end # :nocov: end conn end
def acquire_available(thread, timeout)
def acquire_available(thread, timeout) sync do # Check if connection was checked in between when assign_connection failed and now. # This is very unlikely, but necessary to prevent a situation where the waiter # will wait for a connection even though one has already been checked in. # :nocov: if conn = next_available return(@allocated[thread] = conn) end # :nocov: @waiter.wait(@mutex, timeout) # Connection still not available, could be because a connection was disconnected, # may have to retry assign_connection to see if a new connection can be made. if conn = next_available return(@allocated[thread] = conn) end end end
def all_connections
it is yielding all of the available connections, which means that until
threads, as it is not safe to operate on them. This holds the mutex while
this thread. This will not yield connections currently allocated to other
Yield all of the available connections, and the one currently allocated to
def all_connections hold do |c| sync do yield c @available_connections.each{|conn| yield conn} end end end
def assign_connection(thread)
Assign a connection to the thread, or return nil if one cannot be assigned.
def assign_connection(thread) # Thread safe as instance variable is only assigned to local variable # and not operated on outside mutex. allocated = @allocated do_make_new = false to_disconnect = nil sync do if conn = next_available return(allocated[thread] = conn) end if (n = _size) >= (max = @max_size) allocated.keys.each do |t| unless t.alive? (to_disconnect ||= []) << allocated.delete(t) end end n = nil end if (n || _size) < max do_make_new = allocated[thread] = true end end if to_disconnect to_disconnect.each{|dconn| disconnect_connection(dconn)} end # Connect to the database outside of the connection pool mutex, # as that can take a long time and the connection pool mutex # shouldn't be locked while the connection takes place. if do_make_new begin conn = make_new(:default) sync{allocated[thread] = conn} ensure unless conn sync{allocated.delete(thread)} end end end conn end
def checkin_connection(conn)
Return a connection to the pool of available connections, returns the connection.
def checkin_connection(conn) @available_connections << conn conn end
def disconnect(opts=OPTS)
Once a connection is requested using #hold, the connection pool
pass the servers: {} option when connecting to the database.
This connection pool does not, for performance reasons. To use the sharded pool,
currently in use, use the ShardedThreadedConnectionPool, which can do that.
being used. If you want to be able to disconnect connections that are
disconnecting from the database, assuming that no connections are currently
Removes all connections currently available. This method has the effect of
def disconnect(opts=OPTS) conns = nil sync do conns = @available_connections.dup @available_connections.clear @waiter.signal end conns.each{|conn| disconnect_connection(conn)} end
def hold(server=nil)
is available or the timeout expires. If the timeout expires before a
number of connections, Pool#hold will block until a connection
If no connection is immediately available and the pool is already using the maximum
the same thread without blocking.
Pool#hold is re-entrant, meaning it can be called recursively in
pool.hold {|conn| conn.execute('DROP TABLE posts')}
block:
available, creates a new connection. Passes the connection to the supplied
Chooses the first available connection, or if none are
def hold(server=nil) t = Sequel.current if conn = owned_connection(t) return yield(conn) end begin conn = acquire(t) yield conn rescue Sequel::DatabaseDisconnectError, *@error_classes => e if disconnect_error?(e) oconn = conn conn = nil disconnect_connection(oconn) if oconn sync do @allocated.delete(t) @waiter.signal end end raise ensure if conn sync{release(t)} if @connection_handling == :disconnect disconnect_connection(conn) end end end end
def initialize(db, opts = OPTS)
:pool_timeout :: The amount of seconds to wait to acquire a connection
will open (default 4)
:max_connections :: The maximum number of connections the connection pool
The following additional options are respected:
def initialize(db, opts = OPTS) super @max_size = Integer(opts[:max_connections] || 4) raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1 @mutex = Mutex.new @connection_handling = opts[:connection_handling] @available_connections = [] @allocated = {} @allocated.compare_by_identity @timeout = Float(opts[:pool_timeout] || 5) @waiter = ConditionVariable.new end
def next_available
is not currently an available connection. The calling code should already
Return the next available connection in the pool, or nil if there
def next_available case @connection_handling when :stack @available_connections.pop else @available_connections.shift end end
def owned_connection(thread)
Returns the connection owned by the supplied thread,
def owned_connection(thread) sync{@allocated[thread]} end
def pool_type
def pool_type :threaded end
def preconnect(concurrent = false)
Create the maximum number of connections immediately. The calling code should
def preconnect(concurrent = false) enum = (max_size - _size).times conns = if concurrent enum.map{Thread.new{make_new(:default)}}.map(&:value) else enum.map{make_new(:default)} end sync{conns.each{|conn| checkin_connection(conn)}} end
def raise_pool_timeout(elapsed)
Raise a PoolTimeout error showing the current timeout, the elapsed time, and the
def raise_pool_timeout(elapsed) name = db.opts[:name] raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}" end
def release(thread)
Releases the connection assigned to the supplied thread back to the pool.
def release(thread) conn = @allocated.delete(thread) unless @connection_handling == :disconnect checkin_connection(conn) end @waiter.signal # Ensure that after signalling the condition, some other thread is given the # opportunity to acquire the mutex. # See <https://github.com/socketry/async/issues/99> for more context. sleep(0) nil end
def size
The total number of connections opened, either available or allocated.
def size @mutex.synchronize{_size} end
def sync
Yield to the block while inside the mutex. The calling code should NOT
def sync @mutex.synchronize{yield} end