lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb
require 'thread' require 'concurrent/atomic/atomic_fixnum' require 'concurrent/errors' require 'concurrent/synchronization/object' require 'concurrent/synchronization/lock' module Concurrent # Ruby read-write lock implementation # # Allows any number of concurrent readers, but only one concurrent writer # (And if the "write" lock is taken, any readers who come along will have to wait) # # If readers are already active when a writer comes along, the writer will wait for # all the readers to finish before going ahead. # Any additional readers that come when the writer is already waiting, will also # wait (so writers are not starved). # # This implementation is based on `java.util.concurrent.ReentrantReadWriteLock`. # # @example # lock = Concurrent::ReadWriteLock.new # lock.with_read_lock { data.retrieve } # lock.with_write_lock { data.modify! } # # @note Do **not** try to acquire the write lock while already holding a read lock # **or** try to acquire the write lock while you already have it. # This will lead to deadlock # # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock class ReadWriteLock < Synchronization::Object # @!visibility private WAITING_WRITER = 1 << 15 # @!visibility private RUNNING_WRITER = 1 << 29 # @!visibility private MAX_READERS = WAITING_WRITER - 1 # @!visibility private MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 safe_initialization! # Implementation notes: # A goal is to make the uncontended path for both readers/writers lock-free # Only if there is reader-writer or writer-writer contention, should locks be used # Internal state is represented by a single integer ("counter"), and updated # using atomic compare-and-swap operations # When the counter is 0, the lock is free # Each reader increments the counter by 1 when acquiring a read lock # (and decrements by 1 when releasing the read lock) # The counter is increased by (1 << 15) for each writer waiting to acquire the # write lock, and by (1 << 29) if the write lock is taken # Create a new `ReadWriteLock` in the unlocked state. def initialize super() @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadLock = Synchronization::Lock.new @WriteLock = Synchronization::Lock.new end # Execute a block operation within a read lock. # # @yield the task to be performed within the lock. # # @return [Object] the result of the block operation. # # @raise [ArgumentError] when no block is given. # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def with_read_lock raise ArgumentError.new('no block given') unless block_given? acquire_read_lock begin yield ensure release_read_lock end end # Execute a block operation within a write lock. # # @yield the task to be performed within the lock. # # @return [Object] the result of the block operation. # # @raise [ArgumentError] when no block is given. # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def with_write_lock raise ArgumentError.new('no block given') unless block_given? acquire_write_lock begin yield ensure release_write_lock end end # Acquire a read lock. If a write lock has been acquired will block until # it is released. Will not block if other read locks have been acquired. # # @return [Boolean] true if the lock is successfully acquired # # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def acquire_read_lock while true c = @Counter.value raise ResourceLimitError.new('Too many reader threads') if max_readers?(c) # If a writer is waiting when we first queue up, we need to wait if waiting_writer?(c) @ReadLock.wait_until { !waiting_writer? } # after a reader has waited once, they are allowed to "barge" ahead of waiting writers # but if a writer is *running*, the reader still needs to wait (naturally) while true c = @Counter.value if running_writer?(c) @ReadLock.wait_until { !running_writer? } else return if @Counter.compare_and_set(c, c+1) end end else break if @Counter.compare_and_set(c, c+1) end end true end # Release a previously acquired read lock. # # @return [Boolean] true if the lock is successfully released def release_read_lock while true c = @Counter.value if @Counter.compare_and_set(c, c-1) # If one or more writers were waiting, and we were the last reader, wake a writer up if waiting_writer?(c) && running_readers(c) == 1 @WriteLock.signal end break end end true end # Acquire a write lock. Will block and wait for all active readers and writers. # # @return [Boolean] true if the lock is successfully acquired # # @raise [Concurrent::ResourceLimitError] if the maximum number of writers # is exceeded. def acquire_write_lock while true c = @Counter.value raise ResourceLimitError.new('Too many writer threads') if max_writers?(c) if c == 0 # no readers OR writers running # if we successfully swap the RUNNING_WRITER bit on, then we can go ahead break if @Counter.compare_and_set(0, RUNNING_WRITER) elsif @Counter.compare_and_set(c, c+WAITING_WRITER) while true # Now we have successfully incremented, so no more readers will be able to increment # (they will wait instead) # However, readers OR writers could decrement right here, OR another writer could increment @WriteLock.wait_until do # So we have to do another check inside the synchronized section # If a writer OR reader is running, then go to sleep c = @Counter.value !running_writer?(c) && !running_readers?(c) end # We just came out of a wait # If we successfully turn the RUNNING_WRITER bit on with an atomic swap, # Then we are OK to stop waiting and go ahead # Otherwise go back and wait again c = @Counter.value break if !running_writer?(c) && !running_readers?(c) && @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER) end break end end true end # Release a previously acquired write lock. # # @return [Boolean] true if the lock is successfully released def release_write_lock return true unless running_writer? c = @Counter.update { |counter| counter - RUNNING_WRITER } @ReadLock.broadcast @WriteLock.signal if waiting_writers(c) > 0 true end # Queries if the write lock is held by any thread. # # @return [Boolean] true if the write lock is held else false` def write_locked? @Counter.value >= RUNNING_WRITER end # Queries whether any threads are waiting to acquire the read or write lock. # # @return [Boolean] true if any threads are waiting for a lock else false def has_waiters? waiting_writer?(@Counter.value) end private # @!visibility private def running_readers(c = @Counter.value) c & MAX_READERS end # @!visibility private def running_readers?(c = @Counter.value) (c & MAX_READERS) > 0 end # @!visibility private def running_writer?(c = @Counter.value) c >= RUNNING_WRITER end # @!visibility private def waiting_writers(c = @Counter.value) (c & MAX_WRITERS) / WAITING_WRITER end # @!visibility private def waiting_writer?(c = @Counter.value) c >= WAITING_WRITER end # @!visibility private def max_readers?(c = @Counter.value) (c & MAX_READERS) == MAX_READERS end # @!visibility private def max_writers?(c = @Counter.value) (c & MAX_WRITERS) == MAX_WRITERS end end end