# lib/elastic_graph/graphql/aggregation/query.rb



# Copyright 2024 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/graphql/aggregation/key"
require "elastic_graph/graphql/datastore_query"
require "elastic_graph/graphql/filtering/field_path"
require "elastic_graph/support/hash_util"

module ElasticGraph
  class GraphQL
    module Aggregation
      # Immutable value object describing a single aggregation (or sub-aggregation) query,
      # responsible for building the datastore `aggs` request hash for itself and its children.
      class Query < ::Data.define(
        # Unique name for the aggregation
        :name,
        # Whether or not we need to get the document count for each bucket.
        :needs_doc_count,
        # Whether or not we need to get the error on the document count to satisfy the sub-aggregation query.
        # https://www.elastic.co/guide/en/elasticsearch/reference/8.10/search-aggregations-bucket-terms-aggregation.html#_per_bucket_document_count_error
        :needs_doc_count_error,
        # Filter to apply to this sub-aggregation.
        :filter,
        # Paginator for handling size and other pagination concerns.
        :paginator,
        # A sub-aggregation query can have sub-aggregations of its own.
        :sub_aggregations,
        # Collection of `Computation` objects that specify numeric computations to perform.
        :computations,
        # Collection of `DateHistogramGrouping`, `FieldTermGrouping`, and `ScriptTermGrouping` objects that specify how this sub-aggregation should be grouped.
        :groupings,
        # Adapter to use for groupings.
        :grouping_adapter
      )
        # Indicates whether this query needs an overall document count (as opposed to per-bucket counts).
        def needs_total_doc_count?
          # We only need a total document count when there are NO groupings and the doc count is requested.
          # The datastore will return the number of hits in each grouping automatically, so we don't need
          # a total doc count when there are groupings. And when the query isn't requesting the field, we
          # don't need it, either.
          needs_doc_count && groupings.empty?
        end

        # Builds an aggregations hash. The returned value has a few different cases:
        #
        # - If `size` is 0, or `groupings` and `computations` are both empty, we return an empty hash,
        #   so that `to_datastore_body` is an empty hash. We do this so that we avoid sending
        #   the datastore any sort of aggregations query in these cases, as the client is not
        #   requesting any aggregation data.
        # - If `SINGLETON_CURSOR` was provided for either `before` or `after`, we also return an empty hash,
        #   because we know there cannot be any results to return--the cursor is a reference to
        #   the one and only item in the list, and nothing can exist before or after it.
        # - Otherwise, we return an aggregations hash based on the groupings, computations, and sub-aggregations.
        def build_agg_hash(filter_interpreter)
          build_agg_detail(filter_interpreter, field_path: [], parent_queries: [])&.clauses || {}
        end

        # Builds the `AggregationDetail` for this query level, or `nil` when no datastore request
        # is needed (zero page size, or pagination from the singleton cursor).
        #
        # `field_path` is the path of fields from the parent document to this aggregation level;
        # `parent_queries` is the chain of ancestor `Query` objects (this query is appended to it
        # before being passed down to sub-aggregations).
        def build_agg_detail(filter_interpreter, field_path:, parent_queries:)
          return nil if paginator.desired_page_size.zero? || paginator.paginated_from_singleton_cursor?
          queries = parent_queries + [self] # : ::Array[Query]

          # Layering, outermost to innermost: filter -> groupings -> (computations + sub-aggregations).
          filter_detail(filter_interpreter, field_path) do
            grouping_adapter.grouping_detail_for(self) do
              Support::HashUtil.disjoint_merge(computations_detail, sub_aggregation_detail(filter_interpreter, queries))
            end
          end
        end

        private

        # Wraps the detail yielded by the block in a `filter` aggregation when this query has a
        # filter; otherwise returns the yielded detail unchanged. When wrapping, the `meta` is
        # adjusted so that response processing can find the buckets under the added filter key.
        def filter_detail(filter_interpreter, field_path)
          filtering_field_path = Filtering::FieldPath.of(field_path.filter_map(&:name_in_index))
          filter_clause = filter_interpreter.build_query([filter].compact, from_field_path: filtering_field_path)

          inner_detail = yield

          return inner_detail if filter_clause.nil?
          key = "#{name}:filtered"

          clause = {
            key => {
              "filter" => filter_clause,
              "aggs" => inner_detail.clauses
            }.compact
          }

          inner_meta = inner_detail.meta
          meta =
            if (buckets_path = inner_detail.meta["buckets_path"])
              # In this case, we have some grouping aggregations applied, and the response will include a `buckets` array.
              # Here we are prefixing the `buckets_path` with the `key` used for our filter aggregation to maintain its accuracy.
              inner_meta.merge({"buckets_path" => [key] + buckets_path})
            else
              # In this case, no grouping aggregations have been applied, and the response will _not_ have a `buckets` array.
              # Instead, we'll need to treat the single unbucketed aggregation as a single bucket. To indicate that, we use
              # `bucket_path` (singular) rather than `buckets_path` (plural).
              inner_meta.merge({"bucket_path" => [key]})
            end

          AggregationDetail.new(clause, meta)
        end

        # Builds the aggregation clauses for this query's numeric computations, keyed by each
        # computation's key (scoped to this aggregation's name).
        def computations_detail
          build_inner_aggregation_detail(computations) do |computation|
            {computation.key(aggregation_name: name) => computation.clause}
          end
        end

        # Builds the aggregation clauses for this query's sub-aggregations.
        #
        # NOTE(review): `sub_agg` here is presumably NOT an instance of this `Query` class, since
        # this class's `build_agg_hash` does not accept a `parent_queries:` keyword — confirm it is
        # a wrapper type (e.g. a nested sub-aggregation) with a compatible signature.
        def sub_aggregation_detail(filter_interpreter, parent_queries)
          build_inner_aggregation_detail(sub_aggregations.values) do |sub_agg|
            sub_agg.build_agg_hash(filter_interpreter, parent_queries: parent_queries)
          end
        end

        # Maps each element of `collection` to a hash via the block and merges the results,
        # raising (via `disjoint_merge`) if two elements produce the same key.
        def build_inner_aggregation_detail(collection, &block)
          initial = {} # : ::Hash[::String, untyped]
          collection.map(&block).reduce(initial) do |accum, hash|
            Support::HashUtil.disjoint_merge(accum, hash)
          end
        end
      end

      # The details of an aggregation level, including the `aggs` clauses themselves and `meta`
      # that we want echoed back to us in the response for the aggregation level.
      # The details of an aggregation level, including the `aggs` clauses themselves and `meta`
      # that we want echoed back to us in the response for the aggregation level.
      AggregationDetail = ::Data.define(
        # Aggregation clauses that would go under `aggs`.
        :clauses,
        # Custom metadata that will be echoed back to us in the response.
        # https://www.elastic.co/guide/en/elasticsearch/reference/8.11/search-aggregations.html#add-metadata-to-an-agg
        :meta
      ) do
        # @implements AggregationDetail

        # Nests this aggregation detail inside a new aggregation layer for `grouping`, allowing the
        # multi-level aggregation structure to be built up one grouping at a time.
        def wrap_with_grouping(grouping, query:)
          bucket_key = grouping.key

          # The response only contains tuples of values for each bucket's key; `grouping_fields`
          # records which fields those values came from so the response can be interpreted.
          child_meta = meta.merge(grouping.inner_meta).merge({"grouping_fields" => [bucket_key]})

          has_child_aggs = !(clauses.nil? || clauses.empty?)
          nested_layer =
            if has_child_aggs
              {"aggs" => clauses, "meta" => child_meta}
            else
              {"meta" => child_meta}
            end

          # The sibling `missing` aggregation only needs the child `aggs` (when present); it does
          # not carry its own `meta`.
          missing_bucket_extras = has_child_aggs ? nested_layer : {} # : ::Hash[::String, untyped]

          grouping_clause = grouping.non_composite_clause_for(query).merge(nested_layer)

          # Alongside the main grouping aggregation we add a sibling `missing` aggregation so that
          # documents with `null` values for the grouped field still get a bucket, mirroring the
          # behavior of the `CompositeGroupingAdapter` (which relies on the built-in
          # `missing_bucket` option).
          #
          # For correct results, this `missing` sibling must appear at _every_ level of the
          # aggregation structure and must carry the same child aggregations as the main grouping
          # aggregation. Applied recursively, that yields a fairly complex structure, even though
          # the underlying idea is conceptually simple.
          missing_clause = {
            "missing" => {"field" => grouping.encoded_index_field_path}
          }.merge(missing_bucket_extras)

          AggregationDetail.new(
            {
              bucket_key => grouping_clause,
              Key.missing_value_bucket_key(bucket_key) => missing_clause
            },
            {"buckets_path" => [bucket_key]}
          )
        end
      end
    end
  end
end