module Concurrent
# Builds a Future whose task calls the given block with one value per input,
# and arranges for that Future to be executed.
#
# With no inputs the Future is executed right away. Otherwise a single
# DependencyCounter (sized to the number of inputs) is registered as an
# observer on every input, and the counter's block executes the Future.
# NOTE(review): presumably the counter fires once every input has notified
# it -- confirm against DependencyCounter.
#
# @param method [Symbol, String] message sent to each input to read its
#   value when the Future's task runs
# @param executor [Object] passed to Future.new as `executor:`; must not be nil
# @param inputs [Array<IVar>] dependencies whose values become the block's
#   arguments, in the order given
# @yield [*values] the values read from +inputs+
# @return [Future] the Future wrapping the block
# @raise [ArgumentError] if +executor+ is nil, if no block is given, or if
#   any input is not an IVar
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError, 'an executor must be provided' if executor.nil?
  raise ArgumentError, 'no block given' unless block_given?

  unless inputs.all? { |input| input.is_a?(IVar) }
    raise ArgumentError, "Not all dependencies are IVars.\nDependencies: #{inputs.inspect}"
  end

  # Values are read inside the task, i.e. only when the Future actually runs.
  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    # Nothing to wait for.
    result.execute
  else
    # One shared counter observes every dependency.
    counter = DependencyCounter.new(inputs.size) { result.execute }
    inputs.each { |input| input.add_observer(counter) }
  end

  result
end