lib/polars/rolling_group_by.rb
module Polars
  # A rolling grouper.
  #
  # Its `.agg` method lets you run polars expressions in a group by context.
  # Groups can optionally be filtered with `.having` before aggregating.
  class RollingGroupBy
    # @private
    def initialize(df, index_column, period, offset, closed, group_by, predicates)
      @df = df
      @time_column = index_column
      # Period and offset are stored in the form returned by
      # `Utils.parse_as_duration_string` (period is parsed first, then offset).
      @period = Utils.parse_as_duration_string(period)
      @offset = Utils.parse_as_duration_string(offset)
      @closed = closed
      @group_by = group_by
      @predicates = predicates
    end

    # Filter groups with a list of predicates after aggregation.
    #
    # Calling this is equivalent to adding the predicates to the aggregation
    # and filtering afterwards.
    #
    # The method can be chained; all conditions are combined using `&`.
    # The receiver is left untouched: a new grouper carrying the combined
    # predicates is returned.
    #
    # @param predicates [Array]
    #   Expressions that evaluate to a boolean value for each group. Typically
    #   this requires the use of an aggregation function. Multiple predicates
    #   are combined using `&`.
    #
    # @return [RollingGroupBy]
    def having(*predicates)
      combined = Utils._chain_predicates(@predicates, predicates)
      RollingGroupBy.new(@df, @time_column, @period, @offset, @closed, @group_by, combined)
    end

    # Compute aggregations for each group of a group by operation.
    #
    # @param aggs [Array]
    #   Aggregations to compute for each group of the group by operation,
    #   specified as positional arguments.
    #   Accepts expression input. Strings are parsed as column names.
    # @param named_aggs [Hash]
    #   Additional aggregations, specified as keyword arguments.
    #   The resulting columns will be renamed to the keyword used.
    #
    # @return [DataFrame]
    def agg(*aggs, **named_aggs)
      grouped = lazy_rolling
      # Only attach the `having` filter when predicates were actually supplied.
      grouped = grouped.having(@predicates) if @predicates&.any?
      grouped
        .agg(*aggs, **named_aggs)
        .collect(optimizations: QueryOptFlags.none)
    end

    # Apply a custom/user-defined function (UDF) over the groups as a new DataFrame.
    #
    # Using this is considered an anti-pattern as it will be very slow because:
    #
    # - it forces the engine to materialize the whole `DataFrames` for the groups.
    # - it is not parallelized.
    # - it blocks optimizations as the passed Ruby block is opaque to the
    #   optimizer.
    #
    # The idiomatic way to apply custom functions over multiple columns is using:
    #
    # `Polars.struct([my_columns]).map_elements { |struct_series| ... }`
    #
    # The given block is forwarded unchanged to the lazy rolling group by's
    # `map_groups`.
    #
    # @param schema [Object]
    #   Schema of the output function. This has to be known statically. If the
    #   given schema is incorrect, this is a bug in the caller's query and may
    #   lead to errors. If set to `nil`, polars assumes the schema is unchanged.
    #
    # @return [DataFrame]
    #
    # @raise [TypeError]
    #   If groups are being filtered with `having`.
    def map_groups(schema, &function)
      # Reject before any lazy frame is built, so nothing is touched on failure.
      raise TypeError, "cannot call `map_groups` when filtering groups with `having`" if @predicates&.any?

      lazy_rolling
        .map_groups(schema, &function)
        .collect(optimizations: QueryOptFlags.none)
    end

    private

    # Builds the lazy rolling group by shared by `agg` and `map_groups`.
    def lazy_rolling
      @df.lazy.rolling(
        index_column: @time_column,
        period: @period,
        offset: @offset,
        closed: @closed,
        group_by: @group_by
      )
    end
  end
end