class Sequel::ThreadedConnectionPool

A connection pool allowing multi-threaded access to a pool of connections.
This is the default connection pool used by Sequel.

def _size

The total number of connections opened, either available or allocated.
The calling code should already have the mutex before calling this.
# Count every connection the pool currently has open: the entries checked out
# in @allocated plus the idle ones in @available_connections.
#
# No locking is performed here, so the caller is expected to already hold the
# pool mutex.
def _size
  in_use = @allocated.length
  idle = @available_connections.length
  in_use + idle
end

def acquire(thread)

Assigns a connection to the supplied thread, if one
is available. The calling code should NOT already have the mutex when
calling this.

This should return a connection if one is available within the timeout,
or raise PoolTimeout if a connection could not be acquired within the timeout.
# Obtain a connection for +thread+, waiting up to @timeout seconds.
#
# Order of attempts:
# 1. assign_connection (an idle connection or a newly made one);
# 2. acquire_available, waiting for the full timeout;
# 3. repeat 1 and 2 with the remaining time until a connection is obtained or
#    the elapsed time exceeds the timeout, in which case raise_pool_timeout
#    is called.
#
# Returns the connection. The helpers used here take the pool mutex
# themselves, so the caller must not already hold it.
def acquire(thread)
  connection = assign_connection(thread)
  return connection if connection

  limit = @timeout
  started = Sequel.start_timer
  connection = acquire_available(thread, limit)
  return connection if connection

  until (connection = assign_connection(thread))
    waited = Sequel.elapsed_seconds_since(started)
    # :nocov:
    raise_pool_timeout(waited) if waited > limit
    # It's difficult to get to this point, it can only happen if there is a race condition
    # where a connection cannot be acquired even after the thread is signalled by the condition variable
    connection = acquire_available(thread, limit - waited)
    return connection if connection
    # :nocov:
  end
  connection
end

def acquire_available(thread, timeout)

Acquire a connection if one is already available, or wait until one becomes available.
# While holding the pool mutex, hand +thread+ an idle connection: either one
# that is already available, or one that shows up after waiting on the
# condition variable for at most +timeout+ seconds.
#
# Returns the connection now recorded in @allocated for +thread+, or nil when
# none is available even after the wait.
def acquire_available(thread, timeout)
  sync do
    # Check if connection was checked in between when assign_connection failed and now.
    # This is very unlikely, but necessary to prevent a situation where the waiter
    # will wait for a connection even though one has already been checked in.
    # :nocov:
    ready = next_available
    return(@allocated[thread] = ready) if ready
    # :nocov:

    @waiter.wait(@mutex, timeout)

    # Connection still not available, could be because a connection was disconnected,
    # may have to retry assign_connection to see if a new connection can be made.
    woken = next_available
    return(@allocated[thread] = woken) if woken
  end
end

def all_connections

Yield all of the available connections, and the one currently allocated to
this thread. This will not yield connections currently allocated to other
threads, as it is not safe to operate on them. This holds the mutex while
it is yielding all of the available connections, which means that until
the method's block returns, the pool is locked.
# Yield the connection held by the calling thread (obtained through #hold),
# followed by each connection in @available_connections.
#
# All yielding happens inside the pool mutex, so the pool stays locked until
# the caller's block has run for every connection.
def all_connections
  hold do |own_connection|
    sync do
      yield own_connection
      @available_connections.each do |idle_connection|
        yield idle_connection
      end
    end
  end
end

def assign_connection(thread)

Assign a connection to the thread, or return nil if one cannot be assigned.
The caller should NOT have the mutex before calling this.
# Try to give +thread+ a connection without waiting.
#
# Inside the mutex: hand over an idle connection if there is one; otherwise,
# if the pool is below @max_size (after dropping entries that belong to dead
# threads), reserve a slot for a new connection. Outside the mutex: disconnect
# the reclaimed connections and create the new one.
#
# Returns the assigned connection, or nil if none could be assigned.
def assign_connection(thread)
  # Thread safe as instance variable is only assigned to local variable
  # and not operated on outside mutex.
  allocated = @allocated
  do_make_new = false
  to_disconnect = nil
  sync do
    # Fast path: an idle connection exists, so record it for the thread and return.
    if conn = next_available
      return(allocated[thread] = conn)
    end
    # At capacity: remove allocations owned by threads that are no longer alive
    # and remember their connections so they can be disconnected after the
    # mutex is released.
    if (n = _size) >= (max = @max_size)
      allocated.keys.each do |t|
        unless t.alive?
          (to_disconnect ||= []) << allocated.delete(t)
        end
      end
      # Entries may have been removed, so discard the cached count and force
      # a recount below.
      n = nil
    end
    if (n || _size) < max
      # Store +true+ as a placeholder so the slot counts toward _size while
      # the real connection is created outside the mutex.
      do_make_new = allocated[thread] = true
    end
  end
  if to_disconnect
    to_disconnect.each{|dconn| disconnect_connection(dconn)}
  end
  # Connect to the database outside of the connection pool mutex,
  # as that can take a long time and the connection pool mutex
  # shouldn't be locked while the connection takes place.
  if do_make_new
    begin
      conn = make_new(:default)
      # Replace the placeholder with the real connection.
      sync{allocated[thread] = conn}
    ensure
      # conn is still nil when make_new raised; drop the placeholder so the
      # reserved slot is freed.
      unless conn
        sync{allocated.delete(thread)}
      end
    end
  end
  conn
