lib/concurrent-ruby/concurrent/atomic/locals.rb
require 'fiber' require 'concurrent/utility/engine' require 'concurrent/constants' module Concurrent # @!visibility private # @!macro internal_implementation_note # # An abstract implementation of local storage, with sub-classes for # per-thread and per-fiber locals. # # Each execution context (EC, thread or fiber) has a lazily initialized array # of local variable values. Each time a new local variable is created, we # allocate an "index" for it. # # For example, if the allocated index is 1, that means slot #1 in EVERY EC's # locals array will be used for the value of that variable. # # The good thing about using a per-EC structure to hold values, rather than # a global, is that no synchronization is needed when reading and writing # those values (since the structure is only ever accessed by a single # thread). # # Of course, when a local variable is GC'd, 1) we need to recover its index # for use by other new local variables (otherwise the locals arrays could # get bigger and bigger with time), and 2) we need to null out all the # references held in the now-unused slots (both to avoid blocking GC of those # objects, and also to prevent "stale" values from being passed on to a new # local when the index is reused). # # Because we need to null out freed slots, we need to keep references to # ALL the locals arrays, so we can null out the appropriate slots in all of # them. This is why we need to use a finalizer to clean up the locals array # when the EC goes out of scope. class AbstractLocals def initialize @free = [] @lock = Mutex.new @all_arrays = {} @next = 0 end def synchronize @lock.synchronize { yield } end if Concurrent.on_cruby? def weak_synchronize yield end else alias_method :weak_synchronize, :synchronize end def next_index(local) index = synchronize do if @free.empty? @next += 1 else @free.pop end end # When the local goes out of scope, we should free the associated index # and all values stored into it. ObjectSpace.define_finalizer(local, local_finalizer(index)) index end def free_index(index) weak_synchronize do # The cost of GC'ing a TLV is linear in the number of ECs using local # variables. But that is natural! More ECs means more storage is used # per local variable. So naturally more CPU time is required to free # more storage. # # DO NOT use each_value which might conflict with new pair assignment # into the hash in #set method. @all_arrays.values.each do |locals| locals[index] = nil end # free index has to be published after the arrays are cleared: @free << index end end def fetch(index) locals = self.locals value = locals ? locals[index] : nil if nil == value yield elsif NULL.equal?(value) nil else value end end def set(index, value) locals = self.locals! locals[index] = (nil == value ? NULL : value) value end private # When the local goes out of scope, clean up that slot across all locals currently assigned. def local_finalizer(index) proc do free_index(index) end end # When a thread/fiber goes out of scope, remove the array from @all_arrays. def thread_fiber_finalizer(array_object_id) proc do weak_synchronize do @all_arrays.delete(array_object_id) end end end # Returns the locals for the current scope, or nil if none exist. def locals raise NotImplementedError end # Returns the locals for the current scope, creating them if necessary. def locals! raise NotImplementedError end end # @!visibility private # @!macro internal_implementation_note # An array-backed storage of indexed variables per thread. class ThreadLocals < AbstractLocals def locals Thread.current.thread_variable_get(:concurrent_thread_locals) end def locals! thread = Thread.current locals = thread.thread_variable_get(:concurrent_thread_locals) unless locals locals = thread.thread_variable_set(:concurrent_thread_locals, []) weak_synchronize do @all_arrays[locals.object_id] = locals end # When the thread goes out of scope, we should delete the associated locals: ObjectSpace.define_finalizer(thread, thread_fiber_finalizer(locals.object_id)) end locals end end # @!visibility private # @!macro internal_implementation_note # An array-backed storage of indexed variables per fiber. class FiberLocals < AbstractLocals def locals Thread.current[:concurrent_fiber_locals] end def locals! thread = Thread.current locals = thread[:concurrent_fiber_locals] unless locals locals = thread[:concurrent_fiber_locals] = [] weak_synchronize do @all_arrays[locals.object_id] = locals end # When the fiber goes out of scope, we should delete the associated locals: ObjectSpace.define_finalizer(Fiber.current, thread_fiber_finalizer(locals.object_id)) end locals end end private_constant :AbstractLocals, :ThreadLocals, :FiberLocals end