class ThreadSafe::Util::Striped64
A Ruby port of Doug Lea’s jsr166e.Striped64 class, version 1.6,
available in the public domain.

Original source code available here:
gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.6

Class holding common representation and mechanics for classes
supporting dynamic striping on 64-bit values.

This class maintains a lazily-initialized table of atomically
updated variables, plus an extra +base+ field. The table size
is a power of two. Indexing uses masked per-thread hash codes.
Nearly all methods on this class are private, accessed directly
by subclasses.

Table entries are of class +Cell+, a variant of AtomicLong padded
to reduce cache contention on most processors. Padding is
overkill for most Atomics because they are usually irregularly
scattered in memory and thus don’t interfere much with each
other. But Atomic objects residing in arrays will tend to be
placed adjacent to each other, and so will most often share
cache lines (with a huge negative performance impact) without
this precaution.

In part because +Cell+s are relatively large, we avoid creating
them until they are needed. When there is no contention, all
updates are made to the +base+ field. Upon first contention (a
failed CAS on a +base+ update), the table is initialized to size 2.
The table size is doubled upon further contention until
reaching the nearest power of two greater than or equal to the
number of CPUs. Table slots remain empty (+nil+) until they are
needed.

A single spinlock (+busy+) is used for initializing and
resizing the table, as well as populating slots with new +Cell+s.
There is no need for a blocking lock: when the lock is not
available, threads try other slots (or the base). During these
retries, there is increased contention and reduced locality,
which is still better than alternatives.

Per-thread hash codes are initialized to random values.
Contention and/or table collisions are indicated by failed
CASes when performing an update operation (see method
+retry_update+). Upon a collision, if the table size is less than
the capacity, it is doubled in size unless some other thread
holds the lock. If a hashed slot is empty, and the lock is
available, a new +Cell+ is created. Otherwise, if the slot
exists, a CAS is tried. Retries proceed by “double hashing”,
using a secondary hash (XorShift) to try to find a
free slot.

The table size is capped because, when there are more threads
than CPUs, supposing that each thread were bound to a CPU,
there would exist a perfect hash function mapping threads to
slots that eliminates collisions. When we reach capacity, we
search for this mapping by randomly varying the hash codes of
colliding threads. Because search is random, and collisions
only become known via CAS failures, convergence can be slow,
and because threads are typically not bound to CPUs forever,
it may not occur at all. However, despite these limitations,
observed contention rates are typically low in these cases.

It is possible for a +Cell+ to become unused when threads that
once hashed to it terminate, as well as in the case where
doubling the table causes no thread to hash to it under the
expanded mask. We do not try to detect or remove such cells,
under the assumption that for long-running instances, observed
contention levels will recur, so the cells will eventually be
needed again; and for short-lived ones, it does not matter.
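The power-of-two sizing and masked indexing described in the class overview can be sketched as follows. This is a minimal illustration, not the library's code: +next_power_of_two+, +slot_index+, the CPU count of 6, and the sample hash are all assumptions made for the example.

```ruby
# Hypothetical helpers for illustration; not part of ThreadSafe::Util::Striped64.

# Smallest power of two that is >= n (assumes n >= 1).
def next_power_of_two(n)
  size = 1
  size <<= 1 while size < n
  size
end

# Masking replaces the modulo; valid only when table_size is a power of two.
def slot_index(hash, table_size)
  hash & (table_size - 1)
end

cpu_count = 6                            # assumed machine size
capacity  = next_power_of_two(cpu_count) # cap on the table size

# The table starts at 2 slots and doubles until it reaches the cap.
sizes = [2]
sizes << sizes.last * 2 while sizes.last < capacity

# The same hash code can land in a different slot after each doubling.
hash    = 0b1011_0110
indices = sizes.map { |size| slot_index(hash, size) }
```

With these assumed values the cap is 8, the table grows through sizes 2, 4 and 8, and the sample hash maps to slots 0, 2 and 6 in turn. A slot that no hash selects under the wider mask is simply left in place.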
def cas_base_computed
def cas_base_computed
  cas_base(current_base = base, yield(current_base))
end
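The read, compute, then compare-and-set pattern used here can be tried in isolation. The sketch below is an assumption-laden stand-in: +SketchAtomic+ is a hypothetical class, and a Mutex emulates the atomic CAS that the real field relies on.

```ruby
# Hypothetical stand-in for an atomic field; a Mutex emulates the CAS.
class SketchAtomic
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Stores new_value only if the current value still equals expected.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      if @value == expected
        @value = new_value
        true
      else
        false
      end
    end
  end

  # Reads once, computes the replacement from that read, attempts a single CAS.
  def cas_computed
    current = value
    compare_and_set(current, yield(current))
  end
end

base = SketchAtomic.new(10)
uncontended = base.cas_computed { |v| v + 5 }

# Simulate a competing writer that changes the value between read and CAS.
contended = base.cas_computed do |v|
  base.compare_and_set(v, v + 1) # the "other thread" wins
  v + 100                        # this update is now based on a stale read
end
```

The first call succeeds and leaves 15 in the field. The second call fails because the value moved to 16 before its CAS ran, which is the kind of failure the class treats as a sign of contention.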
def expand_table_unless_stale(current_cells)
def expand_table_unless_stale(current_cells)
  try_in_busy do
    if current_cells == cells # Recheck under lock
      new_cells = current_cells.next_in_size_table
      current_cells.each_with_index {|x, i| new_cells.volatile_set(i, x)}
      self.cells = new_cells
    end
  end
end
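The recheck-under-lock idea behind the expansion can be illustrated with plain arrays. In this sketch +SketchTable+ is hypothetical, Mutex#try_lock stands in for the +busy+ spinlock, and staleness is checked by object identity, which is an assumption of the example rather than a statement about the library.

```ruby
# Hypothetical table wrapper; Mutex#try_lock stands in for the busy spinlock.
class SketchTable
  attr_reader :cells

  def initialize(cells)
    @cells = cells
    @lock  = Mutex.new
  end

  # Doubles the table only if `seen` is still the installed array.
  # Returns true when the expansion happened, false otherwise.
  def expand_unless_stale(seen)
    return false unless @lock.try_lock
    begin
      return false unless seen.equal?(@cells) # recheck under lock
      bigger = Array.new(seen.size * 2)
      seen.each_with_index { |cell, i| bigger[i] = cell } # slots keep their index
      @cells = bigger
      true
    ensure
      @lock.unlock
    end
  end
end

table    = SketchTable.new([:cell_a, nil])
snapshot = table.cells
first    = table.expand_unless_stale(snapshot) # snapshot is current
second   = table.expand_unless_stale(snapshot) # snapshot is now stale
```

The first call doubles the table and keeps +:cell_a+ at index 0. The second call is rejected because another expansion has already replaced the array the caller observed.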
def free?
def free?
  !busy?
end
def hash_code
A thread-local hash code accessor. The code is initially
random, but may be set to a different value upon collisions.
def hash_code
  Thread.current[THREAD_LOCAL_KEY] ||= XorShiftRandom.get
end
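A lazily seeded per-thread hash code plus a xorshift rehash might look like the sketch below. The 13/17/5 shift triple is a common 32-bit xorshift variant chosen for illustration; it is not claimed to match +XorShiftRandom+. +SKETCH_HASH_KEY+, +xorshift32+ and +sketch_hash_code+ are hypothetical names.

```ruby
SKETCH_HASH_KEY = :sketch_striped_hash # hypothetical storage key

# One step of a 32-bit xorshift generator (shift triple 13, 17, 5).
# A nonzero input always produces a nonzero output.
def xorshift32(x)
  x ^= (x << 13) & 0xFFFF_FFFF
  x ^= x >> 17
  x ^= (x << 5) & 0xFFFF_FFFF
  x
end

# Lazily seeds a nonzero random code the first time the current thread asks.
# Note: Thread#[] storage is fiber-local in Ruby, so this behaves as
# thread-local only while the thread runs no additional fibers.
def sketch_hash_code
  Thread.current[SKETCH_HASH_KEY] ||= rand(1..0xFFFF_FFFF)
end

mine        = sketch_hash_code
again       = sketch_hash_code
other_start = Thread.new { Thread.current[SKETCH_HASH_KEY] }.value
rehashed    = xorshift32(1)
```

The code is stable across calls on one thread, while a fresh thread starts with no code until it first asks for one.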
def hash_code=(hash)
def hash_code=(hash)
  Thread.current[THREAD_LOCAL_KEY] = hash
end
def initialize
def initialize
  super()
  self.busy = false
  self.base = 0
end
def internal_reset(initial_value)
def internal_reset(initial_value)
  current_cells = cells
  self.base     = initial_value
  if current_cells
    current_cells.each do |cell|
      cell.value = initial_value if cell
    end
  end
end
def retry_update(x, hash_code, was_uncontended) # :yields: current_value
Handles cases of updates involving initialization, resizing,
creating new Cells, and/or contention. See above for
explanation. This method suffers the usual non-modularity
problems of optimistic retry code, relying on rechecked sets of
reads.

Arguments:
[+x+]
  the value
[+hash_code+]
  hash code used
[+was_uncontended+]
  false if CAS failed before call
def retry_update(x, hash_code, was_uncontended) # :yields: current_value
  hash     = hash_code
  collided = false # True if last slot nonempty
  while true
    if current_cells = cells
      if !(cell = current_cells.volatile_get_by_hash(hash))
        if busy?
          collided = false
        else # Try to attach new Cell
          if try_to_install_new_cell(Cell.new(x), hash) # Optimistically create and try to insert new cell
            break
          else
            redo # Slot is now non-empty
          end
        end
      elsif !was_uncontended # CAS already known to fail
        was_uncontended = true # Continue after rehash
      elsif cell.cas_computed {|current_value| yield current_value}
        break
      elsif current_cells.size >= CPU_COUNT || cells != current_cells # At max size or stale
        collided = false
      elsif collided && expand_table_unless_stale(current_cells)
        collided = false
        redo # Retry with expanded table
      else
        collided = true
      end
      hash = XorShiftRandom.xorshift(hash)
    elsif try_initialize_cells(x, hash) || cas_base_computed {|current_base| yield current_base}
      break
    end
  end
  self.hash_code = hash
end
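The core of the loop, "try a CAS on the hashed cell and rehash on failure", can be isolated in a much smaller sketch. This is a deliberate simplification under stated assumptions: the table has a fixed size with all cells pre-created, there is no +base+ field, no lazy cell creation and no expansion, and a Mutex emulates the CAS. +SketchCell+, +SketchStripedCounter+ and +xorshift32+ are hypothetical names.

```ruby
# 32-bit xorshift step used as the secondary hash (illustrative shift triple).
def xorshift32(x)
  x ^= (x << 13) & 0xFFFF_FFFF
  x ^= x >> 17
  x ^= (x << 5) & 0xFFFF_FFFF
  x
end

# Hypothetical cell; a Mutex emulates the atomic compare-and-set.
class SketchCell
  def initialize
    @value = 0
    @lock  = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  def cas(expected, new_value)
    @lock.synchronize do
      if @value == expected
        @value = new_value
        true
      else
        false
      end
    end
  end
end

# Fixed-size striped counter: no base field, no lazy cells, no resizing.
class SketchStripedCounter
  def initialize(size) # size must be a power of two
    @cells = Array.new(size) { SketchCell.new }
  end

  # Adds x to the cell selected by hash; on a failed CAS, rehashes and retries.
  # Returns the hash that finally succeeded so the caller can keep using it.
  def add(x, hash)
    loop do
      cell    = @cells[hash & (@cells.size - 1)]
      current = cell.value
      return hash if cell.cas(current, current + x)
      hash = xorshift32(hash) # collision: move to another slot
    end
  end

  def sum
    @cells.sum(&:value)
  end
end

counter = SketchStripedCounter.new(4)
threads = 4.times.map do |t|
  Thread.new do
    hash = t + 1 # assumed nonzero starting hash per thread
    1000.times { hash = counter.add(1, hash) }
  end
end
threads.each(&:join)
total = counter.sum
```

Every call to +add+ ends with exactly one successful CAS, so the cells always sum to the number of increments regardless of how often threads collide and rehash.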
def try_in_busy
def try_in_busy
  if cas_busy(false, true)
    begin
      yield
    ensure
      self.busy = false
    end
  end
end
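The non-blocking behavior of this helper can be observed with a small stand-in. In the sketch below +SketchBusyLock+ is hypothetical and Mutex#try_lock takes the place of the CAS on +busy+; the point is only that a caller who cannot take the lock skips the block instead of waiting, and that the lock is released even if the block raises.

```ruby
# Hypothetical stand-in; Mutex#try_lock replaces the CAS on the busy flag.
class SketchBusyLock
  def initialize
    @mutex = Mutex.new
  end

  # Runs the block only if the lock is free; returns nil without waiting otherwise.
  def try_in_busy
    return nil unless @mutex.try_lock
    begin
      yield
    ensure
      @mutex.unlock # always released, even when the block raises
    end
  end
end

lock = SketchBusyLock.new

ran     = lock.try_in_busy { :ran }
# The inner attempt finds the lock taken and is skipped rather than blocking.
skipped = lock.try_in_busy { lock.try_in_busy { :inner } }

begin
  lock.try_in_busy { raise ArgumentError, "boom" }
rescue ArgumentError
  # expected; the ensure clause has already released the lock
end
after_error = lock.try_in_busy { :released }
```

A skipped caller gets +nil+ back, which lets the update loop treat "lock unavailable" as a signal to try another slot.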
def try_initialize_cells(x, hash)
def try_initialize_cells(x, hash)
  if free? && !cells
    try_in_busy do
      unless cells # Recheck under lock
        new_cells = PowerOfTwoTuple.new(2)
        new_cells.volatile_set_by_hash(hash, Cell.new(x))
        self.cells = new_cells
      end
    end
  end
end
def try_to_install_new_cell(new_cell, hash)
def try_to_install_new_cell(new_cell, hash)
  try_in_busy do
    # Recheck under lock
    if (current_cells = cells) && !current_cells.volatile_get(i = current_cells.hash_to_index(hash))
      current_cells.volatile_set(i, new_cell)
    end
  end
end