class FDB::HighContentionAllocator
# Picks an integer candidate from the current allocation window and returns
# it tuple-packed, doing all work inside db_or_tr.transact.
#
# Outline of one pass of the outer loop:
#   1. Read the newest entry of the @counters range (snapshot read,
#      reverse, limit 1) to obtain the window start; default is 0.
#   2. Bump the counter stored at the window start and, while the counter
#      is at least half the window size, move on to the next window.
#   3. Draw random candidates in start...start+window until either a newer
#      window is observed (go back to step 1) or the candidate's @recent
#      entry was previously unset (success).
#
# @param db_or_tr an object responding to #transact that yields a
#   transaction (presumably an FDB database or transaction - verify
#   against callers)
# @return the value of FDB::Tuple.pack([candidate]) as handed back by
#   #transact (assumes #transact returns the block's value - TODO confirm)
def allocate(db_or_tr)
  db_or_tr.transact do |tr|
    state_ivar = :@__fdb_directory_layer_hca_state__

    # Attach per-transaction allocator state on first use. The presence
    # check is repeated under @lock, presumably so that callers racing on
    # the same transaction install only one state object.
    unless tr.instance_variable_defined?(state_ivar)
      @lock.synchronize do
        unless tr.instance_variable_defined?(state_ivar)
          tr.instance_variable_set(state_ivar, AllocatorTransactionState.new)
        end
      end
    end

    txn_state = tr.instance_variable_get(state_ivar)

    loop do
      # Newest counter entry decides where the current window starts.
      newest_entries = tr.snapshot.get_range(@counters.range[0],
                                             @counters.range[1],
                                             {:limit => 1, :reverse => true})
      start, count = newest_entries.map { |kv|
        [@counters.unpack(kv.key)[0], kv.value.unpack('q<')[0]]
      }.first || [0, 0]

      # Declared here so the value assigned inside the inner loop's block
      # is visible to the candidate loop below.
      window = 0
      advanced = false

      loop do
        txn_state.lock.synchronize do
          if advanced
            # The window moved: clear counter and recent entries that lie
            # before the new start. The second clear is issued with the
            # "next write no write conflict range" option set.
            tr.clear_range(@counters, @counters[start])
            tr.options.set_next_write_no_write_conflict_range
            tr.clear_range(@recent, @recent[start])
          end

          # Add a packed little-endian 64-bit 1 to this window's counter,
          # then read the counter back at snapshot isolation.
          tr.add(@counters[start], [1].pack('q<'))
          count = tr.snapshot[@counters[start]]
        end

        # nil? is called explicitly instead of relying on truthiness.
        # NOTE(review): the read result may be a lazy future object -
        # confirm before simplifying this check.
        count = count.nil? ? 0 : count.unpack('q<')[0]
        window = window_size(start)

        # Keep this window while it is less than half used.
        break if count * 2 < window

        start += window
        advanced = true
      end

      candidate = 0
      found = false

      loop do
        candidate = rand(start...start+window)
        newest_counter = nil
        existing = nil

        txn_state.lock.synchronize do
          newest_counter = tr.snapshot.get_range(@counters.range[0],
                                                 @counters.range[1],
                                                 {:limit => 1, :reverse => true})
          existing = tr[@recent[candidate]]
          # Mark the candidate as taken; this write is issued with the
          # "next write no write conflict range" option set.
          tr.options.set_next_write_no_write_conflict_range
          tr[@recent[candidate]] = ''
        end

        newest_counter = newest_counter.map { |kv| [@counters.unpack(kv.key)[0]] }.first || [0]

        # A counter beyond our start means the window advanced while we
        # were choosing; leave this loop and restart from the outer loop.
        break if !newest_counter.empty? && newest_counter[0] > start

        if existing.nil?
          # Nothing was stored for this candidate before our write: claim
          # it and register an explicit write conflict on its key.
          found = true
          tr.add_write_conflict_key(@recent[candidate])
          break
        end
      end

      break FDB::Tuple.pack([candidate]) if found
    end
  end
end
# Builds an allocator on top of +subspace+.
#
# @param subspace an object indexable with 0 and 1; element 0 is kept as
#   @counters and element 1 as @recent (presumably an FDB subspace whose
#   #[] yields nested subspaces - verify against callers)
def initialize(subspace)
  @counters, @recent = subspace[0], subspace[1]
  # Guards first-time attachment of per-transaction state in #allocate.
  @lock = Mutex.new
end
# Returns the width of the allocation window that begins at +start+.
#
# @param start window start, compared against 255 and 65535
# @return [Integer] 64 when start < 255, 1024 when start < 65535,
#   otherwise 8192
def window_size(start)
  return 64 if start < 255
  return 1024 if start < 65535

  8192
end