lib/concurrent/tvar.rb



require 'set'
require 'concurrent/synchronization'

module Concurrent

  # A `TVar` is a transactional variable - a single-element container that
  # is used as part of a transaction - see `Concurrent::atomically`.
  #
  # @!macro thread_safe_variable_comparison
  #
  # {include:file:docs-source/tvar.md}
  class TVar < Synchronization::Object
    safe_initialization!

    # Create a new `TVar` with an initial value.
    def initialize(value)
      @value = value
      @version = 0
      @lock = Mutex.new
    end

    # Get the value of a `TVar`.
    def value
      Concurrent::atomically do
        Transaction::current.read(self)
      end
    end

    # Set the value of a `TVar`.
    def value=(value)
      Concurrent::atomically do
        Transaction::current.write(self, value)
      end
    end

    # @!visibility private
    def unsafe_value # :nodoc:
      @value
    end

    # @!visibility private
    def unsafe_value=(value) # :nodoc:
      @value = value
    end

    # @!visibility private
    def unsafe_version # :nodoc:
      @version
    end

    # @!visibility private
    def unsafe_increment_version # :nodoc:
      @version += 1
    end

    # @!visibility private
    def unsafe_lock # :nodoc:
      @lock
    end

  end

  # Run a block that reads and writes `TVar`s as a single atomic transaction.
  # With respect to the value of `TVar` objects, the transaction is atomic, in
  # that it either happens or it does not, consistent, in that the `TVar`
  # objects involved will never enter an illegal state, and isolated, in that
  # transactions never interfere with each other. You may recognise these
  # properties from database transactions.
  #
  # There are some very important and unusual semantics that you must be aware of:
  #
  # * Most importantly, the block that you pass to atomically may be executed
  #     more than once. In most cases your code should be free of
  #     side-effects, except for via TVar.
  #
  # * If an exception escapes an atomically block it will abort the transaction.
  #
  # * It is undefined behaviour to use callcc or Fiber with atomically.
  #
  # * If you create a new thread within an atomically, it will not be part of
  #     the transaction. Creating a thread counts as a side-effect.
  #
  # Transactions within transactions are flattened to a single transaction.
  #
  # @example
  #   a = new TVar(100_000)
  #   b = new TVar(100)
  #
  #   Concurrent::atomically do
  #     a.value -= 10
  #     b.value += 10
  #   end
  def atomically
    raise ArgumentError.new('no block given') unless block_given?

    # Get the current transaction

    transaction = Transaction::current

    # Are we not already in a transaction (not nested)?

    if transaction.nil?
      # New transaction

      begin
        # Retry loop

        loop do

          # Create a new transaction

          transaction = Transaction.new
          Transaction::current = transaction

          # Run the block, aborting on exceptions

          begin
            result = yield
          rescue Transaction::AbortError => e
            transaction.abort
            result = Transaction::ABORTED
          rescue Transaction::LeaveError => e
            transaction.abort
            break result
          rescue => e
            transaction.abort
            raise e
          end
          # If we can commit, break out of the loop

          if result != Transaction::ABORTED
            if transaction.commit
              break result
            end
          end
        end
      ensure
        # Clear the current transaction

        Transaction::current = nil
      end
    else
      # Nested transaction - flatten it and just run the block

      yield
    end
  end

  # Abort a currently running transaction - see `Concurrent::atomically`.
  def abort_transaction
    raise Transaction::AbortError.new
  end

  # Leave a transaction without committing or aborting - see `Concurrent::atomically`.
  def leave_transaction
    raise Transaction::LeaveError.new
  end

  module_function :atomically, :abort_transaction, :leave_transaction

  private

  class Transaction

    ABORTED = ::Object.new

    ReadLogEntry = Struct.new(:tvar, :version)

    AbortError = Class.new(StandardError)
    LeaveError = Class.new(StandardError)

    def initialize
      @read_log  = []
      @write_log = {}
    end

    def read(tvar)
      Concurrent::abort_transaction unless valid?

      if @write_log.has_key? tvar
        @write_log[tvar]
      else
        @read_log.push(ReadLogEntry.new(tvar, tvar.unsafe_version))
        tvar.unsafe_value
      end
    end

    def write(tvar, value)
      # Have we already written to this TVar?

      unless @write_log.has_key? tvar
        # Try to lock the TVar

        unless tvar.unsafe_lock.try_lock
          # Someone else is writing to this TVar - abort
          Concurrent::abort_transaction
        end

        # If we previously wrote to it, check the version hasn't changed

        @read_log.each do |log_entry|
          if log_entry.tvar == tvar and tvar.unsafe_version > log_entry.version
            Concurrent::abort_transaction
          end
        end
      end

      # Record the value written

      @write_log[tvar] = value
    end

    def abort
      unlock
    end

    def commit
      return false unless valid?

      @write_log.each_pair do |tvar, value|
        tvar.unsafe_value = value
        tvar.unsafe_increment_version
      end

      unlock

      true
    end

    def valid?
      @read_log.each do |log_entry|
        unless @write_log.has_key? log_entry.tvar
          if log_entry.tvar.unsafe_version > log_entry.version
            return false
          end
        end
      end

      true
    end

    def unlock
      @write_log.each_key do |tvar|
        tvar.unsafe_lock.unlock
      end
    end

    def self.current
      Thread.current[:current_tvar_transaction]
    end

    def self.current=(transaction)
      Thread.current[:current_tvar_transaction] = transaction
    end

  end

end