require'concurrent/concern/dereferenceable'require'concurrent/synchronization/object'moduleConcurrent# An `MVar` is a synchronized single element container. They are empty or# contain one item. Taking a value from an empty `MVar` blocks, as does# putting a value into a full one. You can either think of them as blocking# queue of length one, or a special kind of mutable variable.## On top of the fundamental `#put` and `#take` operations, we also provide a# `#mutate` that is atomic with respect to operations on the same instance.# These operations all support timeouts.## We also support non-blocking operations `#try_put!` and `#try_take!`, a# `#set!` that ignores existing values, a `#value` that returns the value# without removing it or returns `MVar::EMPTY`, and a `#modify!` that yields# `MVar::EMPTY` if the `MVar` is empty and can be used to set `MVar::EMPTY`.# You shouldn't use these operations in the first instance.## `MVar` is a [Dereferenceable](Dereferenceable).## `MVar` is related to M-structures in Id, `MVar` in Haskell and `SyncVar` in Scala.## Note that unlike the original Haskell paper, our `#take` is blocking. This is how# Haskell and Scala do it today.## @!macro copy_options## ## See Also## 1. P. Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non- strict, functional language with state](http://dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th# ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.## 2. S. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](http://dl.acm.org/citation.cfm?id=237794).# In Proceedings of the 23rd Symposium on Principles of Programming Languages# (PoPL), 1996.classMVar<Synchronization::ObjectincludeConcern::Dereferenceablesafe_initialization!# Unique value that represents that an `MVar` was emptyEMPTY=::Object.new# Unique value that represents that an `MVar` timed out before it was able# to produce a value.TIMEOUT=::Object.new# Create a new `MVar`, either empty or with an initial value.## @param [Hash] opts the options controlling how the future will be processed## @!macro deref_optionsdefinitialize(value=EMPTY,opts={})@value=value@mutex=Mutex.new@empty_condition=ConditionVariable.new@full_condition=ConditionVariable.newset_deref_options(opts)end# Remove the value from an `MVar`, leaving it empty, and blocking if there# isn't a value. A timeout can be set to limit the time spent blocked, in# which case it returns `TIMEOUT` if the time is exceeded.# @return [Object] the value that was taken, or `TIMEOUT`deftake(timeout=nil)@mutex.synchronizedowait_for_full(timeout)# If we timed out we'll still be emptyifunlocked_full?value=@value@value=EMPTY@empty_condition.signalapply_deref_options(value)elseTIMEOUTendendend# acquires lock on the from an `MVAR`, yields the value to provided block,# and release lock. A timeout can be set to limit the time spent blocked,# in which case it returns `TIMEOUT` if the time is exceeded.# @return [Object] the value returned by the block, or `TIMEOUT`defborrow(timeout=nil)@mutex.synchronizedowait_for_full(timeout)# if we timeoud out we'll still be emptyifunlocked_full?yield@valueelseTIMEOUTendendend# Put a value into an `MVar`, blocking if there is already a value until# it is empty. A timeout can be set to limit the time spent blocked, in# which case it returns `TIMEOUT` if the time is exceeded.# @return [Object] the value that was put, or `TIMEOUT`defput(value,timeout=nil)@mutex.synchronizedowait_for_empty(timeout)# If we timed out we won't be emptyifunlocked_empty?@value=value@full_condition.signalapply_deref_options(value)elseTIMEOUTendendend# Atomically `take`, yield the value to a block for transformation, and then# `put` the transformed value. Returns the transformed value. A timeout can# be set to limit the time spent blocked, in which case it returns `TIMEOUT`# if the time is exceeded.# @return [Object] the transformed value, or `TIMEOUT`defmodify(timeout=nil)raiseArgumentError.new('no block given')unlessblock_given?@mutex.synchronizedowait_for_full(timeout)# If we timed out we'll still be emptyifunlocked_full?value=@value@value=yieldvalue@full_condition.signalapply_deref_options(value)elseTIMEOUTendendend# Non-blocking version of `take`, that returns `EMPTY` instead of blocking.deftry_take!@mutex.synchronizedoifunlocked_full?value=@value@value=EMPTY@empty_condition.signalapply_deref_options(value)elseEMPTYendendend# Non-blocking version of `put`, that returns whether or not it was successful.deftry_put!(value)@mutex.synchronizedoifunlocked_empty?@value=value@full_condition.signaltrueelsefalseendendend# Non-blocking version of `put` that will overwrite an existing value.defset!(value)@mutex.synchronizedoold_value=@value@value=value@full_condition.signalapply_deref_options(old_value)endend# Non-blocking version of `modify` that will yield with `EMPTY` if there is no value yet.defmodify!raiseArgumentError.new('no block given')unlessblock_given?@mutex.synchronizedovalue=@value@value=yieldvalueifunlocked_empty?@empty_condition.signalelse@full_condition.signalendapply_deref_options(value)endend# Returns if the `MVar` is currently empty.defempty?@mutex.synchronize{@value==EMPTY}end# Returns if the `MVar` currently contains a value.deffull?!empty?endprotecteddefsynchronize(&block)@mutex.synchronize(&block)endprivatedefunlocked_empty?@value==EMPTYenddefunlocked_full?!unlocked_empty?enddefwait_for_full(timeout)wait_while(@full_condition,timeout){unlocked_empty?}enddefwait_for_empty(timeout)wait_while(@empty_condition,timeout){unlocked_full?}enddefwait_while(condition,timeout)iftimeout.nil?whileyieldcondition.wait(@mutex)endelsestop=Concurrent.monotonic_time+timeoutwhileyield&&timeout>0.0condition.wait(@mutex,timeout)timeout=stop-Concurrent.monotonic_timeendendendendend