lib/concurrent/dataflow.rb



require 'concurrent/atomic'
require 'concurrent/future'

module Concurrent

  # Observer that counts down once per notification and invokes a callback
  # when the count reaches zero. {Concurrent.dataflow} uses it to start the
  # dataflow task once every one of its inputs has notified it.
  #
  # @!visibility private
  class DependencyCounter # :nodoc:

    # @param [Integer] count number of notifications to wait for
    # @yield invoked by the notification that brings the count to zero
    def initialize(count, &block)
      @counter = AtomicFixnum.new(count)
      @block = block
    end

    # Observer callback. The arguments are ignored; only the arrival of a
    # notification matters.
    #
    # @param _time ignored
    # @param _value ignored
    # @param _reason ignored
    def update(_time, _value, _reason)
      @block.call if @counter.decrement.zero?
    end
  end

  # Dataflow allows you to create a task that will be scheduled when all of its
  # data dependencies are available. Data dependencies are +Future+ values. The
  # dataflow task itself is also a +Future+ value, so you can build up a graph of
  # these tasks, each of which is run when all the data and other tasks it depends
  # on are available or completed.
  #
  # Our syntax is somewhat related to that of Akka's +flow+ and Habanero Java's
  # +DataDrivenFuture+. However unlike Akka we don't schedule a task at all until
  # it is ready to run, and unlike Habanero Java we pass the data values into the
  # task instead of dereferencing them again in the task.
  #
  # The theory of dataflow goes back to the 80s. In the terminology of the literature,
  # our implementation is coarse-grained, in that each task can be many instructions,
  # and dynamic in that you can create more tasks within other tasks.
  #
  # @example Parallel Fibonacci calculator
  #   def fib(n)
  #     if n < 2
  #       Concurrent.dataflow { n }
  #     else
  #       n1 = fib(n - 1)
  #       n2 = fib(n - 2)
  #       Concurrent.dataflow(n1, n2) { |v1, v2| v1 + v2 }
  #     end
  #   end
  #
  #   f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
  #
  #   # wait up to 1 second for the answer...
  #   f.value(1) #=> 377
  #
  # @param [IVar] inputs zero or more +IVar+ (e.g. +Future+) values that this
  #   dataflow depends upon; the same input may be given more than once
  #
  # @yield The operation to perform once all the dependencies are met
  # @yieldparam [Object] values the +value+ of each input, in the order the
  #   inputs were given
  # @yieldreturn [Object] the result of the block operation
  #
  # @return [Future] a +Future+ that is executed once every input has notified
  #   completion, and whose value is the result of the block
  #
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not +IVar+s
  def dataflow(*inputs, &block)
    raise ArgumentError, 'no block given' unless block_given?
    raise ArgumentError, 'not all dependencies are IVars' unless inputs.all? { |input| input.is_a?(IVar) }

    result = Future.new do
      # This only runs after every distinct input has notified the counter
      # (presumably meaning it has completed), so reading +value+ should not
      # block. NOTE(review): a rejected input presumably yields nil here —
      # confirm against IVar#value.
      values = inputs.map(&:value)
      block.call(*values)
    end

    # Register exactly one observer per *distinct* input object. Adding the
    # same counter to the same IVar twice may be collapsed into a single
    # registration by the observer set; a count based on +inputs.size+ would
    # then never reach zero. Counting distinct inputs keeps registrations and
    # expected notifications in step whether or not such collapsing happens.
    # Identity (object_id) is used because each object owns its own observers.
    dependencies = inputs.uniq(&:object_id)

    if dependencies.empty?
      result.execute
    else
      counter = DependencyCounter.new(dependencies.size) { result.execute }
      dependencies.each { |input| input.add_observer(counter) }
    end

    result
  end

  module_function :dataflow
end