end

def checkin_connection(conn)

Return a connection to the pool of available connections, and return the connection.
The calling code should already have the mutex before calling this.
# Append +conn+ to the list of idle connections and return +conn+.
#
# Mutates @available_connections without locking, so the caller is expected
# to already hold the pool mutex.
def checkin_connection(conn)
  @available_connections.push(conn)
  conn
end

def disconnect(opts=OPTS)

Removes all connections currently available. This method has the effect of
disconnecting from the database, assuming that no connections are currently
being used. If you want to be able to disconnect connections that are
currently in use, use the ShardedThreadedConnectionPool, which can do that.
This connection pool does not, for performance reasons. To use the sharded pool,
pass the servers: {} option when connecting to the database.

Once a connection is requested using #hold, the connection pool
creates new connections to the database.
# Disconnect every idle connection.
#
# The idle list is copied and emptied inside the mutex (signalling the
# condition variable once), and the actual disconnects happen after the mutex
# has been released. Connections currently checked out are not touched.
#
# +opts+ is accepted but not used by this implementation.
# Returns the array of connections that were disconnected.
def disconnect(opts=OPTS)
  drained = []
  sync do
    drained.concat(@available_connections)
    @available_connections.clear
    @waiter.signal
  end
  drained.each do |idle_connection|
    disconnect_connection(idle_connection)
  end
end

def hold(server=nil)

Chooses the first available connection, or if none are
available, creates a new connection. Passes the connection to the supplied
block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in
the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum
number of connections, Pool#hold will block until a connection
is available or the timeout expires. If the timeout expires before a
connection can be acquired, a Sequel::PoolTimeout is raised.
# Yield a connection to the block and return the block's result.
#
# If the calling thread already owns a connection, that connection is yielded
# again (re-entrant use). Otherwise a connection is acquired for the thread,
# yielded, and afterwards released; when @connection_handling is :disconnect
# the connection is also disconnected after release.
#
# If the block (or acquisition) raises Sequel::DatabaseDisconnectError or one
# of @error_classes and disconnect_error? confirms it, the connection is
# disconnected and the thread's allocation is removed instead of being
# returned to the pool. The exception is then re-raised.
#
# +server+ is accepted but not used by this implementation.
def hold(server=nil)
  t = Sequel.current
  if conn = owned_connection(t)
    return yield(conn)
  end
  begin
    conn = acquire(t)
    yield conn
  rescue Sequel::DatabaseDisconnectError, *@error_classes => e
    if disconnect_error?(e)
      oconn = conn
      # Clearing conn stops the outer ensure from returning the broken
      # connection to the pool.
      conn = nil
      begin
        disconnect_connection(oconn) if oconn
      ensure
        # Always drop the allocation and wake a waiter, even if the
        # disconnect itself raised. Otherwise the thread would keep the dead
        # connection in @allocated and every later #hold on this thread would
        # yield it again.
        sync do
          @allocated.delete(t)
          @waiter.signal
        end
      end
    end
    raise
  ensure
    if conn
      sync{release(t)}
      if @connection_handling == :disconnect
        disconnect_connection(conn)
      end
    end
  end
end

def initialize(db, opts = OPTS)

