module Concurrent

def call_dataflow(method, executor, *inputs, &block)

# Builds a Future whose task collects a value from every dependency and
# passes those values, in order, to the given block.
#
# When there are no dependencies the future is executed right away.
# Otherwise a DependencyCounter (sized to the number of dependencies, with a
# block that executes the future) is registered as an observer on each one.
# NOTE(review): presumably the counter fires its block once every dependency
# has notified it -- DependencyCounter is defined elsewhere; confirm there.
#
# @param method [Symbol, String] name of the reader sent to each dependency
#   to obtain its value
# @param executor [Object] passed to Future.new as +executor:+; must not be nil
# @param inputs [Array<IVar>] the dependencies
# @yield [*values] the values read from the dependencies, in argument order
# @return [Future] the newly created future
# @raise [ArgumentError] if +executor+ is nil, no block is given, or any
#   dependency is not an IVar
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError, 'an executor must be provided' if executor.nil?
  raise ArgumentError, 'no block given' unless block_given?

  if inputs.any? { |dependency| !dependency.is_a?(IVar) }
    raise ArgumentError, "Not all dependencies are IVars.\nDependencies: #{inputs.inspect}"
  end

  # The values are read lazily, inside the future's task, not at build time.
  future = Future.new(executor: executor) do
    block.call(*inputs.map { |dependency| dependency.send(method) })
  end

  # Nothing to wait for: start immediately.
  if inputs.empty?
    future.execute
    return future
  end

  # One shared counter observes every dependency.
  trigger = DependencyCounter.new(inputs.size) { future.execute }
  inputs.each { |dependency| dependency.add_observer(trigger) }

  future
end