class ActiveRecord::ConnectionAdapters::ConnectionPool::Queue
Threadsafe, fair, LIFO queue. Meant to be used by ConnectionPool
with which it shares a Monitor.
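The following is a minimal sketch, not the Rails implementation, of what sharing a Monitor between an owner and a LIFO queue can look like. The names `lock`, `cond`, `items`, `push` and `pop` are hypothetical. It relies on Ruby's `Monitor` being reentrant, so code that already holds the lock can call queue operations that take the same lock.

```ruby
require "monitor"

lock  = Monitor.new      # shared by the "pool" side and the "queue" side
cond  = lock.new_cond    # condition variable tied to the shared lock
items = []               # LIFO storage: push and pop at the same end

# Queue-side operations: each one takes the lock itself.
push = ->(x) { lock.synchronize { items.push(x); cond.signal } }
pop  = -> { lock.synchronize { items.pop } }

# Pool-side code that already holds the lock and then calls into the queue.
# Monitor is reentrant, so the nested synchronize does not deadlock.
lock.synchronize do
  push.call(:conn_a)
  push.call(:conn_b)
end

last_in_first_out = [pop.call, pop.call]  # => [:conn_b, :conn_a]
```

The most recently added element comes back first, which is the LIFO behaviour described above.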
def add(element)
def add(element)
  synchronize do
    @queue.push element
    @cond.signal
  end
end
def any?
def any?
  !@queue.empty?
end
def any_waiting?
def any_waiting?
  synchronize do
    @num_waiting > 0
  end
end
def can_remove_no_wait?
A thread can remove an element from the queue without
waiting if and only if the number of currently available
connections is strictly greater than the number of waiting
threads.
def can_remove_no_wait?
  @queue.size > @num_waiting
end
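As an illustration of the fairness rule, here is a small standalone helper that applies the same strict comparison to plain integers. The helper name and its two parameters are hypothetical and exist only for this example.

```ruby
# Hypothetical helper mirroring the comparison used by can_remove_no_wait?.
# available: number of elements currently in the queue
# waiting:   number of threads currently blocked waiting for an element
def sketch_can_remove_no_wait?(available, waiting)
  available > waiting
end

surplus   = sketch_can_remove_no_wait?(3, 2)  # => true: one element is unclaimed
contended = sketch_can_remove_no_wait?(2, 2)  # => false: each element is owed to a waiter
empty     = sketch_can_remove_no_wait?(0, 0)  # => false: nothing to remove
```

The comparison is strict so that a newly arriving thread cannot take an element that a thread already waiting is about to receive.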
def clear
def clear
  synchronize do
    @queue.clear
  end
end
def delete(element)
def delete(element)
  synchronize do
    @queue.delete(element)
  end
end
def initialize(lock = Monitor.new)
def initialize(lock = Monitor.new)
  @lock = lock
  @cond = @lock.new_cond
  @num_waiting = 0
  @queue = []
end
def internal_poll(timeout)
def internal_poll(timeout)
  no_wait_poll || (timeout && wait_poll(timeout))
end
def no_wait_poll
Remove and return the head of the queue if the number of
available elements is strictly greater than the number of
threads currently waiting. Otherwise, return +nil+.
def no_wait_poll
  remove if can_remove_no_wait?
end
def num_waiting
Returns the number of threads currently waiting on this
queue.
def num_waiting
  synchronize do
    @num_waiting
  end
end
def poll(timeout = nil)
Remove the head of the queue.

If +timeout+ is not given, remove and return the head of the
queue if the number of available elements is strictly
greater than the number of threads currently waiting (that
is, don't jump ahead in line). Otherwise, return +nil+.

If +timeout+ is given, block if there is no element
available, waiting up to +timeout+ seconds for an element to
become available.

Raises:
- ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
  becomes available within +timeout+ seconds.
def poll(timeout = nil)
  synchronize { internal_poll(timeout) }
end
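The two modes of +poll+ can be tried out with a self-contained stand-in. This is a simplified sketch and not the Rails class: `SketchQueue` and `SketchTimeout` are hypothetical names, the error class replaces ActiveRecord::ConnectionTimeoutError, and the class loading interlock used by the real method is left out.

```ruby
require "monitor"

class SketchTimeout < StandardError; end   # stand-in for the real timeout error

class SketchQueue
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @items = []
    @num_waiting = 0
  end

  def add(item)
    @lock.synchronize { @items.push(item); @cond.signal }
  end

  # Without a timeout: return an element or nil, never block.
  # With a timeout: block up to that many seconds, then raise.
  def poll(timeout = nil)
    @lock.synchronize do
      return @items.pop if @items.size > @num_waiting
      return nil unless timeout
      wait_poll(timeout)
    end
  end

  private

  def wait_poll(timeout)
    @num_waiting += 1
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    loop do
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise SketchTimeout, "no element within #{timeout} seconds" if remaining <= 0
      @cond.wait(remaining)
      return @items.pop unless @items.empty?
    end
  ensure
    @num_waiting -= 1
  end
end

q = SketchQueue.new
empty_result = q.poll            # => nil, queue is empty and no timeout was given
q.add(:conn)
immediate = q.poll               # => :conn

timed_out = false
begin
  q.poll(0.05)                   # nothing arrives, so this raises
rescue SketchTimeout
  timed_out = true
end

consumer = Thread.new { q.poll(1) }  # blocks until an element is added
sleep 0.05
q.add(:late)
handed_over = consumer.value     # => :late
```

Passing no timeout suits callers that have a fallback, such as creating a new connection, while passing a timeout suits callers that can only wait.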
def remove
def remove
  @queue.pop
end
def synchronize(&block)
def synchronize(&block)
  @lock.synchronize(&block)
end
def wait_poll(timeout)
Waits on the queue up to +timeout+ seconds, then removes and
returns the head of the queue.
def wait_poll(timeout)
  @num_waiting += 1

  t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = 0
  loop do
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      @cond.wait(timeout - elapsed)
    end

    return remove if any?

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
    if elapsed >= timeout
      msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
        [timeout, elapsed]
      raise ConnectionTimeoutError, msg
    end
  end
ensure
  @num_waiting -= 1
end
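The loop in +wait_poll+ measures elapsed time against a monotonic clock and shortens each subsequent wait, so a wake-up that finds nothing available does not restart the full timeout. The sketch below isolates that pattern in a hypothetical helper, `wait_with_deadline`, which is not part of Rails. As a simplification it returns +nil+ on timeout where the real method raises.

```ruby
require "monitor"

# Hypothetical helper. Must be called while holding the lock that +cond+
# belongs to. Yields after every wake-up and returns the first truthy
# result, or nil once +timeout+ seconds have elapsed.
def wait_with_deadline(cond, timeout)
  t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = 0
  loop do
    cond.wait(timeout - elapsed)   # wait only for the time still remaining
    result = yield
    return result if result
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
    return nil if elapsed >= timeout
  end
end

lock  = Monitor.new
cond  = lock.new_cond
ready = false

waker = Thread.new do
  sleep 0.02
  lock.synchronize { cond.signal }                 # wake-up with nothing available
  sleep 0.02
  lock.synchronize { ready = true; cond.signal }   # wake-up with something available
end

outcome = lock.synchronize { wait_with_deadline(cond, 1.0) { ready && :got_it } }
waker.join

expired = lock.synchronize { wait_with_deadline(cond, 0.05) { false } }
```

A monotonic clock is used so that adjustments to the system clock cannot lengthen or shorten the wait.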