class Hierarchy
def self.ancestors(from:, to:)
Returns all possible ancestor association paths found by navigating through :belongs_to associations
# Collects every association path that leads from +from+ up to +to+ by
# walking non-polymorphic :belongs_to associations breadth-first.
#
# @param from [Class] class the walk starts at (compared through +to_s+)
# @param to [Class] class the walk has to reach (compared through +to_s+)
# @return [Array] one +hashify+-built entry per path found, made of the
#   association names along that path; empty when both arguments name the
#   same class or when no path exists
def self.ancestors(from:, to:)
  target = to.to_s
  return [] if from.to_s == target

  queue = [{ class: from.to_s, path: [] }]
  paths = []

  until queue.empty?
    current = queue.shift

    current[:class].constantize.reflect_on_all_associations(:belongs_to).each do |relation|
      next if relation.options[:polymorphic] # polymorphic associations are not walked

      next_class = relation.klass.to_s
      next_path = current[:path] + [relation.name]
      paths << hashify(next_path) if next_class == target

      # Keep exploring only through associations that non_looped_path? accepts.
      queue.push({ class: next_class, path: next_path }) if non_looped_path?(from, to, relation, next_path)
    end
  end

  paths
end
def self.ancestors_bfs(from:, to:, classify: false)
Starting from the "from" class and moving towards the "to" class,
returns the first ancestor association path found by navigating breadth-first through :belongs_to associations
# Breadth-first walk over non-polymorphic :belongs_to associations, going
# from the "from" class towards the "to" class and stopping at the first match.
#
# @param from [Class] class the walk starts at
# @param to [Class] class the walk has to reach (matched through +to_s+)
# @param classify [Boolean] when true the path is an array of class names,
#   otherwise it is the +hashify+ form of the association names
# @return [Array<String>, Object, nil] the first path found; nil when
#   +from+ equals +to+ or when no path exists
def self.ancestors_bfs(from:, to:, classify: false)
  return if from == to

  seen = [from]
  pending = [{ class: from, path: [] }]

  until pending.empty?
    node = pending.shift

    node[:class].reflect_on_all_associations(:belongs_to).each do |relation|
      next if relation.options[:polymorphic]

      parent = relation.klass
      # classify => step is a class name; otherwise it is the association name
      step = classify ? parent.to_s : relation.name
      trail = node[:path] + [step]

      if parent.to_s == to.to_s
        return trail if classify

        return hashify(trail)
      end

      next if seen.include?(parent)

      seen << parent
      pending.push({ class: parent, path: trail })
    end
  end
end
def self.ancestors_dfs(from:, to:, descendants: [])
Starting from the "from" class and moving towards the "to" class,
returns an ancestor association path found by navigating depth-first through :belongs_to associations
# Depth-first walk over :belongs_to associations, going from the "from"
# class towards the "to" class.
#
# @param from [Class] class the walk starts at
# @param to [Class] class the walk has to reach (matched through +to_s+)
# @param descendants [Array] classes already visited on the way down; it is
#   mutated (the current class is pushed onto it)
# @return [Symbol, Hash, String, nil] the association name when +to+ is a
#   direct parent, a nested hash of association names for a longer path, the
#   string 'loop' when +from+ was already visited, nil otherwise
def self.ancestors_dfs(from:, to:, descendants: [])
  return if from.to_s == to.to_s && descendants.empty? # Base case
  return 'loop' if descendants.include?(from) # Avoids cycle

  descendants.push(from)
  from.reflect_on_all_associations(:belongs_to).each do |relation|
    # Polymorphic associations are skipped, as in ancestors and ancestors_bfs;
    # asking such a reflection for its klass raises.
    next if relation.options[:polymorphic]
    return relation.name if relation.klass.to_s == to.to_s # Path is found

    path = ancestors_dfs(from: relation.klass, to: to, descendants: descendants)
    # NOTE(review): a sub-path only counts when one of its leaves equals the
    # target's param_key, so association names that differ from it are rejected
    # -- confirm this is intended.
    return { relation.name => path } if valid_path?(path, to.model_name.param_key.to_sym)
  end

  nil
end
def self.associations(klass)
# Builds the hierarchy below +klass+ without the :classes? flag, so nodes are
# labelled the way children_classes labels them when that flag is absent.
#
# @param klass [Class] root class of the hierarchy
# @return [Object] whatever build_hierarchy returns for these options
def self.associations(klass)
  options = { class: klass }
  build_hierarchy(options)
end
def self.bottom_up_classes(klass)
Returns the array of class names (the given class followed by its children classes), sorted bottom-up
# Lists +klass+ and the classes collected by build_descendants, ordered by
# topological_sort.
#
# @param klass [Class] class whose descendants are collected
# @return [Array] the result of topological_sort over the class name of
#   +klass+ followed by the collected names
def self.bottom_up_classes(klass)
  @classes_list = [] # reset the shared accumulator before each traversal
  build_descendants(klass)

  names = [klass.to_s, *@classes_list]
  topological_sort(names)
