class EventMachine::Synchrony::ConnectionPool
def acquire(fiber)
- if connection is available, pass it back to the calling block
Acquire a lock on a connection and assign it to executing fiber
def acquire(fiber) if conn = @available.pop @reserved[fiber.object_id] = conn conn else Fiber.yield @pending.push fiber acquire(fiber) end end
def execute(async)
block. This will block indefinitely until there is an available
Choose first available connection and pass it to the supplied
def execute(async) f = Fiber.current begin conn = acquire(f) yield conn ensure release(f) if not async end end
def initialize(opts, &block)
def initialize(opts, &block) @reserved = {} # map of in-progress connections @available = [] # pool of free connections @pending = [] # pending reservations (FIFO) opts[:size].times do @available.push(block.call) if block_given? end end
def method_missing(method, *args, &blk)
data is available, or request is complete)
once it is complete (assumption: fiber will yield until
yield the connection within execute method and release
pool release whenever the request is complete. Otherwise
hijack the callbacks and errbacks to fire a connection
If the requesting method begins with "a" prefix, then
Allow the pool to behave as the underlying connection
def method_missing(method, *args, &blk) async = (method[0,1] == "a") execute(async) do |conn| df = conn.__send__(method, *args, &blk) if async fiber = Fiber.current df.callback { release(fiber) } df.errback { release(fiber) } end df end end
def pool_status
-
(Hash)
- Current utilization.
def pool_status { available: @available.size, reserved: @reserved.size, pending: @pending.size } end
def release(fiber)
resume any other pending connections (which will
Release connection assigned to the supplied fiber and
def release(fiber) @available.push(@reserved.delete(fiber.object_id)) if pending = @pending.shift pending.resume end end