lib/polars/rolling_group_by.rb



module Polars
  # A rolling grouper.
  #
  # This has an `.agg` method which will allow you to run all polars expressions in a
  # group by context. Groups can additionally be filtered with `having`, or handed to a
  # custom block with `map_groups`.
  class RollingGroupBy
    # @private
    def initialize(
      df,
      index_column,
      period,
      offset,
      closed,
      group_by,
      predicates
    )
      period = Utils.parse_as_duration_string(period)
      offset = Utils.parse_as_duration_string(offset)

      @df = df
      @time_column = index_column
      @period = period
      @offset = offset
      @closed = closed
      @group_by = group_by
      @predicates = predicates
    end

    # Filter groups with a list of predicates after aggregation.
    #
    # Using this method is equivalent to adding the predicates to the aggregation and
    # filtering afterwards.
    #
    # This method can be chained and all conditions will be combined using `&`.
    # The receiver is not modified; a new `RollingGroupBy` is returned.
    #
    # @param predicates [Array]
    #   Expressions that evaluate to a boolean value for each group. Typically, this
    #   requires the use of an aggregation function. Multiple predicates are
    #   combined using `&`.
    #
    # @return [RollingGroupBy]
    def having(*predicates)
      RollingGroupBy.new(
        @df,
        @time_column,
        @period,
        @offset,
        @closed,
        @group_by,
        Utils._chain_predicates(@predicates, predicates)
      )
    end

    # Compute aggregations for each group of a group by operation.
    #
    # @param aggs [Array]
    #   Aggregations to compute for each group of the group by operation,
    #   specified as positional arguments.
    #   Accepts expression input. Strings are parsed as column names.
    # @param named_aggs [Hash]
    #   Additional aggregations, specified as keyword arguments.
    #   The resulting columns will be renamed to the keyword used.
    #
    # @return [DataFrame]
    def agg(*aggs, **named_aggs)
      group_by = _lazy_rolling
      # Only forward filters when `having` has accumulated some predicates.
      group_by = group_by.having(@predicates) if @predicates&.any?

      group_by.agg(*aggs, **named_aggs).collect(optimizations: QueryOptFlags.none)
    end

    # Apply a custom/user-defined function (UDF) over the groups as a new DataFrame.
    #
    # Using this is considered an anti-pattern as it will be very slow because:
    #
    # - it forces the engine to materialize the whole `DataFrames` for the groups.
    # - it is not parallelized.
    # - it blocks optimizations as the passed block is opaque to the optimizer.
    #
    # The idiomatic way to apply custom functions over multiple columns is using:
    #
    # `Polars.struct([my_columns]).map_elements { |struct_series| ... }`
    #
    # @param schema [Object]
    #   Schema of the output function. This has to be known statically. If the
    #   given schema is incorrect, this is a bug in the caller's query and may
    #   lead to errors. If set to `nil`, polars assumes the schema is unchanged.
    # @yield [DataFrame] each group, materialized as a DataFrame.
    #   The block's result should match `schema` (or the input schema when `schema`
    #   is `nil`).
    #
    # @return [DataFrame]
    # @raise [TypeError] if group filters were added with `having`.
    def map_groups(
      schema,
      &function
    )
      if @predicates&.any?
        msg = "cannot call `map_groups` when filtering groups with `having`"
        raise TypeError, msg
      end

      _lazy_rolling
        .map_groups(schema, &function)
        .collect(optimizations: QueryOptFlags.none)
    end

    private

    # Builds the lazy rolling grouper shared by `agg` and `map_groups`, so both
    # always forward the same window configuration.
    #
    # @return [Object] the result of `rolling` on `@df.lazy`.
    def _lazy_rolling
      @df.lazy.rolling(
        index_column: @time_column,
        period: @period,
        offset: @offset,
        closed: @closed,
        group_by: @group_by
      )
    end
  end
end