class Async::Semaphore
@public Since ‘stable-v1`.
A synchronization primitive, which limits access to a given resource.
def acquire
@yields {...} When the semaphore can be acquired.
If no block is provided, you must call release manually.
Acquire the semaphore, block if we are at the limit.
def acquire wait @count += 1 return unless block_given? begin return yield ensure self.release end end
def async(*arguments, parent: (@parent or Task.current), **options)
def async(*arguments, parent: (@parent or Task.current), **options) wait parent.async(**options) do |task| @count += 1 begin yield task, *arguments ensure self.release end end end
def blocking?
def blocking? @count >= @limit end
def empty?
def empty? @count.zero? end
def initialize(limit = 1, parent: nil)
@parameter limit [Integer] The maximum number of times the semaphore can be acquired before it blocks.
def initialize(limit = 1, parent: nil) @count = 0 @limit = limit @waiting = List.new @parent = parent end
def limit= limit
On increasing the limit, some tasks may be immediately resumed. On decreasing the limit, some tasks may execute until the count is < than the limit.
Allow setting the limit. This is useful for cases where the semaphore is used to limit the number of concurrent tasks, but the number of tasks is not known in advance or needs to be modified.
def limit= limit difference = limit - @limit @limit = limit # We can't suspend if difference > 0 difference.times do break unless node = @waiting.first node.resume end end end
def release
def release @count -= 1 while (@limit - @count) > 0 and node = @waiting.first node.resume end end
def wait
def wait return unless blocking? @waiting.stack(FiberNode.new(Fiber.current)) do Fiber.scheduler.transfer while blocking? end end