end
def self.build_descendants(klass)
# Runs dfs_descendants for +klass+ in class-name mode and shields the caller
# from a stack overflow.
#
# @param klass [Class] root class of the traversal
# @return [Object] the result of dfs_descendants, or an empty array when a
#   SystemStackError was rescued (a warning is logged in that case)
def self.build_descendants(klass)
  dfs_descendants(class: klass, classes?: true)
rescue SystemStackError
  # The handler must only use names that exist here: +klass+, not +opts+.
  Rails.logger.ap "Infinite loop detected and handled for #{klass} classes_list", :warn
  []
end
def self.build_hierarchy(opts)
# Resets the memoization cache and runs dfs_hierarchy with +opts+.
#
# @param opts [Hash] traversal options; :class is the root class, and the
#   hash is handed to dfs_hierarchy unchanged
# @return [Object] the result of dfs_hierarchy, or an empty array when a
#   SystemStackError was rescued (a warning is logged in that case)
def self.build_hierarchy(opts)
  @cache = {}

  begin
    dfs_hierarchy(opts)
  rescue SystemStackError
    Rails.logger.ap "Infinite loop detected and handled for #{opts[:class]} hierarchy", :warn
    []
  end
end
def self.children_classes(opts)
# Pairs every walkable reflection of opts[:class] with a label.
#
# @param opts [Hash] :class is the class to inspect; when :classes? is
#   truthy the label is the child class name, otherwise the reflection name
# @return [Array<Array>] unique [child_class, label] pairs
def self.children_classes(opts)
  label_with_class_name = opts[:classes?]

  pairs = walkables(opts[:class]).map do |reflection|
    child = get_class(reflection)
    [child, label_with_class_name ? child.to_s : reflection.name]
  end

  pairs.uniq
end
def self.classes(klass)
# Builds the hierarchy below +klass+ with the :classes? flag set, so nodes
# are labelled the way children_classes labels them in class-name mode.
#
# @param klass [Class] root class of the hierarchy
# @return [Object] whatever build_hierarchy returns for these options
def self.classes(klass)
  options = { class: klass, classes?: true }
  build_hierarchy(options)
end
def self.classes_list(klass)
# Collects the names gathered by build_descendants for +klass+.
#
# @param klass [Class] root class of the traversal
# @return [Array] the @classes_list accumulator after the traversal
def self.classes_list(klass)
  # Start from an empty accumulator; the traversal appends to it.
  @classes_list = []
  build_descendants(klass)

  @classes_list
end
def self.dfs_descendants(opts, klass_name = nil)
# Depth-first traversal that appends every newly met name to @classes_list.
#
# @param opts [Hash] :class is the class whose children are explored and
#   :classes? is forwarded to children_classes and to the recursive calls
# @param klass_name [Object, nil] label of the current node; nil for the root
# @return [true, nil] nil when +klass_name+ was already recorded, true otherwise
def self.dfs_descendants(opts, klass_name = nil)
  return if klass_name.in? @classes_list # already recorded: stop here

  @classes_list << klass_name if klass_name.present?

  classes_flag = opts[:classes?]
  children_classes(opts).each do |(child_klass, child_name)|
    dfs_descendants({ class: child_klass, classes?: classes_flag }, child_name)
  end

  true
end
def self.dfs_hierarchy(opts, klass_name = nil, ancestral_nodes = [])
# Recursively builds the hierarchy below opts[:class], memoizing each node in
# @cache under its label.
#
# @param opts [Hash] :class is the class being expanded and :classes? is
#   forwarded to children_classes and to the recursive calls
# @param klass_name [Object, nil] label of the current node; nil for the root
# @param ancestral_nodes [Array] classes already on the path from the root;
#   the current class is pushed onto it
# @return [Object] for the root: the array of child hierarchies ([] when the
#   root is a leaf); for other nodes: the label itself (leaf, or class already
#   among the ancestors) or a { label => child hierarchies } hash
def self.dfs_hierarchy(opts, klass_name = nil, ancestral_nodes = [])
  # Hash#key? avoids building and scanning a keys array on every call.
  return @cache[klass_name] if @cache.key?(klass_name)
  return klass_name if opts[:class].in? ancestral_nodes # Early abort to not enter in a cycle

  if leaf?(opts[:class])
    @cache[klass_name] = klass_name
    return klass_name if klass_name.present? # Leaf

    return [] # Leaf and Root
  end

  ancestral_nodes.push(opts[:class])
  children_hierarchies = children_classes(opts).map do |c_class, c_name|
    # Every branch receives its own copy of the ancestor list.
    dfs_hierarchy({ class: c_class, classes?: opts[:classes?] }, c_name, ancestral_nodes.dup)
  end
  @cache[klass_name] = { klass_name => children_hierarchies }
  return @cache[klass_name] if klass_name.present? # Middle

  children_hierarchies # Root
