class ConnectionPool::TimedStack
#=> raises ConnectionPool::TimeoutError after 5 seconds
ts.pop timeout: 5
conn = ts.pop
ts.push conn
# return a connection
conn = ts.pop
# fetch a connection
ts = TimedStack.new(1) { MyConnection.new }
Examples:
number.
you wish to manage). Connections are created lazily up to a given maximum
The TimedStack manages a pool of homogeneous connections (or any resource
#
def connection_stored?(options = nil)
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Reports whether at least one connection is currently sitting on the stack.
# +options+ is accepted for subclasses and is not used here.
def connection_stored?(options = nil)
  @que.size.positive?
end
def current_time
# Returns the current reading of the monotonic clock
# (Process::CLOCK_MONOTONIC), so that differences between two readings are
# not affected by wall-clock adjustments.
def current_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def empty?
#
# Returns true when no connection can be handed out: the number of
# connections currently checked out (created minus those sitting on the
# stack) has reached the pool maximum.
def empty?
  (@created - @que.length) >= @max
end
def fetch_connection(options = nil)
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Removes the most recently stored entry from the stack and returns its
# connection (the first element of the entry). Returns nil when the stack is
# empty. +options+ is accepted for subclasses and is not used here.
def fetch_connection(options = nil)
  entry = @que.pop
  return if entry.nil?

  entry.first
end
def idle
#
# Number of connections currently stored on the stack, i.e. entries that
# are sitting in the pool rather than checked out.
def idle
  @que.length
end
def idle_connections?(idle_seconds)
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Tells whether the entry at the front of the stack was stored more than
# +idle_seconds+ ago. When nothing is stored, the falsy result of
# +connection_stored?+ is returned unchanged.
def idle_connections?(idle_seconds)
  stored = connection_stored?
  return stored unless stored

  now = current_time
  age = now - @que.first.last # each entry is [connection, stored_at]
  age > idle_seconds
end
def initialize(size = 0, &block)
Creates a new pool with +size+ connections that are created from the given +block+.
#
# Sets up an empty stack that may hold up to +size+ connections.
#
# +block+ is stored and called later, one connection at a time, whenever a
# new connection has to be created; nothing is created here.
def initialize(size = 0, &block)
  @create_block = block
  @created = 0   # connections created so far and not yet discarded
  @que = []      # stored entries, each [connection, stored_at]
  @max = size
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new
  @shutdown_block = nil # set while the stack is shutting down
end
def length
#
# Number of connections that could be checked out right now: slots for
# connections that have not been created yet (@max - @created) plus the
# connections currently stored on the stack.
def length
  @max - @created + @que.length
end
def pop(timeout = 0.5, options = {})
the +timeout+ argument (which will be removed in a future release). Other options may be used by subclasses that extend TimedStack.
+:timeout+ is the only checked entry in +options+ and is preferred over
timeout a ConnectionPool::TimeoutError is raised.
immediately returned. If no connection is available within the given
Retrieves a connection from the stack. If a connection is available it is
#
# Checks a connection out of the stack, waiting if necessary.
#
# A stored connection is returned if there is one; otherwise a new one is
# created if +try_create+ provides it; otherwise the caller waits on the
# condition variable until the deadline passes.
#
# timeout - seconds to wait (default 0.5). A Hash passed in this position is
#           treated as +options+.
# options - Hash; +:timeout+ overrides the positional timeout. The hash is
#           also forwarded to +try_fetch_connection+ and +try_create+.
#
# Raises ConnectionPool::PoolShuttingDownError while a shutdown block is set.
# Raises ConnectionPool::TimeoutError when the deadline passes.
def pop(timeout = 0.5, options = {})
  # Supports `pop(timeout: 5)`: the Hash arrives as the first positional.
  options, timeout = timeout, 0.5 if Hash === timeout
  timeout = options.fetch :timeout, timeout

  deadline = current_time + timeout
  @mutex.synchronize do
    loop do
      raise ConnectionPool::PoolShuttingDownError if @shutdown_block

      if (conn = try_fetch_connection(options))
        return conn
      end

      connection = try_create(options)
      return connection if connection

      # Re-computed on every pass so wake-ups do not extend the total wait.
      to_wait = deadline - current_time
      raise ConnectionPool::TimeoutError, "Waited #{timeout} sec, #{length}/#{@max} available" if to_wait <= 0

      @resource.wait(@mutex, to_wait)
    end
  end
end
def push(obj, options = {})
Returns +obj+ to the stack. +options+ is ignored in TimedStack but may be used by subclasses that extend TimedStack.
#
# Checks +obj+ back in.
#
# Normally the object is handed to +store_connection+ together with
# +options+. While a shutdown block is set, the object is instead passed to
# that block and the created-count is decremented (never below zero).
# Either way, threads waiting on the condition variable are woken.
def push(obj, options = {})
  @mutex.synchronize do
    if @shutdown_block
      @created -= 1 unless @created == 0
      @shutdown_block.call(obj)
    else
      store_connection obj, options
    end

    @resource.broadcast
  end
end
def reap(idle_seconds, &block)
#
# Removes connections that have been stored for more than +idle_seconds+
# and yields each one to +block+.
#
# The number of attempts is bounded by the number of stored connections
# when the call starts; reaping stops early as soon as
# +reserve_idle_connection+ returns nothing. The mutex is held only while
# reserving a connection, not while +block+ runs.
#
# Raises ArgumentError without a block or with a non-numeric +idle_seconds+.
# Raises ConnectionPool::PoolShuttingDownError while a shutdown block is set.
def reap(idle_seconds, &block)
  raise ArgumentError, "reap must receive a block" unless block
  raise ArgumentError, "idle_seconds must be a number" unless idle_seconds.is_a?(Numeric)
  raise ConnectionPool::PoolShuttingDownError if @shutdown_block

  idle.times do
    conn =
      @mutex.synchronize do
        # Checked again: a shutdown may have started between iterations.
        raise ConnectionPool::PoolShuttingDownError if @shutdown_block

        reserve_idle_connection(idle_seconds)
      end
    break unless conn

    block.call(conn)
  end
end
def reserve_idle_connection(idle_seconds)
This method returns the oldest idle connection if it has been idle for more than idle_seconds.
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# When +idle_connections?+ reports an idle connection, removes the entry at
# the front of the stack, decrements the created-count (never below zero)
# and returns the connection. Returns nil otherwise.
def reserve_idle_connection(idle_seconds)
  return unless idle_connections?(idle_seconds)

  @created -= 1 unless @created == 0

  @que.shift.first
end
def shutdown(reload: false, &block)
shutdown will raise +ConnectionPool::PoolShuttingDownError+ unless +:reload+ is +true+.
removing it from the pool. Attempting to checkout a connection after
Shuts down the TimedStack by passing each connection to +block+ and then
#
# Shuts the stack down, passing every stored connection to +block+.
#
# The block is recorded as the shutdown block, waiting threads are woken,
# and +shutdown_connections+ drains the stack. With +reload: true+ the
# shutdown block is cleared again afterwards; otherwise it stays set.
#
# Raises ArgumentError when no block is given.
def shutdown(reload: false, &block)
  raise ArgumentError, "shutdown must receive a block" unless block

  @mutex.synchronize do
    @shutdown_block = block
    @resource.broadcast

    shutdown_connections
    @shutdown_block = nil if reload
  end
end
def shutdown_connections(options = nil)
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Drains the stack: every connection obtained from +try_fetch_connection+
# is passed to the shutdown block, and the created-count is decremented for
# each one (never below zero).
def shutdown_connections(options = nil)
  while (conn = try_fetch_connection(options))
    @created -= 1 unless @created == 0
    @shutdown_block.call(conn)
  end
end
def store_connection(obj, options = nil)
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Appends +obj+ to the stack together with the time at which it was stored.
# +options+ is accepted for subclasses and is not used here.
def store_connection(obj, options = nil)
  @que << [obj, current_time]
end
def try_create(options = nil)
This method must create a connection if and only if the total number of connections allowed has not been met.
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Creates a connection by calling the creation block, unless the number of
# created connections already equals the maximum, in which case nil is
# returned. The count is bumped only after the block returns, so a block
# that raises leaves the count unchanged.
def try_create(options = nil)
  return if @created == @max

  object = @create_block.call
  @created += 1
  object
end
def try_fetch_connection(options = nil)
subclasses with expensive match/search algorithms to avoid double-handling their stack.
This method must return a connection from the stack if one exists. Allows
This is an extension point for TimedStack and is called with a mutex.
#
# Extension point; called while the pool mutex is held.
#
# Returns a connection from the stack when +connection_stored?+ says one is
# available; otherwise returns that falsy answer unchanged. +options+ is
# forwarded to both helpers.
def try_fetch_connection(options = nil)
  stored = connection_stored?(options)
  return stored unless stored

  fetch_connection(options)
end