class Async::Semaphore
@public Since ‘stable-v1`.
A synchronization primitive, which limits access to a given resource.
def acquire
@yields {...} When the semaphore can be acquired.
If no block is provided, you must call release manually.
Acquire the semaphore, block if we are at the limit.
def acquire wait @count += 1 return unless block_given? begin return yield ensure self.release end end
def async(*arguments, parent: (@parent or Task.current), **options)
def async(*arguments, parent: (@parent or Task.current), **options) wait parent.async(**options) do |task| @count += 1 begin yield task, *arguments ensure self.release end end end
def blocking?
def blocking? @count >= @limit end
def empty?
def empty? @count.zero? end
def initialize(limit = 1, parent: nil)
@parameter limit [Integer] The maximum number of times the semaphore can be acquired before it blocks.
def initialize(limit = 1, parent: nil) @count = 0 @limit = limit @waiting = [] @parent = parent end
def release
def release @count -= 1 while (@limit - @count) > 0 and fiber = @waiting.shift if fiber.alive? Fiber.scheduler.resume(fiber) end end end
def wait
def wait fiber = Fiber.current if blocking? @waiting << fiber Fiber.scheduler.transfer while blocking? end rescue Exception @waiting.delete(fiber) raise end