class EventMachine::Synchrony::ConnectionPool

def acquire(fiber)

- if pool is full, yield the current fiber until connection is available
- if connection is available, pass it back to the calling block
Acquire a lock on a connection and assign it to executing fiber
def acquire(fiber)
  if conn = @available.pop
    @reserved[fiber.object_id] = conn
    conn
  else
    Fiber.yield @pending.push fiber
    acquire(fiber)
  end
end

def execute(async)

connection to service the request.
block. This will block indefinitely until there is an available
Choose first available connection and pass it to the supplied
def execute(async)
  f = Fiber.current
  begin
    conn = acquire(f)
    yield conn
  ensure
    release(f) if not async
  end
end

def initialize(opts, &block)

def initialize(opts, &block)
  @reserved  = {}   # map of in-progress connections
  @available = []   # pool of free connections
  @pending   = []   # pending reservations (FIFO)
  opts[:size].times do
    @available.push(block.call) if block_given?
  end
end

def method_missing(method, *args, &blk)


data is available, or request is complete)
once it is complete (assumption: fiber will yield until
yield the connection within execute method and release
pool release whenever the request is complete. Otherwise
hijack the callbacks and errbacks to fire a connection
If the requesting method begins with "a" prefix, then

Allow the pool to behave as the underlying connection
def method_missing(method, *args, &blk)
  async = (method[0,1] == "a")
  execute(async) do |conn|
    df = conn.__send__(method, *args, &blk)
    if async
      fiber = Fiber.current
      df.callback { release(fiber) }
      df.errback { release(fiber) }
    end
    df
  end
end

def pool_status

Returns:
  • (Hash) - Current utilization.
def pool_status
  {
    available: @available.size,
    reserved: @reserved.size,
    pending: @pending.size
  }
end

def release(fiber)

immediately try to run acquire on the pool)
resume any other pending connections (which will
Release connection assigned to the supplied fiber and
def release(fiber)
  @available.push(@reserved.delete(fiber.object_id))
  if pending = @pending.shift
    pending.resume
  end
end