end
def self.get_class(reflection)
# Resolves the class a reflection points to.
#
# @param reflection [Object] must respond to +name+ and +options+
# @return [Class] the constant named by options[:class_name] when that key is
#   present, otherwise the constant derived from the reflection name
def self.get_class(reflection)
  class_name =
    if reflection.options.key?(:class_name)
      reflection.options[:class_name].to_s
    else
      reflection.name.to_s.singularize.classify
    end

  class_name.constantize
end
def self.hashify(array)
# Turns a flat list into a right-nested hash: [a, b, c] => { a => { b => c } }.
#
# @param array [Array] elements to nest, outermost key first
# @return [Object, nil] the single element for a one-element array, the
#   nested hash for longer arrays, nil for an empty array
def self.hashify(array)
  # Folding from the innermost element outwards needs no recursion, so an
  # empty array simply yields nil instead of recursing without end.
  array.reverse.reduce { |nested, key| { key => nested } }
end
def self.leaf?(klass)
# Tells whether +klass+ has nothing left to walk into.
#
# @param klass [Class] class to inspect
# @return [Boolean] true when walkables(klass) is empty
def self.leaf?(klass)
  walkables(klass).empty?
end
def self.loop?(klass)
# Tells whether walking the hierarchy of +klass+ overflows the stack.
#
# NOTE(review): dfs_hierarchy returns early when a class is already among its
# ancestors, so a SystemStackError may never be raised for cyclic models --
# confirm this predicate can still report true.
#
# @param klass [Class] root class of the traversal
# @return [Boolean] true when a SystemStackError was rescued, false otherwise
def self.loop?(klass)
  @cache = {}
  dfs_hierarchy(class: klass, classes?: false)
  false # always a strict boolean, whatever the traversal returned
rescue SystemStackError
  true
end
def self.non_looped_path?(from, to, relation, next_path)
# Tells whether a path may keep being explored: the association must not
# point at either endpoint and the path must not repeat an association name.
#
# @param from [Class, String] start of the search (compared through +to_s+,
#   so class names given as strings work as well as classes)
# @param to [Class, String] target of the search (compared through +to_s+)
# @param relation [Object] must respond to +class_name+
# @param next_path [Array] association names walked so far
# @return [Boolean] true when the path can be extended
def self.non_looped_path?(from, to, relation, next_path)
  endpoints = [from.to_s, to.to_s]
  return false if endpoints.include?(relation.class_name)

  next_path.uniq.length == next_path.length
end
def self.topological_sort(classes)
# Orders class names so that each name comes before the names it depends on,
# where the dependencies of a name are the class names returned by
# Hierarchy.ancestors_bfs from that class to the first class of the list.
#
# @param classes [Array<String>] class names; the first one is the root that
#   every other name is resolved against
# @return [Array<String>] the names (plus any dependency names) in sorted order
def self.topological_sort(classes)
  return [] if classes.empty?

  root = classes.first.constantize # loop-invariant: resolve the root once
  dependencies = classes.to_h do |name|
    [name, Hierarchy.ancestors_bfs(from: name.constantize, to: root, classify: true)]
  end

  sorted_items = []
  visited = {}
  visit = lambda do |item|
    next if visited[item]

    visited[item] = true
    dependencies[item]&.each { |dependency| visit.call(dependency) }
    # Prepending after the dependencies were handled puts the item in front of them.
    sorted_items.unshift(item)
  end

  classes.each { |item| visit.call(item) }
  sorted_items
end
def self.valid_path?(path, target)
# Searches a nested path structure for +target+.
#
# @param path [Object] a value, an array of paths, or a hash whose values
#   are paths
# @param target [Object] value to look for
# @return [Boolean] true when +path+ equals +target+ or when any array
#   element / hash value (recursively) does
def self.valid_path?(path, target)
  return true if path == target

  nested_paths =
    case path
    when Array then path
    when Hash then path.values
    else []
    end

  nested_paths.any? { |nested| valid_path?(nested, target) }
end
def self.walkables(klass)
# Selects the reflections of +klass+ that can be walked: :has_one or
# :has_many associations declared without a :through option.
#
# @param klass [Class] must respond to +reflections+ with a hash
# @return [Array] the matching reflection objects
def self.walkables(klass)
  klass.reflections.values.select do |reflection|
    next false unless %i[has_one has_many].include?(reflection.macro)

    !reflection.options.key?(:through)
  end
end