lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb



require 'thread'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/errors'
require 'concurrent/synchronization/object'
require 'concurrent/synchronization/lock'
require 'concurrent/atomic/lock_local_var'

module Concurrent

  # Re-entrant read-write lock implementation
  #
  # Allows any number of concurrent readers, but only one concurrent writer
  # (And while the "write" lock is taken, no read locks can be obtained either.
  # Hence, the write lock can also be called an "exclusive" lock.)
  #
  # If another thread has taken a read lock, any thread which wants a write lock
  # will block until all the readers release their locks. However, once a thread
  # starts waiting to obtain a write lock, any additional readers that come along
  # will also wait (so writers are not starved).
  #
  # A thread can acquire both a read and write lock at the same time. A thread can
  # also acquire a read lock OR a write lock more than once. Only when the read (or
  # write) lock is released as many times as it was acquired, will the thread
  # actually let it go, allowing other threads which might have been waiting
  # to proceed. Therefore the lock can be upgraded by first acquiring
  # read lock and then write lock and that the lock can be downgraded by first
  # having both read and write lock a releasing just the write lock.
  #
  # If both read and write locks are acquired by the same thread, it is not strictly
  # necessary to release them in the same order they were acquired. In other words,
  # the following code is legal:
  #
  # @example
  #   lock = Concurrent::ReentrantReadWriteLock.new
  #   lock.acquire_write_lock
  #   lock.acquire_read_lock
  #   lock.release_write_lock
  #   # At this point, the current thread is holding only a read lock, not a write
  #   # lock. So other threads can take read locks, but not a write lock.
  #   lock.release_read_lock
  #   # Now the current thread is not holding either a read or write lock, so
  #   # another thread could potentially acquire a write lock.
  #
  # This implementation was inspired by `java.util.concurrent.ReentrantReadWriteLock`.
  #
  # @example
  #   lock = Concurrent::ReentrantReadWriteLock.new
  #   lock.with_read_lock  { data.retrieve }
  #   lock.with_write_lock { data.modify! }
  #
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock
  class ReentrantReadWriteLock < Synchronization::Object

    # Implementation notes:
    #
    # A goal is to make the uncontended path for both readers/writers mutex-free
    # Only if there is reader-writer or writer-writer contention, should mutexes be used
    # Otherwise, a single CAS operation is all we need to acquire/release a lock
    #
    # Internal state is represented by a single integer ("counter"), and updated
    #   using atomic compare-and-swap operations
    # When the counter is 0, the lock is free
    # Each thread which has one OR MORE read locks increments the counter by 1
    #   (and decrements by 1 when releasing the read lock)
    # The counter is increased by (1 << 15) for each writer waiting to acquire the
    #   write lock, and by (1 << 29) if the write lock is taken
    #
    # Additionally, each thread uses a thread-local variable to count how many times
    #   it has acquired a read lock, AND how many times it has acquired a write lock.
    # It uses a similar trick; an increment of 1 means a read lock was taken, and
    #   an increment of (1 << 15) means a write lock was taken
    # This is what makes re-entrancy possible
    #
    # 2 rules are followed to ensure good liveness properties:
    # 1) Once a writer has queued up and is waiting for a write lock, no other thread
    #    can take a lock without waiting
    # 2) When a write lock is released, readers are given the "first chance" to wake
    #    up and acquire a read lock
    # Following these rules means readers and writers tend to "take turns", so neither
    #   can starve the other, even under heavy contention

    # @!visibility private
    READER_BITS    = 15
    # @!visibility private
    WRITER_BITS    = 14

    # Used with @Counter:
    # @!visibility private
    WAITING_WRITER = 1 << READER_BITS
    # @!visibility private
    RUNNING_WRITER = 1 << (READER_BITS + WRITER_BITS)
    # @!visibility private
    MAX_READERS    = WAITING_WRITER - 1
    # @!visibility private
    MAX_WRITERS    = RUNNING_WRITER - MAX_READERS - 1

    # Used with @HeldCount:
    # @!visibility private
    WRITE_LOCK_HELD = 1 << READER_BITS
    # @!visibility private
    READ_LOCK_MASK  = WRITE_LOCK_HELD - 1
    # @!visibility private
    WRITE_LOCK_MASK = MAX_WRITERS

    safe_initialization!

    # Create a new `ReentrantReadWriteLock` in the unlocked state.
    def initialize
      super()
      @Counter    = AtomicFixnum.new(0)       # single integer which represents lock state
      @ReadQueue  = Synchronization::Lock.new # used to queue waiting readers
      @WriteQueue = Synchronization::Lock.new # used to queue waiting writers
      @HeldCount  = LockLocalVar.new(0) # indicates # of R & W locks held by this thread
    end

    # Execute a block operation within a read lock.
    #
    # @yield the task to be performed within the lock.
    #
    # @return [Object] the result of the block operation.
    #
    # @raise [ArgumentError] when no block is given.
    # @raise [Concurrent::ResourceLimitError] if the maximum number of readers
    #   is exceeded.
    def with_read_lock
      raise ArgumentError.new('no block given') unless block_given?
      acquire_read_lock
      begin
        yield
      ensure
        release_read_lock
      end
    end

    # Execute a block operation within a write lock.
    #
    # @yield the task to be performed within the lock.
    #
    # @return [Object] the result of the block operation.
    #
    # @raise [ArgumentError] when no block is given.
    # @raise [Concurrent::ResourceLimitError] if the maximum number of readers
    #   is exceeded.
    def with_write_lock
      raise ArgumentError.new('no block given') unless block_given?
      acquire_write_lock
      begin
        yield
      ensure
        release_write_lock
      end
    end

    # Acquire a read lock. If a write lock is held by another thread, will block
    # until it is released.
    #
    # @return [Boolean] true if the lock is successfully acquired
    #
    # @raise [Concurrent::ResourceLimitError] if the maximum number of readers
    #   is exceeded.
    def acquire_read_lock
      if (held = @HeldCount.value) > 0
        # If we already have a lock, there's no need to wait
        if held & READ_LOCK_MASK == 0
          # But we do need to update the counter, if we were holding a write
          #   lock but not a read lock
          @Counter.update { |c| c + 1 }
        end
        @HeldCount.value = held + 1
        return true
      end

      while true
        c = @Counter.value
        raise ResourceLimitError.new('Too many reader threads') if max_readers?(c)

        # If a writer is waiting OR running when we first queue up, we need to wait
        if waiting_or_running_writer?(c)
          # Before going to sleep, check again with the ReadQueue mutex held
          @ReadQueue.synchronize do
            @ReadQueue.ns_wait if waiting_or_running_writer?
          end
          # Note: the above 'synchronize' block could have used #wait_until,
          #   but that waits repeatedly in a loop, checking the wait condition
          #   each time it wakes up (to protect against spurious wakeups)
          # But we are already in a loop, which is only broken when we successfully
          #   acquire the lock! So we don't care about spurious wakeups, and would
          #   rather not pay the extra overhead of using #wait_until

          # After a reader has waited once, they are allowed to "barge" ahead of waiting writers
          # But if a writer is *running*, the reader still needs to wait (naturally)
          while true
            c = @Counter.value
            if running_writer?(c)
              @ReadQueue.synchronize do
                @ReadQueue.ns_wait if running_writer?
              end
            elsif @Counter.compare_and_set(c, c+1)
              @HeldCount.value = held + 1
              return true
            end
          end
        elsif @Counter.compare_and_set(c, c+1)
          @HeldCount.value = held + 1
          return true
        end
      end
    end

    # Try to acquire a read lock and return true if we succeed. If it cannot be
    # acquired immediately, return false.
    #
    # @return [Boolean] true if the lock is successfully acquired
    def try_read_lock
      if (held = @HeldCount.value) > 0
        if held & READ_LOCK_MASK == 0
          # If we hold a write lock, but not a read lock...
          @Counter.update { |c| c + 1 }
        end
        @HeldCount.value = held + 1
        return true
      else
        c = @Counter.value
        if !waiting_or_running_writer?(c) && @Counter.compare_and_set(c, c+1)
          @HeldCount.value = held + 1
          return true
        end
      end
      false
    end

    # Release a previously acquired read lock.
    #
    # @return [Boolean] true if the lock is successfully released
    def release_read_lock
      held = @HeldCount.value = @HeldCount.value - 1
      rlocks_held = held & READ_LOCK_MASK
      if rlocks_held == 0
        c = @Counter.update { |counter| counter - 1 }
        # If one or more writers were waiting, and we were the last reader, wake a writer up
        if waiting_or_running_writer?(c) && running_readers(c) == 0
          @WriteQueue.signal
        end
      elsif rlocks_held == READ_LOCK_MASK
        raise IllegalOperationError, "Cannot release a read lock which is not held"
      end
      true
    end

    # Acquire a write lock. Will block and wait for all active readers and writers.
    #
    # @return [Boolean] true if the lock is successfully acquired
    #
    # @raise [Concurrent::ResourceLimitError] if the maximum number of writers
    #   is exceeded.
    def acquire_write_lock
      if (held = @HeldCount.value) >= WRITE_LOCK_HELD
        # if we already have a write (exclusive) lock, there's no need to wait
        @HeldCount.value = held + WRITE_LOCK_HELD
        return true
      end

      while true
        c = @Counter.value
        raise ResourceLimitError.new('Too many writer threads') if max_writers?(c)

        # To go ahead and take the lock without waiting, there must be no writer
        #   running right now, AND no writers who came before us still waiting to
        #   acquire the lock
        # Additionally, if any read locks have been taken, we must hold all of them
        if held > 0 && @Counter.compare_and_set(1, c+RUNNING_WRITER)
          # If we are the only one reader and successfully swap the RUNNING_WRITER bit on, then we can go ahead
          @HeldCount.value = held + WRITE_LOCK_HELD
          return true
        elsif @Counter.compare_and_set(c, c+WAITING_WRITER)
          while true
            # Now we have successfully incremented, so no more readers will be able to increment
            #   (they will wait instead)
            # However, readers OR writers could decrement right here
            @WriteQueue.synchronize do
              # So we have to do another check inside the synchronized section
              # If a writer OR another reader is running, then go to sleep
              c = @Counter.value
              @WriteQueue.ns_wait if running_writer?(c) || running_readers(c) != held
            end
            # Note: if you are thinking of replacing the above 'synchronize' block
            # with #wait_until, read the comment in #acquire_read_lock first!

            # We just came out of a wait
            # If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
            #   then we are OK to stop waiting and go ahead
            # Otherwise go back and wait again
            c = @Counter.value
            if !running_writer?(c) &&
               running_readers(c) == held &&
               @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER)
              @HeldCount.value = held + WRITE_LOCK_HELD
              return true
            end
          end
        end
      end
    end

    # Try to acquire a write lock and return true if we succeed. If it cannot be
    # acquired immediately, return false.
    #
    # @return [Boolean] true if the lock is successfully acquired
    def try_write_lock
      if (held = @HeldCount.value) >= WRITE_LOCK_HELD
        @HeldCount.value = held + WRITE_LOCK_HELD
        return true
      else
        c = @Counter.value
        if !waiting_or_running_writer?(c) &&
           running_readers(c) == held &&
           @Counter.compare_and_set(c, c+RUNNING_WRITER)
           @HeldCount.value = held + WRITE_LOCK_HELD
          return true
        end
      end
      false
    end

    # Release a previously acquired write lock.
    #
    # @return [Boolean] true if the lock is successfully released
    def release_write_lock
      held = @HeldCount.value = @HeldCount.value - WRITE_LOCK_HELD
      wlocks_held = held & WRITE_LOCK_MASK
      if wlocks_held == 0
        c = @Counter.update { |counter| counter - RUNNING_WRITER }
        @ReadQueue.broadcast
        @WriteQueue.signal if waiting_writers(c) > 0
      elsif wlocks_held == WRITE_LOCK_MASK
        raise IllegalOperationError, "Cannot release a write lock which is not held"
      end
      true
    end

    private

    # @!visibility private
    def running_readers(c = @Counter.value)
      c & MAX_READERS
    end

    # @!visibility private
    def running_readers?(c = @Counter.value)
      (c & MAX_READERS) > 0
    end

    # @!visibility private
    def running_writer?(c = @Counter.value)
      c >= RUNNING_WRITER
    end

    # @!visibility private
    def waiting_writers(c = @Counter.value)
      (c & MAX_WRITERS) >> READER_BITS
    end

    # @!visibility private
    def waiting_or_running_writer?(c = @Counter.value)
      c >= WAITING_WRITER
    end

    # @!visibility private
    def max_readers?(c = @Counter.value)
      (c & MAX_READERS) == MAX_READERS
    end

    # @!visibility private
    def max_writers?(c = @Counter.value)
      (c & MAX_WRITERS) == MAX_WRITERS
    end
  end
end