lib/concurrent/thread_safe/util/striped64.rb



require 'concurrent/thread_safe/util'
require 'concurrent/thread_safe/util/power_of_two_tuple'
require 'concurrent/thread_safe/util/volatile'
require 'concurrent/thread_safe/util/xor_shift_random'

module Concurrent

  # @!visibility private
  module ThreadSafe

    # @!visibility private
    module Util

      # A Ruby port of the Doug Lea's jsr166e.Striped64 class version 1.6
      # available in public domain.
      #
      # Original source code available here:
      # http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.6
      #
      # Class holding common representation and mechanics for classes supporting
      # dynamic striping on 64bit values.
      #
      # This class maintains a lazily-initialized table of atomically updated
      # variables, plus an extra +base+ field. The table size is a power of two.
      # Indexing uses masked per-thread hash codes. Nearly all methods on this
      # class are private, accessed directly by subclasses.
      #
      # Table entries are of class +Cell+; a variant of AtomicLong padded to
      # reduce cache contention on most processors. Padding is overkill for most
      # Atomics because they are usually irregularly scattered in memory and thus
      # don't interfere much with each other. But Atomic objects residing in
      # arrays will tend to be placed adjacent to each other, and so will most
      # often share cache lines (with a huge negative performance impact) without
      # this precaution.
      #
      # In part because +Cell+s are relatively large, we avoid creating them until
      # they are needed. When there is no contention, all updates are made to the
      # +base+ field. Upon first contention (a failed CAS on +base+ update), the
      # table is initialized to size 2. The table size is doubled upon further
      # contention until reaching the nearest power of two greater than or equal
      # to the number of CPUS. Table slots remain empty (+nil+) until they are
      # needed.
      #
      # A single spinlock (+busy+) is used for initializing and resizing the
      # table, as well as populating slots with new +Cell+s. There is no need for
      # a blocking lock: When the lock is not available, threads try other slots
      # (or the base). During these retries, there is increased contention and
      # reduced locality, which is still better than alternatives.
      #
      # Per-thread hash codes are initialized to random values. Contention and/or
      # table collisions are indicated by failed CASes when performing an update
      # operation (see method +retry_update+). Upon a collision, if the table size
      # is less than the capacity, it is doubled in size unless some other thread
      # holds the lock. If a hashed slot is empty, and lock is available, a new
      # +Cell+ is created. Otherwise, if the slot exists, a CAS is tried. Retries
      # proceed by "double hashing", using a secondary hash (XorShift) to try to
      # find a free slot.
      #
      # The table size is capped because, when there are more threads than CPUs,
      # supposing that each thread were bound to a CPU, there would exist a
      # perfect hash function mapping threads to slots that eliminates collisions.
      # When we reach capacity, we search for this mapping by randomly varying the
      # hash codes of colliding threads. Because search is random, and collisions
      # only become known via CAS failures, convergence can be slow, and because
      # threads are typically not bound to CPUS forever, may not occur at all.
      # However, despite these limitations, observed contention rates are
      # typically low in these cases.
      #
      # It is possible for a +Cell+ to become unused when threads that once hashed
      # to it terminate, as well as in the case where doubling the table causes no
      # thread to hash to it under expanded mask. We do not try to detect or
      # remove such cells, under the assumption that for long-running instances,
      # observed contention levels will recur, so the cells will eventually be
      # needed again; and for short-lived ones, it does not matter.
      #
      # @!visibility private
      class Striped64

        # Padded variant of AtomicLong supporting only raw accesses plus CAS.
        # The +value+ field is placed between pads, hoping that the JVM doesn't
        # reorder them.
        #
        # Optimisation note: It would be possible to use a release-only
        # form of CAS here, if it were provided.
        #
        # @!visibility private
        class Cell < Concurrent::AtomicReference

          alias_method :cas, :compare_and_set

          def cas_computed
            cas(current_value = value, yield(current_value))
          end

          # @!visibility private
          def self.padding
            # TODO: this only adds padding after the :value slot, need to find a way to add padding before the slot
            # TODO (pitr-ch 28-Jul-2018): the padding instance vars may not be created
            # hide from yardoc in a method
            attr_reader *(12.times.collect{ |i| "padding_#{i}".to_sym })
          end
          padding
        end

        extend Volatile
        attr_volatile :cells, # Table of cells. When non-null, size is a power of 2.
          :base,  # Base value, used mainly when there is no contention, but also as a fallback during table initialization races. Updated via CAS.
          :busy   # Spinlock (locked via CAS) used when resizing and/or creating Cells.

        alias_method :busy?, :busy

        def initialize
          super()
          self.busy = false
          self.base = 0
        end

        # Handles cases of updates involving initialization, resizing,
        # creating new Cells, and/or contention. See above for
        # explanation. This method suffers the usual non-modularity
        # problems of optimistic retry code, relying on rechecked sets of
        # reads.
        #
        # Arguments:
        # [+x+]
        #   the value
        # [+hash_code+]
        #   hash code used
        # [+x+]
        #   false if CAS failed before call
        def retry_update(x, hash_code, was_uncontended) # :yields: current_value
          hash     = hash_code
          collided = false # True if last slot nonempty
          while true
            if current_cells = cells
              if !(cell = current_cells.volatile_get_by_hash(hash))
                if busy?
                  collided = false
                else # Try to attach new Cell
                  if try_to_install_new_cell(Cell.new(x), hash) # Optimistically create and try to insert new cell
                    break
                  else
                    redo # Slot is now non-empty
                  end
                end
              elsif !was_uncontended # CAS already known to fail
                was_uncontended = true # Continue after rehash
              elsif cell.cas_computed {|current_value| yield current_value}
                break
              elsif current_cells.size >= CPU_COUNT || cells != current_cells # At max size or stale
                collided = false
              elsif collided && expand_table_unless_stale(current_cells)
                collided = false
                redo # Retry with expanded table
              else
                collided = true
              end
              hash = XorShiftRandom.xorshift(hash)

            elsif try_initialize_cells(x, hash) || cas_base_computed {|current_base| yield current_base}
              break
            end
          end
          self.hash_code = hash
        end

        private
        # Static per-thread hash code key. Shared across all instances to
        # reduce Thread locals pollution and because adjustments due to
        # collisions in one table are likely to be appropriate for
        # others.
        THREAD_LOCAL_KEY = "#{name}.hash_code".to_sym

        # A thread-local hash code accessor. The code is initially
        # random, but may be set to a different value upon collisions.
        def hash_code
          Thread.current[THREAD_LOCAL_KEY] ||= XorShiftRandom.get
        end

        def hash_code=(hash)
          Thread.current[THREAD_LOCAL_KEY] = hash
        end

        # Sets base and all +cells+ to the given value.
        def internal_reset(initial_value)
          current_cells = cells
          self.base     = initial_value
          if current_cells
            current_cells.each do |cell|
              cell.value = initial_value if cell
            end
          end
        end

        def cas_base_computed
          cas_base(current_base = base, yield(current_base))
        end

        def free?
          !busy?
        end

        def try_initialize_cells(x, hash)
          if free? && !cells
            try_in_busy do
              unless cells # Recheck under lock
                new_cells = PowerOfTwoTuple.new(2)
                new_cells.volatile_set_by_hash(hash, Cell.new(x))
                self.cells = new_cells
              end
            end
          end
        end

        def expand_table_unless_stale(current_cells)
          try_in_busy do
            if current_cells == cells # Recheck under lock
              new_cells = current_cells.next_in_size_table
              current_cells.each_with_index {|x, i| new_cells.volatile_set(i, x)}
              self.cells = new_cells
            end
          end
        end

        def try_to_install_new_cell(new_cell, hash)
          try_in_busy do
            # Recheck under lock
            if (current_cells = cells) && !current_cells.volatile_get(i = current_cells.hash_to_index(hash))
              current_cells.volatile_set(i, new_cell)
            end
          end
        end

        def try_in_busy
          if cas_busy(false, true)
            begin
              yield
            ensure
              self.busy = false
            end
          end
        end
      end
    end
  end
end