require'concurrent/constants'require'concurrent/errors'require'concurrent/maybe'require'concurrent/atomic/atomic_reference'require'concurrent/atomic/count_down_latch'require'concurrent/utility/engine'require'concurrent/utility/monotonic_time'moduleConcurrent# @!macro exchanger## A synchronization point at which threads can pair and swap elements within# pairs. Each thread presents some object on entry to the exchange method,# matches with a partner thread, and receives its partner's object on return.## @!macro thread_safe_variable_comparison## This implementation is very simple, using only a single slot for each# exchanger (unlike more advanced implementations which use an "arena").# This approach will work perfectly fine when there are only a few threads# accessing a single `Exchanger`. Beyond a handful of threads the performance# will degrade rapidly due to contention on the single slot, but the algorithm# will remain correct.## @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger# @example## exchanger = Concurrent::Exchanger.new## threads = [# Thread.new { puts "first: " << exchanger.exchange('foo', 1) }, #=> "first: bar"# Thread.new { puts "second: " << exchanger.exchange('bar', 1) } #=> "second: foo"# ]# threads.each {|t| t.join(2) }# @!visibility privateclassAbstractExchanger<Synchronization::Object# @!visibility privateCANCEL=::Object.newprivate_constant:CANCELdefinitializesuperend# @!macro exchanger_method_do_exchange## Waits for another thread to arrive at this exchange point (unless the# current thread is interrupted), and then transfers the given object to# it, receiving its object in return. The timeout value indicates the# approximate number of seconds the method should block while waiting# for the exchange. When the timeout value is `nil` the method will# block indefinitely.## @param [Object] value the value to exchange with another thread# @param [Numeric, nil] timeout in seconds, `nil` blocks indefinitely## @!macro exchanger_method_exchange## In some edge cases when a `timeout` is given a return value of `nil` may be# ambiguous. Specifically, if `nil` is a valid value in the exchange it will# be impossible to tell whether `nil` is the actual return value or if it# signifies timeout. When `nil` is a valid value in the exchange consider# using {#exchange!} or {#try_exchange} instead.## @return [Object] the value exchanged by the other thread or `nil` on timeoutdefexchange(value,timeout=nil)(value=do_exchange(value,timeout))==CANCEL?nil:valueend# @!macro exchanger_method_do_exchange# @!macro exchanger_method_exchange_bang## On timeout a {Concurrent::TimeoutError} exception will be raised.## @return [Object] the value exchanged by the other thread# @raise [Concurrent::TimeoutError] on timeoutdefexchange!(value,timeout=nil)if(value=do_exchange(value,timeout))==CANCELraiseConcurrent::TimeoutErrorelsevalueendend# @!macro exchanger_method_do_exchange# @!macro exchanger_method_try_exchange## The return value will be a {Concurrent::Maybe} set to `Just` on success or# `Nothing` on timeout.## @return [Concurrent::Maybe] on success a `Just` maybe will be returned with# the item exchanged by the other thread as `#value`; on timeout a# `Nothing` maybe will be returned with {Concurrent::TimeoutError} as `#reason`## @example## exchanger = Concurrent::Exchanger.new## result = exchanger.exchange(:foo, 0.5)## if result.just?# puts result.value #=> :bar# else# puts 'timeout'# enddeftry_exchange(value,timeout=nil)if(value=do_exchange(value,timeout))==CANCELConcurrent::Maybe.nothing(Concurrent::TimeoutError)elseConcurrent::Maybe.just(value)endendprivate# @!macro exchanger_method_do_exchange## @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeoutdefdo_exchange(value,timeout)raiseNotImplementedErrorendend# @!macro internal_implementation_note# @!visibility privateclassRubyExchanger<AbstractExchanger# A simplified version of java.util.concurrent.Exchanger written by# Doug Lea, Bill Scherer, and Michael Scott with assistance from members# of JCP JSR-166 Expert Group and released to the public domain. It does# not include the arena or the multi-processor spin loops.# http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/Exchanger.javasafe_initialization!classNode<Concurrent::Synchronization::Objectattr_atomic:valuesafe_initialization!definitialize(item)super()@Item=item@Latch=Concurrent::CountDownLatch.newself.value=nilenddeflatch@Latchenddefitem@Itemendendprivate_constant:Nodedefinitializesuperendprivateattr_atomic(:slot)# @!macro exchanger_method_do_exchange## @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeoutdefdo_exchange(value,timeout)# ALGORITHM## From the original Java version:## > The basic idea is to maintain a "slot", which is a reference to# > a Node containing both an Item to offer and a "hole" waiting to# > get filled in. If an incoming "occupying" thread sees that the# > slot is null, it CAS'es (compareAndSets) a Node there and waits# > for another to invoke exchange. That second "fulfilling" thread# > sees that the slot is non-null, and so CASes it back to null,# > also exchanging items by CASing the hole, plus waking up the# > occupying thread if it is blocked. In each case CAS'es may# > fail because a slot at first appears non-null but is null upon# > CAS, or vice-versa. So threads may need to retry these# > actions.## This version:## An exchange occurs between an "occupier" thread and a "fulfiller" thread.# The "slot" is used to setup this interaction. The first thread in the# exchange puts itself into the slot (occupies) and waits for a fulfiller.# The second thread removes the occupier from the slot and attempts to# perform the exchange. Removing the occupier also frees the slot for# another occupier/fulfiller pair.## Because the occupier and the fulfiller are operating independently and# because there may be contention with other threads, any failed operation# indicates contention. Both the occupier and the fulfiller operate within# spin loops. Any failed actions along the happy path will cause the thread# to repeat the loop and try again.## When a timeout value is given the thread must be cognizant of time spent# in the spin loop. The remaining time is checked every loop. When the time# runs out the thread will exit.## A "node" is the data structure used to perform the exchange. Only the# occupier's node is necessary. It's the node used for the exchange.# Each node has an "item," a "hole" (self), and a "latch." The item is the# node's initial value. It never changes. It's what the fulfiller returns on# success. The occupier's hole is where the fulfiller put its item. It's the# item that the occupier returns on success. The latch is used for synchronization.# Because a thread may act as either an occupier or fulfiller (or possibly# both in periods of high contention) every thread creates a node when# the exchange method is first called.## The following steps occur within the spin loop. If any actions fail# the thread will loop and try again, so long as there is time remaining.# If time runs out the thread will return CANCEL.## Check the slot for an occupier:## * If the slot is empty try to occupy# * If the slot is full try to fulfill## Attempt to occupy:## * Attempt to CAS myself into the slot# * Go to sleep and wait to be woken by a fulfiller# * If the sleep is successful then the fulfiller completed its happy path# - Return the value from my hole (the value given by the fulfiller)# * When the sleep fails (time ran out) attempt to cancel the operation# - Attempt to CAS myself out of the hole# - If successful there is no contention# - Return CANCEL# - On failure, I am competing with a fulfiller# - Attempt to CAS my hole to CANCEL# - On success# - Let the fulfiller deal with my cancel# - Return CANCEL# - On failure the fulfiller has completed its happy path# - Return th value from my hole (the fulfiller's value)## Attempt to fulfill:## * Attempt to CAS the occupier out of the slot# - On failure loop again# * Attempt to CAS my item into the occupier's hole# - On failure the occupier is trying to cancel# - Loop again# - On success we are on the happy path# - Wake the sleeping occupier# - Return the occupier's itemvalue=NULLifvalue.nil?# The sentinel allows nil to be a valid valueme=Node.new(value)# create my node in case I need to occupyend_at=Concurrent.monotonic_time+timeout.to_f# The time to give upresult=loopdoother=slotifother&&compare_and_set_slot(other,nil)# try to fulfillifother.compare_and_set_value(nil,value)# happy pathother.latch.count_downbreakother.itemendelsifother.nil?&&compare_and_set_slot(nil,me)# try to occupytimeout=end_at-Concurrent.monotonic_timeiftimeoutifme.latch.wait(timeout)# happy pathbreakme.valueelse# attempt to remove myself from the slotifcompare_and_set_slot(me,nil)breakCANCELelsif!me.compare_and_set_value(nil,CANCEL)# I've failed to block the fulfillerbreakme.valueendendendbreakCANCELiftimeout&&Concurrent.monotonic_time>=end_atendresult==NULL?nil:resultendendifConcurrent.on_jruby?# @!macro internal_implementation_note# @!visibility privateclassJavaExchanger<AbstractExchangerdefinitialize@exchanger=java.util.concurrent.Exchanger.newendprivate# @!macro exchanger_method_do_exchange## @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeoutdefdo_exchange(value,timeout)result=niliftimeout.nil?Synchronization::JRuby.sleep_interruptiblydoresult=@exchanger.exchange(value)endelseSynchronization::JRuby.sleep_interruptiblydoresult=@exchanger.exchange(value,1000*timeout,java.util.concurrent.TimeUnit::MILLISECONDS)endendresultrescuejava.util.concurrent.TimeoutExceptionCANCELendendend# @!visibility private# @!macro internal_implementation_noteExchangerImplementation=casewhenConcurrent.on_jruby?JavaExchangerelseRubyExchangerendprivate_constant:ExchangerImplementation# @!macro exchangerclassExchanger<ExchangerImplementation# @!method initialize# Creates exchanger instance# @!method exchange(value, timeout = nil)# @!macro exchanger_method_do_exchange# @!macro exchanger_method_exchange# @!method exchange!(value, timeout = nil)# @!macro exchanger_method_do_exchange# @!macro exchanger_method_exchange_bang# @!method try_exchange(value, timeout = nil)# @!macro exchanger_method_do_exchange# @!macro exchanger_method_try_exchangeendend