class Concurrent::RubyExchanger

def do_exchange(value, timeout)

Returns:
  • (Object, CANCEL) - the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
  # ALGORITHM
  #
  # From the original Java version:
  #
  # > The basic idea is to maintain a "slot", which is a reference to
  # > a Node containing both an Item to offer and a "hole" waiting to
  # > get filled in.  If an incoming "occupying" thread sees that the
  # > slot is null, it CAS'es (compareAndSets) a Node there and waits
  # > for another to invoke exchange.  That second "fulfilling" thread
  # > sees that the slot is non-null, and so CASes it back to null,
  # > also exchanging items by CASing the hole, plus waking up the
  # > occupying thread if it is blocked.  In each case CAS'es may
  # > fail because a slot at first appears non-null but is null upon
  # > CAS, or vice-versa.  So threads may need to retry these
  # > actions.
  #
  # This version:
  #
  # An exchange occurs between an "occupier" thread and a "fulfiller" thread.
  # The "slot" is used to set up this interaction. The first thread in the
  # exchange puts itself into the slot (occupies) and waits for a fulfiller.
  # The second thread removes the occupier from the slot and attempts to
  # perform the exchange. Removing the occupier also frees the slot for
  # another occupier/fulfiller pair.
  #
  # Because the occupier and the fulfiller are operating independently and
  # because there may be contention with other threads, any failed operation
  # indicates contention. Both the occupier and the fulfiller operate within
  # spin loops. Any failed actions along the happy path will cause the thread
  # to repeat the loop and try again.
  #
  # When a timeout value is given the thread must be cognizant of time spent
  # in the spin loop. The remaining time is checked every loop. When the time
  # runs out the thread will exit.
  #
  # A "node" is the data structure used to perform the exchange. Only the
  # occupier's node is necessary. It's the node used for the exchange.
  # Each node has an "item," a "hole" (self), and a "latch." The item is the
  # node's initial value. It never changes. It's what the fulfiller returns on
  # success. The occupier's hole is where the fulfiller puts its item. It's the
  # item that the occupier returns on success. The latch is used for synchronization.
  # Because a thread may act as either an occupier or fulfiller (or possibly
  # both in periods of high contention) every thread creates a node when
  # the exchange method is first called.
  #
  # The following steps occur within the spin loop. If any actions fail
  # the thread will loop and try again, so long as there is time remaining.
  # If time runs out the thread will return CANCEL.
  #
  # Check the slot for an occupier:
  #
  #   * If the slot is empty try to occupy
  #   * If the slot is full try to fulfill
  #
  # Attempt to occupy:
  #
  #   * Attempt to CAS myself into the slot
  #   * Go to sleep and wait to be woken by a fulfiller
  #   * If the sleep is successful then the fulfiller completed its happy path
  #     - Return the value from my hole (the value given by the fulfiller)
  #   * When the sleep fails (time ran out) attempt to cancel the operation
  #     - Attempt to CAS myself out of the slot
  #     - If successful there is no contention
  #       - Return CANCEL
  #     - On failure, I am competing with a fulfiller
  #       - Attempt to CAS my hole to CANCEL
  #       - On success
  #         - Let the fulfiller deal with my cancel
  #         - Return CANCEL
  #       - On failure the fulfiller has completed its happy path
  #         - Return the value from my hole (the fulfiller's value)
  #
  # Attempt to fulfill:
  #
  #   * Attempt to CAS the occupier out of the slot
  #     - On failure loop again
  #   * Attempt to CAS my item into the occupier's hole
  #     - On failure the occupier is trying to cancel
  #       - Loop again
  #     - On success we are on the happy path
  #       - Wake the sleeping occupier
  #       - Return the occupier's item
  value  = NULL if value.nil? # The sentinel allows nil to be a valid value
  me     = Node.new(value) # create my node in case I need to occupy
  end_at = Concurrent.monotonic_time + timeout.to_f # The time to give up
  result = loop do
    other = slot
    if other && compare_and_set_slot(other, nil)
      # try to fulfill
      if other.compare_and_set_value(nil, value)
        # happy path
        other.latch.count_down
        break other.item
      end
    elsif other.nil? && compare_and_set_slot(nil, me)
      # try to occupy
      timeout = end_at - Concurrent.monotonic_time if timeout
      if me.latch.wait(timeout)
        # happy path
        break me.value
      else
        # attempt to remove myself from the slot
        if compare_and_set_slot(me, nil)
          break CANCEL
        elsif !me.compare_and_set_value(nil, CANCEL)
          # I've failed to block the fulfiller: it already filled my hole,
          # so the exchange completed and I return its value
          break me.value
        end
      end
    end
    break CANCEL if timeout && Concurrent.monotonic_time >= end_at
  end
  result == NULL ? nil : result
end
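
The occupy/fulfill protocol described in the comments above can be modelled in a small, self-contained sketch. This is not the concurrent-ruby implementation: `MiniExchanger`, its `Latch`, the `EMPTY` sentinel, and the `cas_slot`/`cas_hole` helpers are hypothetical names introduced only for illustration, and compare-and-set is emulated with a single Mutex rather than real atomic references. It assumes nothing beyond the Ruby standard library.

```ruby
# Hypothetical, simplified model of the slot/node protocol (illustration only).
class MiniExchanger
  CANCEL = Object.new # returned when the exchange times out
  EMPTY  = Object.new # marks an unfilled hole, so nil stays a valid item

  Node = Struct.new(:item, :hole, :latch)

  def self.clock
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # One-shot latch built from Mutex + ConditionVariable.
  class Latch
    def initialize
      @lock = Mutex.new
      @cond = ConditionVariable.new
      @open = false
    end

    def count_down
      @lock.synchronize do
        @open = true
        @cond.broadcast
      end
    end

    # Returns true if opened before the timeout elapses (nil waits forever).
    def wait(timeout)
      deadline = timeout && MiniExchanger.clock + timeout
      @lock.synchronize do
        until @open
          remaining = deadline && deadline - MiniExchanger.clock
          return false if remaining && remaining <= 0
          @cond.wait(@lock, remaining)
        end
        true
      end
    end
  end

  def initialize
    @cas  = Mutex.new # assumption: one lock stands in for hardware CAS
    @slot = nil
  end

  def exchange(value, timeout = nil)
    me     = Node.new(value, EMPTY, Latch.new)
    end_at = timeout && MiniExchanger.clock + timeout
    loop do
      other = @cas.synchronize { @slot }
      if other && cas_slot(other, nil)
        # Fulfill: put my item into the occupier's hole and wake it.
        if cas_hole(other, EMPTY, value)
          other.latch.count_down
          return other.item
        end
      elsif other.nil? && cas_slot(nil, me)
        # Occupy: sleep until a fulfiller arrives or time runs out.
        return me.hole if me.latch.wait(end_at && end_at - MiniExchanger.clock)
        return CANCEL if cas_slot(me, nil) # nobody took my node
        # A fulfiller removed me from the slot; race it for my hole.
        return cas_hole(me, EMPTY, CANCEL) ? CANCEL : me.hole
      end
      return CANCEL if end_at && MiniExchanger.clock >= end_at
    end
  end

  private

  def cas_slot(expected, new_node)
    @cas.synchronize do
      next false unless @slot.equal?(expected)
      @slot = new_node
      true
    end
  end

  def cas_hole(node, expected, new_value)
    @cas.synchronize do
      next false unless node.hole.equal?(expected)
      node.hole = new_value
      true
    end
  end
end

exchanger = MiniExchanger.new
partner   = Thread.new { exchanger.exchange(:from_partner, 2) }
mine      = exchanger.exchange(:from_main, 2)
p mine          # => :from_partner
p partner.value # => :from_main
```

Whichever thread reaches the slot first becomes the occupier and the other the fulfiller, so the result is the same in either order. The single Mutex keeps the sketch short at the cost of serialising every compare-and-set, which a real lock-free exchanger avoids.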