require 'concurrent/thread_safe/util'
require 'concurrent/thread_safe/util/power_of_two_tuple'
require 'concurrent/thread_safe/util/volatile'
require 'concurrent/thread_safe/util/xor_shift_random'
module Concurrent
# @!visibility private
module ThreadSafe
# @!visibility private
module Util
# A Ruby port of the Doug Lea's jsr166e.Striped64 class version 1.6
# available in public domain.
#
# Original source code available here:
# http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.6
#
# Class holding common representation and mechanics for classes supporting
# dynamic striping on 64bit values.
#
# This class maintains a lazily-initialized table of atomically updated
# variables, plus an extra +base+ field. The table size is a power of two.
# Indexing uses masked per-thread hash codes. Nearly all methods on this
# class are private, accessed directly by subclasses.
#
# Table entries are of class +Cell+; a variant of AtomicLong padded to
# reduce cache contention on most processors. Padding is overkill for most
# Atomics because they are usually irregularly scattered in memory and thus
# don't interfere much with each other. But Atomic objects residing in
# arrays will tend to be placed adjacent to each other, and so will most
# often share cache lines (with a huge negative performance impact) without
# this precaution.
#
# In part because +Cell+s are relatively large, we avoid creating them until
# they are needed. When there is no contention, all updates are made to the
# +base+ field. Upon first contention (a failed CAS on +base+ update), the
# table is initialized to size 2. The table size is doubled upon further
# contention until reaching the nearest power of two greater than or equal
# to the number of CPUS. Table slots remain empty (+nil+) until they are
# needed.
#
# A single spinlock (+busy+) is used for initializing and resizing the
# table, as well as populating slots with new +Cell+s. There is no need for
# a blocking lock: When the lock is not available, threads try other slots
# (or the base). During these retries, there is increased contention and
# reduced locality, which is still better than alternatives.
#
# Per-thread hash codes are initialized to random values. Contention and/or
# table collisions are indicated by failed CASes when performing an update
# operation (see method +retry_update+). Upon a collision, if the table size
# is less than the capacity, it is doubled in size unless some other thread
# holds the lock. If a hashed slot is empty, and lock is available, a new
# +Cell+ is created. Otherwise, if the slot exists, a CAS is tried. Retries
# proceed by "double hashing", using a secondary hash (XorShift) to try to
# find a free slot.
#
# The table size is capped because, when there are more threads than CPUs,
# supposing that each thread were bound to a CPU, there would exist a
# perfect hash function mapping threads to slots that eliminates collisions.
# When we reach capacity, we search for this mapping by randomly varying the
# hash codes of colliding threads. Because search is random, and collisions
# only become known via CAS failures, convergence can be slow, and because
# threads are typically not bound to CPUS forever, may not occur at all.
# However, despite these limitations, observed contention rates are
# typically low in these cases.
#
# It is possible for a +Cell+ to become unused when threads that once hashed
# to it terminate, as well as in the case where doubling the table causes no
# thread to hash to it under expanded mask. We do not try to detect or
# remove such cells, under the assumption that for long-running instances,
# observed contention levels will recur, so the cells will eventually be
# needed again; and for short-lived ones, it does not matter.
#
# @!visibility private
class Striped64
# Padded variant of AtomicLong supporting only raw accesses plus CAS.
# The +value+ field is placed between pads, hoping that the JVM doesn't
# reorder them.
#
# Optimisation note: It would be possible to use a release-only
# form of CAS here, if it were provided.
#
# @!visibility private
class Cell < Concurrent::AtomicReference
alias_method :cas, :compare_and_set
def cas_computed
cas(current_value = value, yield(current_value))
end
# @!visibility private
def self.padding
# TODO: this only adds padding after the :value slot, need to find a way to add padding before the slot
# TODO (pitr-ch 28-Jul-2018): the padding instance vars may not be created
# hide from yardoc in a method
attr_reader *(12.times.collect{ |i| "padding_#{i}".to_sym })
end
padding
end
extend Volatile
attr_volatile :cells, # Table of cells. When non-null, size is a power of 2.
:base, # Base value, used mainly when there is no contention, but also as a fallback during table initialization races. Updated via CAS.
:busy # Spinlock (locked via CAS) used when resizing and/or creating Cells.
alias_method :busy?, :busy
def initialize
super()
self.busy = false
self.base = 0
end
# Handles cases of updates involving initialization, resizing,
# creating new Cells, and/or contention. See above for
# explanation. This method suffers the usual non-modularity
# problems of optimistic retry code, relying on rechecked sets of
# reads.
#
# Arguments:
# [+x+]
# the value
# [+hash_code+]
# hash code used
# [+x+]
# false if CAS failed before call
def retry_update(x, hash_code, was_uncontended) # :yields: current_value
hash = hash_code
collided = false # True if last slot nonempty
while true
if current_cells = cells
if !(cell = current_cells.volatile_get_by_hash(hash))
if busy?
collided = false
else # Try to attach new Cell
if try_to_install_new_cell(Cell.new(x), hash) # Optimistically create and try to insert new cell
break
else
redo # Slot is now non-empty
end
end
elsif !was_uncontended # CAS already known to fail
was_uncontended = true # Continue after rehash
elsif cell.cas_computed {|current_value| yield current_value}
break
elsif current_cells.size >= CPU_COUNT || cells != current_cells # At max size or stale
collided = false
elsif collided && expand_table_unless_stale(current_cells)
collided = false
redo # Retry with expanded table
else
collided = true
end
hash = XorShiftRandom.xorshift(hash)
elsif try_initialize_cells(x, hash) || cas_base_computed {|current_base| yield current_base}
break
end
end
self.hash_code = hash
end
private
# Static per-thread hash code key. Shared across all instances to
# reduce Thread locals pollution and because adjustments due to
# collisions in one table are likely to be appropriate for
# others.
THREAD_LOCAL_KEY = "#{name}.hash_code".to_sym
# A thread-local hash code accessor. The code is initially
# random, but may be set to a different value upon collisions.
def hash_code
Thread.current[THREAD_LOCAL_KEY] ||= XorShiftRandom.get
end
def hash_code=(hash)
Thread.current[THREAD_LOCAL_KEY] = hash
end
# Sets base and all +cells+ to the given value.
def internal_reset(initial_value)
current_cells = cells
self.base = initial_value
if current_cells
current_cells.each do |cell|
cell.value = initial_value if cell
end
end
end
def cas_base_computed
cas_base(current_base = base, yield(current_base))
end
def free?
!busy?
end
def try_initialize_cells(x, hash)
if free? && !cells
try_in_busy do
unless cells # Recheck under lock
new_cells = PowerOfTwoTuple.new(2)
new_cells.volatile_set_by_hash(hash, Cell.new(x))
self.cells = new_cells
end
end
end
end
def expand_table_unless_stale(current_cells)
try_in_busy do
if current_cells == cells # Recheck under lock
new_cells = current_cells.next_in_size_table
current_cells.each_with_index {|x, i| new_cells.volatile_set(i, x)}
self.cells = new_cells
end
end
end
def try_to_install_new_cell(new_cell, hash)
try_in_busy do
# Recheck under lock
if (current_cells = cells) && !current_cells.volatile_get(i = current_cells.hash_to_index(hash))
current_cells.volatile_set(i, new_cell)
end
end
end
def try_in_busy
if cas_busy(false, true)
begin
yield
ensure
self.busy = false
end
end
end
end
end
end
end