The following additional options are respected:
:max_connections :: The maximum number of connections the connection pool
will open (default 4)
:pool_timeout :: The number of seconds to wait to acquire a connection
before raising a PoolTimeout error (default 5)
# Set up the pool state after delegating to the superclass initializer.
#
# Options read from +opts+:
# :max_connections :: converted with Integer, defaults to 4; a value below 1
#                     raises Sequel::Error.
# :pool_timeout :: converted with Float, defaults to 5.
# :connection_handling :: stored as given.
#
# @allocated compares keys by identity.
def initialize(db, opts = OPTS)
  super
  @max_size = Integer(opts[:max_connections] || 4)
  if @max_size < 1
    raise(Sequel::Error, ':max_connections must be positive')
  end
  @mutex = Mutex.new
  @connection_handling = opts[:connection_handling]
  @available_connections = []
  @allocated = {}.compare_by_identity
  @timeout = Float(opts[:pool_timeout] || 5)
  @waiter = ConditionVariable.new
end

def next_available

Return the next available connection in the pool, or nil if there
is not currently an available connection. The calling code should already
have the mutex before calling this.
# Remove and return one idle connection, or nil when there is none.
#
# With @connection_handling set to :stack the most recently checked-in
# connection is taken (from the end of the list); with any other setting the
# oldest one is taken (from the front).
#
# Mutates @available_connections without locking, so the caller is expected
# to already hold the pool mutex.
def next_available
  @connection_handling == :stack ? @available_connections.pop : @available_connections.shift
end

def owned_connection(thread)

Returns the connection owned by the supplied thread,
if any. The calling code should NOT already have the mutex before calling this.
# Look up, inside the pool mutex, the connection recorded in @allocated for
# +thread+. Returns nil when the thread has no entry.
#
# Takes the mutex itself, so the caller must not already hold it.
def owned_connection(thread)
  sync do
    @allocated[thread]
  end
end

def pool_type

# Returns the symbol :threaded.
# NOTE(review): presumably used to identify which pool implementation is in
# use; confirm against callers.
def pool_type
  :threaded
end

def preconnect(concurrent = false)

Create the maximum number of connections immediately. The calling code should
NOT have the mutex before calling this.
# Open connections until the pool reaches max_size and check them all in as
# idle connections.
#
# concurrent :: when true, each connection is created in its own thread and
#               the threads are joined via Thread#value; otherwise the
#               connections are created one after another.
#
# The current size is read inside the mutex, because _size requires the
# caller to hold it. The connections themselves are created outside the mutex
# and then checked in under the mutex in a single step. The caller must not
# already hold the mutex.
#
# Returns the array of newly created connections.
def preconnect(concurrent = false)
  enum = (max_size - sync{_size}).times
  conns = if concurrent
    enum.map{Thread.new{make_new(:default)}}.map(&:value)
  else
    enum.map{make_new(:default)}
  end
  sync{conns.each{|conn| checkin_connection(conn)}}
end

def raise_pool_timeout(elapsed)

Raise a PoolTimeout error showing the current timeout, the elapsed time, and the
database's name (if any).
# Raise Sequel::PoolTimeout with a message reporting the configured timeout
# and the +elapsed+ seconds, followed by the database name when
# db.opts[:name] is set.
def raise_pool_timeout(elapsed)
  db_name = db.opts[:name]
  message = "timeout: #{@timeout}, elapsed: #{elapsed}"
  message += ", database name: #{db_name}" if db_name
  raise ::Sequel::PoolTimeout, message
end

def release(thread)

Releases the connection assigned to the supplied thread back to the pool.
The calling code should already have the mutex before calling this.
# Remove +thread+'s entry from @allocated and, unless @connection_handling is
# :disconnect, check the connection back in as idle. Signals the condition
# variable in either case. Always returns nil.
#
# Mutates pool state without locking, so the caller is expected to already
# hold the pool mutex.
def release(thread)
  returned = @allocated.delete(thread)
  checkin_connection(returned) unless @connection_handling == :disconnect
  @waiter.signal

  # Ensure that after signalling the condition, some other thread is given the
  # opportunity to acquire the mutex.
  # See <https://github.com/socketry/async/issues/99> for more context.
  sleep(0)

  nil
end

def size

The total number of connections opened, either available or allocated.
The calling code should not have the mutex before calling this.
# Return the total number of open connections (allocated plus idle), as
# computed by _size while holding the pool mutex.
#
# Takes the mutex itself, so the caller must not already hold it.
def size
  @mutex.synchronize do
    _size
  end
end

def sync

Yield to the block while inside the mutex. The calling code should NOT
already have the mutex before calling this.
# Run the given block while holding the pool mutex and return the block's
# value.
#
# The caller must not already hold the mutex.
def sync
  @mutex.synchronize do
    yield
  end
end