class Polars::LazyFrame

Representation of a Lazy computation graph/query against a DataFrame.

def self._from_rbldf(rb_ldf)

Other tags:
    Private: -
# Wrap an existing native lazy-frame handle in a LazyFrame.
#
# The instance is created with `allocate`, so `initialize` is not run;
# the handle is stored through the `_ldf=` writer.
#
# @private
# @param rb_ldf [Object] native handle to wrap
# @return [LazyFrame]
def self._from_rbldf(rb_ldf)
  LazyFrame.allocate.tap { |frame| frame._ldf = rb_ldf }
end

def self._select_engine(engine)

Other tags:
    Private: -
# Resolve the engine name to use for a query.
#
# @private
# @param engine [String] requested engine; "auto" defers to `Plr.get_engine_affinity`
# @return [Object] `Plr.get_engine_affinity` for "auto", otherwise `engine` unchanged
def self._select_engine(engine)
  return engine unless engine == "auto"

  Plr.get_engine_affinity
end

def self.deserialize(source, format: "binary")

Other tags:
    Note: -
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • format ('binary', 'json') --
  • source (Object) --
# Read a serialized logical plan back into a LazyFrame.
#
# Path-like sources (per `Utils.pathlike?`) are normalized with
# `Utils.normalize_filepath` before being handed to the native deserializer.
#
# @param source [Object] path-like value or other source accepted by the native deserializer
# @param format ['binary', 'json'] serialization format of `source`
# @return [LazyFrame]
# @raise [Todo] if the native binary deserializer is not available
# @raise [ArgumentError] if `format` is neither "binary" nor "json"
def self.deserialize(source, format: "binary")
  source = Utils.normalize_filepath(source) if Utils.pathlike?(source)

  deserializer =
    case format
    when "binary"
      raise Todo unless RbLazyFrame.respond_to?(:deserialize_binary)
      RbLazyFrame.method(:deserialize_binary)
    when "json"
      RbLazyFrame.method(:deserialize_json)
    else
      # Ruby string interpolation has no `{{`/`}}` escape, so the braces are written once.
      raise ArgumentError, "`format` must be one of {'binary', 'json'}, got #{format.inspect}"
    end
  _from_rbldf(deserializer.(source))
end

def _filter(

def _filter(
  predicates:,
  constraints:,
  invert: false
)
  all_predicates = []
  boolean_masks = []
  predicates.each do |p|
    # quick exit/skip conditions
    if (p.is_a?(FalseClass) && invert) || (p.is_a?(TrueClass) && !invert)
      next # ignore; doesn't filter/remove anything
    end
    if (p.is_a?(TrueClass) && invert) || (p.is_a?(FalseClass) && !invert)
      return clear # discard all rows
    end
    # note: identify masks separately from predicates
    if Utils.is_bool_sequence(p, include_series: true)
      boolean_masks << Polars::Series.new(p, dtype: Boolean)
    elsif (
      (is_seq = Utils.is_sequence(p)) && p.any? { |x| !x.is_a?(Expr) }) ||
      (!is_seq && !p.is_a?(Expr) && !(p.is_a?(::String) && collect_schema.include?(p))
    )
      err = p.is_a?(Series) ? "Series(…, dtype: #{p.dtype})" : p.inspect
      msg = "invalid predicate for `filter`: #{err}"
      raise TypeError, msg
    else
      all_predicates.concat(
        Utils.parse_into_list_of_expressions(p).map { |x| Utils.wrap_expr(x) }
      )
    end
  end
  # unpack equality constraints from kwargs
  all_predicates.concat(
    constraints.map { |name, value| F.col(name).eq(value) }
  )
  if !(all_predicates.any? || boolean_masks.any?)
    msg = "at least one predicate or constraint must be provided"
    raise TypeError, msg
  end
  # if multiple predicates, combine as 'horizontal' expression
  combined_predicate =
    if all_predicates.any?
      if all_predicates.length > 1
        F.all_horizontal(*all_predicates)
      else
        all_predicates[0]
      end
    else
      nil
    end
  # apply reduced boolean mask first, if applicable, then predicates
  if boolean_masks.any?
    if boolean_masks.length > 1
      raise Todo
    end
    mask_expr = F.lit(boolean_masks[0])
    combined_predicate =
      if combined_predicate.nil?
        mask_expr
      else
        mask_expr & combined_predicate
      end
  end
  if combined_predicate.nil?
    return _from_rbldf(_ldf)
  end
  filter_method = invert ? _ldf.method(:remove) : _ldf.method(:filter)
  _from_rbldf(filter_method.(combined_predicate._rbexpr))
end

def _from_rbldf(rb_ldf)

# Instance-level shortcut that forwards to the class-level `_from_rbldf`,
# so instance methods can wrap a native handle without naming the class.
#
# @param rb_ldf [Object] native lazy-frame handle
# @return [Object] whatever `self.class._from_rbldf` returns
def _from_rbldf(rb_ldf)
  self.class._from_rbldf(rb_ldf)
end

def _select_engine(engine)

# Instance-level shortcut that forwards to the class-level `_select_engine`.
#
# @param engine [Object] requested engine name
# @return [Object] whatever `self.class._select_engine` returns
def _select_engine(engine)
  self.class._select_engine(engine)
end

def _to_sink_target(path)

# Prepare a sink target: path-like values (per `Utils.pathlike?`) are
# normalized with `Utils.normalize_filepath`; anything else is passed through.
#
# @param path [Object]
# @return [Object]
def _to_sink_target(path)
  return path unless Utils.pathlike?(path)

  Utils.normalize_filepath(path)
end

def bottom_k(

Other tags:
    Example: Get the rows which contain the 4 smallest values when sorting on column a and b. -
    Example: Get the rows which contain the 4 smallest values in column b. -

Returns:
  • (LazyFrame) -

Parameters:
  • reverse (Object) --
  • by (Object) --
  • k (Integer) --
# Forward a bottom-k request to the native frame.
#
# `by` is parsed into a list of expressions and `reverse` is expanded with
# `Utils.extend_bool` to one entry per `by` expression.
#
# @param k [Integer] number of rows
# @param by [Object] column name(s) / expression(s)
# @param reverse [Object] single flag or one flag per `by` entry
# @return [LazyFrame]
def bottom_k(
  k,
  by:,
  reverse: false
)
  by_exprs = Utils.parse_into_list_of_expressions(by)
  reverse_flags = Utils.extend_bool(reverse, by_exprs.length, "reverse", "by")
  native = _ldf.bottom_k(k, by_exprs, reverse_flags)
  _from_rbldf(native)
end

def cache

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.cache` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def cache
  _from_rbldf(_ldf.cache)
end

def cast(dtypes, strict: true)

Other tags:
    Example: Cast all frame columns to the specified dtype: -
    Example: Cast all frame columns matching one dtype (or dtype group) to another dtype: -
    Example: Cast specific frame columns to the specified dtypes: -

Returns:
  • (LazyFrame) -

Parameters:
  • strict (Boolean) --
  • dtypes (Hash) --
# Cast columns to the given dtype(s).
#
# A non-Hash `dtypes` is applied to every column through the native
# `cast_all`. A Hash maps either column names (Strings) or selectors to a
# dtype; selectors are expanded against this frame with `Utils.expand_selector`.
#
# @param dtypes [Hash, Object] column/selector => dtype mapping, or a single dtype
# @param strict [Boolean] forwarded unchanged to the native cast
# @return [LazyFrame]
def cast(dtypes, strict: true)
  return _from_rbldf(_ldf.cast_all(dtypes, strict)) unless dtypes.is_a?(Hash)

  cast_map =
    dtypes.each_with_object({}) do |(column, target), mapping|
      parsed = Utils.parse_into_dtype(target)
      entries =
        if column.is_a?(::String)
          {column => parsed}
        else
          Utils.expand_selector(self, column).to_h { |name| [name, parsed] }
        end
      mapping.merge!(entries)
    end
  _from_rbldf(_ldf.cast(cast_map, strict))
end

def clear(n = 0)

Returns:
  • (LazyFrame) -
# Builds an eager `DataFrame` from this frame's `schema`, calls `clear(n)`
# on it and converts the result back with `lazy`.
#
# @param n [Integer] forwarded to `DataFrame#clear`
# @return [LazyFrame]
def clear(n = 0)
  DataFrame.new(schema: schema).clear(n).lazy
end

def collect(

Returns:
  • (DataFrame) -

Parameters:
  • optimizations () --
  • background (Boolean) --
  • engine () --
# Execute the query.
#
# @param engine [String] engine name; resolved through `_select_engine`
# @param background [Boolean] when true, returns an `InProcessQuery` wrapping
#   `collect_concurrently` instead of a collected frame
# @param optimizations [Object] object exposing `_rboptflags`
# @return [DataFrame, InProcessQuery]
def collect(
  engine: "auto",
  background: false,
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  resolved_engine = _select_engine(engine)
  Utils.issue_unstable_warning("streaming mode is considered unstable.") if resolved_engine == "streaming"

  optimized = _ldf.with_optimizations(optimizations._rboptflags)
  return Utils.wrap_df(optimized.collect(resolved_engine)) unless background

  Utils.issue_unstable_warning("background mode is considered unstable.")
  InProcessQuery.new(optimized.collect_concurrently)
end

def collect_batches(

Returns:
  • (Object) -

Parameters:
  • optimizations (Object) --
  • engine (String) --
  • lazy (Boolean) --
  • maintain_order (Boolean) --
  • chunk_size (Integer) --

Other tags:
    Note: -
    Note: -
# Evaluate the query in batches.
#
# The native call receives `engine`, `maintain_order`, `chunk_size` and
# `lazy` positionally, in that order; the result is wrapped in `CollectBatches`.
# NOTE(review): `engine` is passed as given, without `_select_engine` — confirm intended.
#
# @param chunk_size [Integer, nil]
# @param maintain_order [Boolean]
# @param lazy [Boolean]
# @param engine [String]
# @param optimizations [Object] object exposing `_rboptflags`
# @return [CollectBatches]
def collect_batches(
  chunk_size: nil,
  maintain_order: true,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  optimized = _ldf.with_optimizations(optimizations._rboptflags)
  CollectBatches.new(
    optimized.collect_batches(engine, maintain_order, chunk_size, lazy)
  )
end

def collect_schema

Other tags:
    Example: Access various properties of the schema. -
    Example: Determine the schema. -

Returns:
  • (Schema) -
# Resolves the schema through the native `_ldf.collect_schema` and wraps it
# in a `Schema`, constructed with `check_dtypes: false`.
#
# @return [Schema]
def collect_schema
  Schema.new(_ldf.collect_schema, check_dtypes: false)
end

def columns

Returns:
  • (Array) -
# Returns the keys of the native `_ldf.collect_schema` result.
#
# @return [Array]
def columns
  _ldf.collect_schema.keys
end

def count

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.count` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def count
  _from_rbldf(_ldf.count)
end

def describe(

Other tags:
    Example: Customize which percentiles are displayed, applying linear interpolation: -
    Example: Show default frame statistics: -

Other tags:
    Note: -
    Note: -
    Note: -

Returns:
  • (DataFrame) -

Parameters:
  • interpolation ('nearest', 'higher', 'lower', 'midpoint', 'linear', 'equiprobable') --
  • percentiles (Array) --
# Compute summary statistics for every column and return them eagerly.
#
# One row is produced per metric: count, null_count, mean, std, min, the
# requested percentiles, and max. Metrics that do not apply to a column's
# dtype are null. This method collects the frame.
#
# @param percentiles [Array] percentiles to include; parsed by `Utils.parse_percentiles`
# @param interpolation ['nearest', 'higher', 'lower', 'midpoint', 'linear', 'equiprobable']
#   forwarded to `Expr#quantile`
# @return [DataFrame] frame with a leading "statistic" column
# @raise [TypeError] if the frame has no columns
def describe(
  percentiles: [0.25, 0.5, 0.75],
  interpolation: "nearest"
)
  schema = collect_schema.to_h
  if schema.empty?
    msg = "cannot describe a LazyFrame that has no columns"
    raise TypeError, msg
  end

  # create list of metrics
  metrics = ["count", "null_count", "mean", "std", "min"]
  if (quantiles = Utils.parse_percentiles(percentiles)).any?
    metrics.concat(quantiles.map { |q| "%g%%" % [q * 100] })
  end
  metrics.append("max")

  skip_minmax = lambda do |dt|
    dt.nested? || [Categorical, Enum, Null, Object, Unknown].include?(dt)
  end

  # determine which columns will produce std/mean/percentile/etc
  # statistics in a single pass over the frame schema
  has_numeric_result, sort_cols = Set.new, Set.new
  metric_exprs = []
  null = F.lit(nil)

  schema.each do |c, dtype|
    is_numeric = dtype.numeric?
    is_temporal = !is_numeric && dtype.temporal?

    # counts
    count_exprs = [
      F.col(c).count.name.prefix("count:"),
      F.col(c).null_count.name.prefix("null_count:")
    ]

    # mean
    mean_expr =
      if is_temporal || is_numeric || dtype == Boolean
        F.col(c).mean
      else
        null
      end

    # standard deviation, min, max
    expr_std = is_numeric ? F.col(c).std : null
    min_expr = !skip_minmax.(dtype) ? F.col(c).min : null
    max_expr = !skip_minmax.(dtype) ? F.col(c).max : null

    # percentiles
    pct_exprs = []
    quantiles.each do |p|
      if is_numeric || is_temporal
        pct_expr =
          if is_temporal
            F.col(c).to_physical.quantile(p, interpolation: interpolation).cast(dtype)
          else
            F.col(c).quantile(p, interpolation: interpolation)
          end
        sort_cols.add(c)
      else
        pct_expr = null
      end
      pct_exprs << pct_expr.alias("#{p}:#{c}")
    end

    if is_numeric || dtype.nested? || [Null, Boolean].include?(dtype)
      has_numeric_result.add(c)
    end

    # add column expressions (in end-state 'metrics' list order)
    metric_exprs.concat(
      [
        *count_exprs,
        mean_expr.alias("mean:#{c}"),
        expr_std.alias("std:#{c}"),
        min_expr.alias("min:#{c}"),
        *pct_exprs,
        max_expr.alias("max:#{c}")
      ]
    )
  end

  # calculate requested metrics in parallel, then collect the result.
  # Columns that feed a quantile are sorted first to make retrieval cheaper.
  # An empty Set is truthy in Ruby, so emptiness is tested explicitly;
  # otherwise the unsorted `self` branch could never be taken.
  # TODO: drop sort once we have efficient retrieval of multiple quantiles
  base = sort_cols.empty? ? self : with_columns(sort_cols.map { |c| F.col(c).sort })
  df_metrics = base.select(*metric_exprs).collect

  # reshape wide result: the single result row holds `n_metrics` consecutive
  # values per column, in schema order
  n_metrics = metrics.length
  wide_row = df_metrics.row(0)
  column_metrics =
    schema.length.times.map do |n|
      wide_row[(n * n_metrics)...((n + 1) * n_metrics)]
    end

  summary = schema.keys.zip(column_metrics).to_h

  # cast by column type (numeric/bool -> float), (other -> string)
  schema.each_key do |c|
    summary[c] =
      summary[c].map do |v|
        if v.nil? || v.is_a?(Hash)
          nil
        else
          if has_numeric_result.include?(c)
            if v == true
              1.0
            elsif v == false
              0.0
            else
              v.to_f
            end
          else
            "#{v}"
          end
        end
      end
  end

  # return results as a DataFrame
  df_summary = Polars.from_hash(summary)
  df_summary.insert_column(0, Polars::Series.new("statistic", metrics))
  df_summary
end

def drop(*columns, strict: true)

Other tags:
    Example: Use positional arguments to drop multiple columns. -
    Example: Drop multiple columns by passing a selector. -
    Example: Drop a single column by passing the name of that column. -

Returns:
  • (LazyFrame) -

Parameters:
  • strict (Boolean) --
  • columns (Object) --
# Remove columns from the frame.
#
# Positional arguments may be single names/selectors or array-like groups
# of them; groups are flattened one level before being turned into a selector.
#
# @param columns [Array<Object>] names, selectors, or arrays of them
# @param strict [Boolean] forwarded to `Utils.parse_list_into_selector`
# @return [LazyFrame]
def drop(*columns, strict: true)
  selectors =
    columns.each_with_object([]) do |column, flat|
      if column.is_a?(Enumerable)
        flat.concat(column)
      else
        flat << column
      end
    end
  drop_cols = Utils.parse_list_into_selector(selectors, strict: strict)
  _from_rbldf(_ldf.drop(drop_cols._rbselector))
end

def drop_nans(subset: nil)

Returns:
  • (LazyFrame) -

Parameters:
  • subset (Object) --
# Forward to the native `drop_nans`, optionally restricted to `subset`.
#
# @param subset [Object, nil] column name(s)/selector(s); nil is passed through
#   to the native call as nil
# @return [LazyFrame]
def drop_nans(subset: nil)
  selector_subset = subset.nil? ? nil : Utils.parse_list_into_selector(subset)._rbselector
  _from_rbldf(_ldf.drop_nans(selector_subset))
end

def drop_nulls(subset: nil)

Returns:
  • (LazyFrame) -

Parameters:
  • subset (Object) --
# Forward to the native `drop_nulls`, optionally restricted to `subset`.
#
# @param subset [Object, nil] column name(s)/selector(s); nil is passed through
#   to the native call as nil
# @return [LazyFrame]
def drop_nulls(subset: nil)
  selector_subset = subset.nil? ? nil : Utils.parse_list_into_selector(subset)._rbselector
  _from_rbldf(_ldf.drop_nulls(selector_subset))
end

def dtypes

Returns:
  • (Array) -
# Returns the values of the native `_ldf.collect_schema` result.
#
# @return [Array]
def dtypes
  _ldf.collect_schema.values
end

def explain(

Returns:
  • (String) -
# Render the query plan as a string.
#
# @param format [String] "tree" selects the tree rendering; any other value the plain one
# @param optimized [Boolean] when true, the plan is described after `optimizations` are applied
# @param engine [String] resolved through `_select_engine`; only used to emit the
#   streaming-mode warning
# @param optimizations [Object] object exposing `_rboptflags`
# @return [String]
def explain(
  format: "plain",
  optimized: true,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  resolved_engine = _select_engine(engine)
  Utils.issue_unstable_warning("streaming mode is considered unstable.") if resolved_engine == "streaming"

  as_tree = format == "tree"
  unless optimized
    return as_tree ? _ldf.describe_plan_tree : _ldf.describe_plan
  end

  optimized_ldf = _ldf.with_optimizations(optimizations._rboptflags)
  as_tree ? optimized_ldf.describe_optimized_plan_tree : optimized_ldf.describe_optimized_plan
end

def explode(

Returns:
  • (LazyFrame) -

Parameters:
  • keep_nulls (Boolean) --
  • empty_as_null (Boolean) --
  • more_columns (Array) --
  • columns (Object) --
# Forward an explode request to the native frame.
#
# `columns` and `more_columns` are each parsed into a selector and the two
# selectors are combined with `|`.
#
# @param columns [Object] column name(s)/selector(s)
# @param more_columns [Array] additional columns given positionally
# @param empty_as_null [Boolean] forwarded to the native call
# @param keep_nulls [Boolean] forwarded to the native call
# @return [LazyFrame]
def explode(
  columns,
  *more_columns,
  empty_as_null: true,
  keep_nulls: true
)
  leading = Utils.parse_list_into_selector(columns)
  trailing = Utils.parse_list_into_selector(more_columns)
  subset = leading | trailing
  _from_rbldf(_ldf.explode(subset._rbselector, empty_as_null, keep_nulls))
end

def fill_nan(value)

Other tags:
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • value (Object) --
# Forward a fill-NaN request to the native frame.
#
# @param value [Object] an `Expr`, or any other value, which is wrapped with `F.lit`
# @return [LazyFrame]
def fill_nan(value)
  fill_expr = value.is_a?(Expr) ? value : F.lit(value)
  _from_rbldf(_ldf.fill_nan(fill_expr._rbexpr))
end

def fill_null(value = nil, strategy: nil, limit: nil, matches_supertype: true)

Returns:
  • (LazyFrame) -
# Fill null values with a literal/expression or according to a strategy.
#
# For recognised literal types the fill is restricted to columns of a
# matching dtype (via `with_columns`); for expressions, unrecognised values
# and a nil `value`, the fill is applied to all columns via `select`.
#
# @param value [Object, nil] fill value
# @param strategy [Object, nil] forwarded to `Expr#fill_null`
# @param limit [Object, nil] forwarded to `Expr#fill_null`
# @param matches_supertype [Boolean] when true, a numeric `value` targets all
#   integer, float and decimal dtypes instead of only Int64 / Float64
# @return [LazyFrame]
def fill_null(value = nil, strategy: nil, limit: nil, matches_supertype: true)
  unless value.nil?
    dtypes =
      case value
      when Expr
        nil
      when TrueClass, FalseClass
        [Boolean]
      when Integer, Float
        if matches_supertype
          [
            Int8,
            Int16,
            Int32,
            Int64,
            Int128,
            UInt8,
            UInt16,
            UInt32,
            UInt64,
            Float32,
            Float64,
            Decimal.new
          ]
        elsif value.is_a?(Integer)
          [Int64]
        else
          [Float64]
        end
      when ::Date
        [Date]
      when ::String
        [String, Categorical]
      end
    # `dtypes` stays nil for expressions and for anything not handled above

    if dtypes
      return with_columns(
        F.col(dtypes).fill_null(value, strategy: strategy, limit: limit)
      )
    end
  end

  select(Polars.all.fill_null(value, strategy: strategy, limit: limit))
end

def filter(*predicates, **constraints)

Other tags:
    Example: Filter by comparing two columns against each other -
    Example: Filter on an OR condition: -
    Example: Provide multiple filters using `**kwargs` syntax: -
    Example: Provide multiple filters using `*args` syntax: -
    Example: Filter on multiple conditions: -
    Example: Filter on one condition: -

Returns:
  • (LazyFrame) -

Parameters:
  • constraints (Hash) --
  • predicates (Array) --
# Keep rows matching the given predicates and constraints.
#
# Without constraints there are shortcuts: no predicates or a lone `true`
# returns a copy (`dup`), a lone `false` returns `clear`. Everything else is
# handled by `_filter` with `invert: false`.
#
# @param predicates [Array] expressions / column names / booleans
# @param constraints [Hash] column name => value equality constraints
# @return [LazyFrame]
def filter(*predicates, **constraints)
  if constraints.empty?
    # early-exit conditions (exclude/include all rows)
    return dup if predicates.empty?

    if predicates.length == 1
      only = predicates[0]
      return dup if only.is_a?(TrueClass)
      return clear if only.is_a?(FalseClass)
    end
  end

  _filter(predicates: predicates, constraints: constraints, invert: false)
end

def first

Returns:
  • (LazyFrame) -
# Equivalent to `slice(0, 1)`.
#
# @return [LazyFrame]
def first
  slice(0, 1)
end

def gather_every(n, offset: 0)

Returns:
  • (LazyFrame) -

Parameters:
  • offset (Integer) --
  • n (Integer) --
# Selects `F.col("*").gather_every(n, offset)`, i.e. applies the expression-level
# `gather_every` to the wildcard column expression.
#
# @param n [Integer] forwarded to `Expr#gather_every`
# @param offset [Integer] forwarded to `Expr#gather_every`
# @return [LazyFrame]
def gather_every(n, offset: 0)
  select(F.col("*").gather_every(n, offset))
end

def group_by(*by, maintain_order: false, **named_by)

Returns:
  • (LazyGroupBy) -

Parameters:
  • named_by (Hash) --
  • maintain_order (Boolean) --
  • by (Array) --
# Start a group-by operation.
#
# @param by [Array] grouping column name(s) / expression(s)
# @param maintain_order [Boolean] forwarded to the native `group_by`
# @param named_by [Hash] additional grouping expressions given as keywords
# @return [LazyGroupBy]
def group_by(*by, maintain_order: false, **named_by)
  key_exprs = Utils.parse_into_list_of_expressions(*by, **named_by)
  LazyGroupBy.new(_ldf.group_by(key_exprs, maintain_order))
end

def group_by_dynamic(

Other tags:
    Example: Dynamic group by on an index column. -
    Example: Dynamic group bys can also be combined with grouping on normal keys. -
    Example: When closed="both" the time values at the window boundaries belong to 2 groups. -
    Example: When closed="left", should not include right end of interval. -
    Example: The window boundaries can also be added to the aggregation result. -
    Example: Group by windows of 1 hour starting at 2021-12-16 00:00:00. -

Returns:
  • (LazyGroupBy) -

Parameters:
  • start_by ('window', 'datapoint', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday') --
  • group_by (Object) --
  • label ('left', 'right', 'datapoint') --
  • closed ("right", "left", "both", "none") --
  • include_boundaries (Boolean) --
  • offset (Object) --
  • period (Object) --
  • every (Object) --
  • index_column (Object) --
# Group based on a time/index column using dynamic windows.
#
# Defaults: when `offset` is nil it becomes the negated `every` if `period`
# is also nil, otherwise "0ns"; when `period` is nil it takes the value of
# `every`. All three are then normalized with `Utils.parse_as_duration_string`.
#
# @param index_column [Object] column used to build the windows
# @param every [Object] window interval
# @param period [Object, nil] window length
# @param offset [Object, nil] window offset
# @param include_boundaries [Boolean] forwarded to the native call
# @param closed ["right", "left", "both", "none"] forwarded to the native call
# @param label ['left', 'right', 'datapoint'] forwarded to the native call
# @param group_by [Object, nil] additional grouping keys
# @param start_by [String] forwarded to the native call
# @return [LazyGroupBy]
def group_by_dynamic(
  index_column,
  every:,
  period: nil,
  offset: nil,
  include_boundaries: false,
  closed: "left",
  label: "left",
  group_by: nil,
  start_by: "window"
)
  index_expr = Utils.parse_into_expression(index_column, str_as_lit: false)

  # defaults are derived from the raw (not yet normalized) arguments
  offset = (period.nil? ? "-#{every}" : "0ns") if offset.nil?
  period = every if period.nil?

  period_str = Utils.parse_as_duration_string(period)
  offset_str = Utils.parse_as_duration_string(offset)
  every_str = Utils.parse_as_duration_string(every)

  by_exprs = group_by.nil? ? [] : Utils.parse_into_list_of_expressions(group_by)

  LazyGroupBy.new(
    _ldf.group_by_dynamic(
      index_expr,
      every_str,
      period_str,
      offset_str,
      label,
      include_boundaries,
      closed,
      by_exprs,
      start_by
    )
  )
end

def head(n = 5)

Returns:
  • (LazyFrame) -

Parameters:
  • n (Integer) --
# Equivalent to `slice(0, n)`.
#
# @param n [Integer] number of rows
# @return [LazyFrame]
def head(n = 5)
  slice(0, n)
end

def include?(key)

Returns:
  • (Boolean) -
# Whether `key` is one of this frame's `columns`.
#
# @param key [Object] column name to look for
# @return [Boolean]
def include?(key)
  columns.include?(key)
end

def initialize(

Create a new LazyFrame.
# Create a new LazyFrame.
#
# All arguments are handed to `DataFrame.new`; the resulting eager frame is
# converted with `lazy` and its native handle is adopted as this frame's `_ldf`.
#
# @param data [Object, nil]
# @param schema [Object, nil]
# @param schema_overrides [Object, nil]
# @param strict [Boolean]
# @param orient [Object, nil]
# @param infer_schema_length [Object]
# @param nan_to_null [Boolean]
# @param height [Object, nil]
def initialize(
  data = nil,
  schema: nil,
  schema_overrides: nil,
  strict: true,
  orient: nil,
  infer_schema_length: N_INFER_DEFAULT,
  nan_to_null: false,
  height: nil
)
  eager = DataFrame.new(
    data,
    schema: schema,
    schema_overrides: schema_overrides,
    strict: strict,
    orient: orient,
    infer_schema_length: infer_schema_length,
    nan_to_null: nan_to_null,
    height: height
  )
  self._ldf = eager.lazy._ldf
end

def initialize_copy(other)

# Copy hook (Ruby calls `initialize_copy` from `dup`/`clone`): after the
# default copy, replaces the copied native handle with `_ldf._clone`.
#
# @param other [LazyFrame] the object being copied
def initialize_copy(other)
  super
  self._ldf = _ldf._clone
end

def interpolate

Returns:
  • (LazyFrame) -
# Selects `F.col("*").interpolate`, i.e. applies the expression-level
# `interpolate` to the wildcard column expression.
#
# @return [LazyFrame]
def interpolate
  select(F.col("*").interpolate)
end

def join(

Returns:
  • (LazyFrame) -

Parameters:
  • maintain_order ('none', 'left', 'right', 'left_right', 'right_left') --
  • coalesce (Boolean) --
  • force_parallel (Boolean) --
  • allow_parallel (Boolean) --
  • nulls_equal (Boolean) --
  • validate ('m:m', 'm:1', '1:m', '1:1') --
  • suffix (String) --
  • how ("inner", "left", "full", "semi", "anti", "cross") --
  • on (Object) --
  • right_on (Object) --
  • left_on (Object) --
  • other (LazyFrame) --
# Join this frame with another LazyFrame.
#
# "outer" is accepted as an alias for "full". A "cross" join ignores
# `on` / `left_on` / `right_on` and is issued with empty key lists. Every
# other join type requires either `on`, or both `left_on` and `right_on`.
#
# Both the cross and the keyed join go through a single native call so the
# positional flags (`allow_parallel`, `force_parallel`, `nulls_equal`) are
# always passed in the same order.
#
# @param other [LazyFrame] frame to join with
# @param left_on [Object, nil] join key(s) of this frame
# @param right_on [Object, nil] join key(s) of `other`
# @param on [Object, nil] join key(s) shared by both frames
# @param how ["inner", "left", "full", "semi", "anti", "cross"] join strategy
# @param suffix [String] forwarded to the native join
# @param validate ['m:m', 'm:1', '1:m', '1:1'] forwarded to the native join
# @param nulls_equal [Boolean] forwarded to the native join
# @param allow_parallel [Boolean] forwarded to the native join
# @param force_parallel [Boolean] forwarded to the native join
# @param coalesce [Boolean, nil] forwarded to the native join
# @param maintain_order ['none', 'left', 'right', 'left_right', 'right_left', nil]
#   nil is treated as "none"
# @return [LazyFrame]
# @raise [ArgumentError] if `other` is not a LazyFrame, or no join keys are
#   given for a non-cross join
def join(
  other,
  left_on: nil,
  right_on: nil,
  on: nil,
  how: "inner",
  suffix: "_right",
  validate: "m:m",
  nulls_equal: false,
  allow_parallel: true,
  force_parallel: false,
  coalesce: nil,
  maintain_order: nil
)
  if !other.is_a?(LazyFrame)
    raise ArgumentError, "Expected a `LazyFrame` as join table, got #{other.class.name}"
  end

  maintain_order = "none" if maintain_order.nil?
  how = "full" if how == "outer"

  if how == "cross"
    # a cross join has no keys
    rbexprs_left = []
    rbexprs_right = []
  elsif !on.nil?
    rbexprs = Utils.parse_into_list_of_expressions(on)
    rbexprs_left = rbexprs
    rbexprs_right = rbexprs
  elsif !left_on.nil? && !right_on.nil?
    rbexprs_left = Utils.parse_into_list_of_expressions(left_on)
    rbexprs_right = Utils.parse_into_list_of_expressions(right_on)
  else
    raise ArgumentError, "must specify `on` OR `left_on` and `right_on`"
  end

  _from_rbldf(
    _ldf.join(
      other._ldf,
      rbexprs_left,
      rbexprs_right,
      allow_parallel,
      force_parallel,
      nulls_equal,
      how,
      suffix,
      validate,
      maintain_order,
      coalesce
    )
  )
end

def join_asof(

Other tags:
    Example: If we instead use `strategy: "forward"`, then each date from `population` which doesn't have an exact match is matched with the closest later date from `gdp`: -
    Example: Note how the dates don't quite match. If we join them using `join_asof` and `strategy: "backward"`, then each date from `population` which doesn't have an exact match is matched with the closest earlier date from `gdp`: -

Returns:
  • (LazyFrame) -

Parameters:
  • check_sortedness (Boolean) --
  • allow_exact_matches (Boolean) --
  • coalesce (Boolean) --
  • force_parallel (Boolean) --
  • allow_parallel (Boolean) --
  • tolerance (Object) --
  • suffix (String) --
  • strategy ("backward", "forward") --
  • by (Object) --
  • by_right (Object) --
  • by_left (Object) --
  • on (String) --
  • right_on (String) --
  • left_on (String) --
  • other (LazyFrame) --
# Perform an asof join with another LazyFrame.
#
# A String `on` sets both `left_on` and `right_on`. A String or Array `by`
# overrides both `by_left` and `by_right`. A String `tolerance` is passed to
# the native call in the string slot, anything else in the numeric slot.
#
# @param other [LazyFrame] frame to join with
# @param left_on [String, nil] join column of this frame
# @param right_on [String, nil] join column of `other`
# @param on [String, nil] join column shared by both frames
# @param by_left [Object, nil] grouping key(s) of this frame
# @param by_right [Object, nil] grouping key(s) of `other`
# @param by [Object, nil] grouping key(s) shared by both frames
# @param strategy ["backward", "forward"] forwarded to the native call
# @param suffix [String] forwarded to the native call
# @param tolerance [Object, nil] String or numeric tolerance
# @param allow_parallel [Boolean] forwarded to the native call
# @param force_parallel [Boolean] forwarded to the native call
# @param coalesce [Boolean] forwarded to the native call
# @param allow_exact_matches [Boolean] forwarded to the native call
# @param check_sortedness [Boolean] forwarded to the native call
# @return [LazyFrame]
# @raise [ArgumentError] if `other` is not a LazyFrame or a join column is missing
def join_asof(
  other,
  left_on: nil,
  right_on: nil,
  on: nil,
  by_left: nil,
  by_right: nil,
  by: nil,
  strategy: "backward",
  suffix: "_right",
  tolerance: nil,
  allow_parallel: true,
  force_parallel: false,
  coalesce: true,
  allow_exact_matches: true,
  check_sortedness: true
)
  unless other.is_a?(LazyFrame)
    raise ArgumentError, "Expected a `LazyFrame` as join table, got #{other.class.name}"
  end

  # a String `on` is shorthand for the same column on both sides
  left_on = right_on = on if on.is_a?(::String)

  if left_on.nil? || right_on.nil?
    raise ArgumentError, "You should pass the column to join on as an argument."
  end

  # a single name/expression is wrapped in a list; anything else passes through
  as_list = ->(keys) { (keys.is_a?(::String) || keys.is_a?(Expr)) ? [keys] : keys }
  by_left_ = as_list.(by_left)
  by_right_ = as_list.(by_right)

  case by
  when ::String
    by_left_ = [by]
    by_right_ = [by]
  when ::Array
    by_left_ = by
    by_right_ = by
  end

  textual = tolerance.is_a?(::String)
  tolerance_str = textual ? tolerance : nil
  tolerance_num = textual ? nil : tolerance

  _from_rbldf(
    _ldf.join_asof(
      other._ldf,
      Polars.col(left_on)._rbexpr,
      Polars.col(right_on)._rbexpr,
      by_left_,
      by_right_,
      allow_parallel,
      force_parallel,
      suffix,
      strategy,
      tolerance_num,
      tolerance_str,
      coalesce,
      allow_exact_matches,
      check_sortedness
    )
  )
end

def join_where(

Other tags:
    Example: To OR them together, use a single expression and the `|` operator. -
    Example: Join two lazyframes together based on two predicates which get AND-ed together. -

Returns:
  • (LazyFrame) -

Parameters:
  • suffix (String) --
  • predicates (Object) --
  • other (Object) --

Other tags:
    Note: -
    Note: -
# Join with another frame on one or more predicates.
#
# `Utils.require_same_type` is called with this frame and `other` before the
# predicates are parsed and handed to the native `join_where`.
#
# @param other [Object] frame to join with; must expose `_ldf`
# @param predicates [Array] join predicate expression(s)
# @param suffix [String] forwarded to the native call
# @return [LazyFrame]
def join_where(
  other,
  *predicates,
  suffix: "_right"
)
  Utils.require_same_type(self, other)

  predicate_exprs = Utils.parse_into_list_of_expressions(*predicates)
  joined = _ldf.join_where(other._ldf, predicate_exprs, suffix)
  _from_rbldf(joined)
end

def last

Returns:
  • (LazyFrame) -
# Equivalent to `tail(1)`.
#
# @return [LazyFrame]
def last
  tail(1)
end

def lazy

Returns:
  • (LazyFrame) -
# Returns the receiver itself; no new frame is created.
#
# @return [LazyFrame] self
def lazy
  self
end

def limit(n = 5)

Returns:
  • (LazyFrame) -

Parameters:
  • n (Integer) --
# Alias-style wrapper: equivalent to `head(n)`.
#
# @param n [Integer] number of rows
# @return [LazyFrame]
def limit(n = 5)
  head(n)
end

def map_batches(

Other tags:
    Note: -
    Note: -
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • streamable (Boolean) --
  • validate_output_schema (Boolean) --
  • schema (Object) --
  • no_optimizations (Boolean) --
  • slice_pushdown (Boolean) --
  • projection_pushdown (Boolean) --
  • predicate_pushdown (Boolean) --
# Apply a custom block as a node in the query plan.
#
# `no_optimizations: true` forces the three pushdown flags to false. A
# non-nil `schema` is not supported yet and raises `Todo`.
#
# @param predicate_pushdown [Boolean] forwarded to the native call
# @param projection_pushdown [Boolean] forwarded to the native call
# @param slice_pushdown [Boolean] forwarded to the native call
# @param no_optimizations [Boolean] disable all three pushdowns
# @param schema [Object, nil] must currently be nil
# @param validate_output_schema [Boolean] forwarded to the native call
# @param streamable [Boolean] forwarded to the native call
# @yield the function handed to the native `map_batches`
# @return [LazyFrame]
# @raise [Todo] if `schema` is given
def map_batches(
  predicate_pushdown: true,
  projection_pushdown: true,
  slice_pushdown: true,
  no_optimizations: false,
  schema: nil,
  validate_output_schema: true,
  streamable: false,
  &function
)
  raise Todo unless schema.nil?

  pushdowns =
    if no_optimizations
      [false, false, false]
    else
      [predicate_pushdown, projection_pushdown, slice_pushdown]
    end

  _from_rbldf(
    _ldf.map_batches(function, *pushdowns, streamable, schema, validate_output_schema)
  )
end

def match_to_schema(

Other tags:
    Example: Upcasting integers and floats -
    Example: Removing extra columns -
    Example: Adding missing columns -
    Example: Ensuring the schema matches -

Returns:
  • (LazyFrame) -

Parameters:
  • float_cast (Object) --
  • integer_cast (Object) --
  • extra_struct_fields (Object) --
  • extra_columns (Object) --
  • missing_struct_fields (Object) --
  • missing_columns (Object) --
  • schema (Object) --

Other tags:
    Note: -
# Match or evolve the frame's schema into the given schema.
#
# A Hash `schema` is wrapped in a `Schema`. `missing_columns` may be a Hash
# (keys are stringified, `Expr` values unwrapped to their native handle), a
# single `Expr` (unwrapped), or any other value (passed through unchanged).
#
# @param schema [Hash, Object] target schema
# @param missing_columns [Hash, Expr, Object] forwarded to the native call after unwrapping
# @param missing_struct_fields [Object] forwarded to the native call
# @param extra_columns [Object] forwarded to the native call
# @param extra_struct_fields [Object] forwarded to the native call
# @param integer_cast [Object] forwarded to the native call
# @param float_cast [Object] forwarded to the native call
# @return [LazyFrame]
def match_to_schema(
  schema,
  missing_columns: "raise",
  missing_struct_fields: "raise",
  extra_columns: "raise",
  extra_struct_fields: "raise",
  integer_cast: "forbid",
  float_cast: "forbid"
)
  # expressions are replaced by their native handle, other values stay as they are
  unwrap = ->(value) { value.is_a?(Expr) ? value._rbexpr : value }

  schema_prep = schema.is_a?(Hash) ? Schema.new(schema) : schema

  missing_columns_rbexpr =
    if missing_columns.is_a?(Hash)
      missing_columns.to_h { |key, value| [key.to_s, unwrap.(value)] }
    else
      unwrap.(missing_columns)
    end

  LazyFrame._from_rbldf(
    _ldf.match_to_schema(
      schema_prep,
      missing_columns_rbexpr,
      missing_struct_fields,
      extra_columns,
      extra_struct_fields,
      integer_cast,
      float_cast
    )
  )
end

def max

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.max` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def max
  _from_rbldf(_ldf.max)
end

def mean

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.mean` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def mean
  _from_rbldf(_ldf.mean)
end

def median

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.median` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def median
  _from_rbldf(_ldf.median)
end

def merge_sorted(other, key)

Returns:
  • (LazyFrame) -

Parameters:
  • key (String) --
  • other (LazyFrame) --
# Passes `other._ldf` and `key` to the native `merge_sorted` and wraps the
# result in a new LazyFrame.
#
# @param other [Object] must expose `_ldf` (a lazy frame)
# @param key [String] forwarded to the native call
# @return [LazyFrame]
def merge_sorted(other, key)
  _from_rbldf(_ldf.merge_sorted(other._ldf, key))
end

def min

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.min` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def min
  _from_rbldf(_ldf.min)
end

def null_count

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.null_count` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def null_count
  _from_rbldf(_ldf.null_count)
end

def pipe(function, *args, **kwargs, &block)

Returns:
  • (LazyFrame) -

Parameters:
  • kwargs (Object) --
  • args (Object) --
  • function (Object) --
# Invokes `function` with this frame as the first argument, followed by the
# given positional arguments, keyword arguments and block.
#
# @param function [#call] callable to invoke
# @param args [Array] extra positional arguments for `function`
# @param kwargs [Hash] extra keyword arguments for `function`
# @return [Object] whatever `function` returns
def pipe(function, *args, **kwargs, &block)
  function.(self, *args, **kwargs, &block)
end

def pipe_with_schema(function)

Returns:
  • (LazyFrame) -

Parameters:
  • function (Object) --

Other tags:
    Note: -
# Apply `function`, which also receives the frame's schema.
#
# The native side calls back with a pair of lists (frames, schemas); only
# the first entry of each is used here. `function` is invoked with a wrapped
# LazyFrame and that schema, and must return an object exposing `_ldf`.
#
# @param function [#call] callable taking (LazyFrame, schema)
# @return [LazyFrame]
def pipe_with_schema(function)
  wrapper = lambda do |lf_and_schema|
    # lists are used so a plural variant can pass several inputs; take the first of each
    frames = lf_and_schema[0]
    schemas = lf_and_schema[1]
    piped = function.(_from_rbldf(frames[0]), schemas[0])
    piped._ldf
  end

  _from_rbldf(_ldf.pipe_with_schema(wrapper))
end

def pivot(

Other tags:
    Example: Using `pivot`, we can reshape so we have one row per student, with different subjects as columns, and their `test_1` scores as values: -

Other tags:
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • separator (String) --
  • maintain_order (Boolean) --
  • aggregate_function (Object) --
  • values (Object) --
  • index (Object) --
  • on_columns (Object) --
  • on (Object) --
# Reshape the frame from long to wide format.
#
# At least one of `index` / `values` must be given; the missing one defaults
# to every column not covered by `on` and the other selector.
#
# `aggregate_function` may be nil (uses `item(allow_empty: true)`), one of
# the names "first", "item", "sum", "max", "min", "mean", "median", "last",
# "len" (applied to `F.element`), the deprecated "count" (treated as "len"),
# or any other object exposing `_rbexpr`, which is used as given.
#
# @param on [Object] column(s) whose values become the new column names
# @param on_columns [DataFrame, Series, Object] the values of `on` to create columns for
# @param index [Object, nil] column(s) that stay as rows
# @param values [Object, nil] column(s) whose values fill the new columns
# @param aggregate_function [String, Object, nil] see above
# @param maintain_order [Boolean] forwarded to the native call
# @param separator [String] forwarded to the native call
# @return [LazyFrame]
# @raise [InvalidOperationError] if neither `index` nor `values` is given
# @raise [ArgumentError] if `aggregate_function` is an unknown name
def pivot(
  on,
  on_columns:,
  index: nil,
  values: nil,
  aggregate_function: nil,
  maintain_order: false,
  separator: "_"
)
  if index.nil? && values.nil?
    msg = "`pivot` needs either `index` or `values` to be specified"
    raise InvalidOperationError, msg
  end

  on_selector = Utils.parse_list_into_selector(on)
  values_selector = Utils.parse_list_into_selector(values) unless values.nil?
  index_selector = Utils.parse_list_into_selector(index) unless index.nil?

  # whichever of the two was omitted covers all remaining columns
  values_selector = Polars.cs.all - on_selector - index_selector if values.nil?
  index_selector = Polars.cs.all - on_selector - values_selector if index.nil?

  element = F.element
  agg =
    case aggregate_function
    when nil
      element.item(allow_empty: true)
    when ::String
      # names that map one-to-one onto an expression method of the same name
      direct = %w[first item sum max min mean median last len]
      if direct.include?(aggregate_function)
        element.public_send(aggregate_function)
      elsif aggregate_function == "count"
        Utils.issue_deprecation_warning(
          "`aggregate_function='count'` input for `pivot` is deprecated." +
          " Please use `aggregate_function='len'`."
        )
        element.len
      else
        msg = "invalid input for `aggregate_function` argument: #{aggregate_function.inspect}"
        raise ArgumentError, msg
      end
    else
      aggregate_function
    end

  on_cols =
    if on_columns.is_a?(DataFrame)
      on_columns
    elsif on_columns.is_a?(Series)
      on_columns.to_frame
    else
      Series.new(on_columns).to_frame
    end

  _from_rbldf(
    _ldf.pivot(
      on_selector._rbselector,
      on_cols._df,
      index_selector._rbselector,
      values_selector._rbselector,
      agg._rbexpr,
      maintain_order,
      separator
    )
  )
end

def profile(

Returns:
  • (Array) -

Parameters:
  • optimizations (Object) --
  • engine (String) --
# Run the query and return the result together with timing information.
#
# NOTE(review): the engine is resolved through `_select_engine` but the
# result is not passed to the native `profile` call — confirm intended.
#
# @param engine [String] engine name
# @param optimizations [Object] object exposing `_rboptflags`
# @return [Array] two frames: the collected result and the timings, both wrapped with `Utils.wrap_df`
def profile(
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  _select_engine(engine)
  optimized = _ldf.with_optimizations(optimizations._rboptflags)
  result_rb, timings_rb = optimized.profile
  [result_rb, timings_rb].map { |raw| Utils.wrap_df(raw) }
end

def quantile(quantile, interpolation: "nearest")

Returns:
  • (LazyFrame) -

Parameters:
  • interpolation ("nearest", "higher", "lower", "midpoint", "linear") --
  • quantile (Float) --
# Forward a quantile aggregation to the native frame.
#
# @param quantile [Float, Object] parsed with `Utils.parse_into_expression` (`str_as_lit: false`)
# @param interpolation ["nearest", "higher", "lower", "midpoint", "linear"] forwarded to the native call
# @return [LazyFrame]
def quantile(quantile, interpolation: "nearest")
  quantile_expr = Utils.parse_into_expression(quantile, str_as_lit: false)
  aggregated = _ldf.quantile(quantile_expr, interpolation)
  _from_rbldf(aggregated)
end

def remove(

Other tags:
    Example: Remove rows by comparing two columns against each other; in this case, we remove rows where the two columns are not equal (using `ne_missing` to ensure that null values compare equal): -
    Example: Provide constraints(s) using `**kwargs` syntax: -
    Example: Provide multiple constraints using `*args` syntax: -
    Example: Discard rows based on multiple conditions, combined with and/or operators: -
    Example: Remove rows matching a condition: -

Returns:
  • (LazyFrame) -

Parameters:
  • constraints (Hash) --
  • predicates (Array) --
# Drop rows matching the given predicates and constraints.
#
# Without constraints there are shortcuts: no predicates or a lone `true`
# returns `clear`, a lone `false` returns a copy (`dup`). Everything else is
# handled by `_filter` with `invert: true`.
#
# @param predicates [Array] expressions / column names / booleans
# @param constraints [Hash] column name => value equality constraints
# @return [LazyFrame]
def remove(
  *predicates,
  **constraints
)
  if constraints.empty?
    # early-exit conditions (exclude/include all rows)
    return clear if predicates.empty?

    if predicates.length == 1
      only = predicates[0]
      return clear if only.is_a?(TrueClass)
      return dup if only.is_a?(FalseClass)
    end
  end

  _filter(predicates: predicates, constraints: constraints, invert: true)
end

def rename(mapping, strict: true)

Returns:
  • (LazyFrame) -

Parameters:
  • strict (Boolean) --
  • mapping (Hash) --
# Rename columns.
#
# A callable `mapping` (anything responding to `call`) is applied to every
# column name through `F.all.name.map`; otherwise `mapping` is treated as an
# old-name => new-name Hash and handed to the native `rename`.
#
# @param mapping [Hash, #call]
# @param strict [Boolean] forwarded to the native `rename` (Hash form only)
# @return [LazyFrame]
def rename(mapping, strict: true)
  return select(F.all.name.map(&mapping)) if mapping.respond_to?(:call)

  old_names = mapping.keys
  new_names = mapping.values
  _from_rbldf(_ldf.rename(old_names, new_names, strict))
end

def reverse

Returns:
  • (LazyFrame) -
# Calls the native `_ldf.reverse` and wraps the result in a new LazyFrame.
#
# @return [LazyFrame]
def reverse
  _from_rbldf(_ldf.reverse)
end

def rolling(

Returns:
  • (LazyGroupBy) -

Parameters:
  • group_by (Object) --
  • closed ("right", "left", "both", "none") --
  • offset (Object) --
  • period (Object) --
  • index_column (Object) --
# Creates a rolling grouping keyed on `index_column`.
#
# When `offset` is nil it defaults to the negated `period` duration string.
# `period` and `offset` are both normalised with
# `Utils.parse_as_duration_string` before reaching the native call.
#
# @param index_column [Object] parsed into an expression
# @param period [Object] window length
# @param offset [Object] window offset; nil means "minus period"
# @param closed ["right", "left", "both", "none"] forwarded unchanged
# @param group_by [Object] optional grouping expressions (nil means none)
# @return [LazyGroupBy] wrapper around the native rolling group-by
def rolling(index_column:, period:, offset: nil, closed: "right", group_by: nil)
  index_expr = Utils.parse_into_expression(index_column)
  offset = Utils.negate_duration_string(Utils.parse_as_duration_string(period)) if offset.nil?
  group_exprs = group_by.nil? ? [] : Utils.parse_into_list_of_expressions(group_by)
  period_str = Utils.parse_as_duration_string(period)
  offset_str = Utils.parse_as_duration_string(offset)
  LazyGroupBy.new(_ldf.rolling(index_expr, period_str, offset_str, closed, group_exprs))
end

def schema

Returns:
  • (Hash) -
# Returns whatever the native `collect_schema` call yields for this frame.
#
# @return [Hash]
def schema
  resolved_schema = _ldf.collect_schema
  resolved_schema
end

def select(*exprs, **named_exprs)

Returns:
  • (LazyFrame) -

Parameters:
  • named_exprs (Hash) --
  • exprs (Array) --
# Selects expressions through the native `select` call.
#
# Struct auto-wrapping is switched on whenever the POLARS_AUTO_STRUCTIFY
# environment variable is set to anything other than the exact string "0".
# NOTE(review): `select_seq` parses the same variable with `to_i`, so the two
# agree for "0"/"1" but differ for values such as "00" - confirm which is intended.
#
# @param exprs [Array] positional expressions
# @param named_exprs [Hash] keyword expressions
# @return [LazyFrame]
def select(*exprs, **named_exprs)
  env_flag = ENV.fetch("POLARS_AUTO_STRUCTIFY", "0")
  parsed = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: env_flag != "0"
  )
  _from_rbldf(_ldf.select(parsed))
end

def select_seq(*exprs, **named_exprs)

Returns:
  • (LazyFrame) -

Parameters:
  • named_exprs (Hash) --
  • exprs (Array) --
# Selects expressions through the native `select_seq` call.
#
# Struct auto-wrapping is enabled when POLARS_AUTO_STRUCTIFY, converted with
# `to_i`, is non-zero (unset counts as 0).
#
# @param exprs [Array] positional expressions
# @param named_exprs [Hash] keyword expressions
# @return [LazyFrame]
def select_seq(*exprs, **named_exprs)
  auto_structify = !ENV.fetch("POLARS_AUTO_STRUCTIFY", 0).to_i.zero?
  parsed = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: auto_structify
  )
  _from_rbldf(_ldf.select_seq(parsed))
end

def serialize(file = nil, format: "binary")

Other tags:
    Example: Serialize the logical plan into a binary representation. -

Other tags:
    Note: -

Returns:
  • (Object) -

Parameters:
  • format ('binary', 'json') --
  • file (Object) --
# Serializes the logical plan of this frame.
#
# The serializer is picked from the native frame according to `format` and
# handed, together with `file`, to `Utils.serialize_polars_object`.
#
# @param file [Object] optional destination forwarded to the serialization helper
# @param format ['binary', 'json'] "json" emits a deprecation warning via `warn`
# @return [Object] whatever `Utils.serialize_polars_object` returns
# @raise [Todo] when the native frame does not respond to `serialize_binary`
# @raise [ArgumentError] when `format` is neither "binary" nor "json"
def serialize(file = nil, format: "binary")
  if format == "binary"
    raise Todo unless _ldf.respond_to?(:serialize_binary)
    serializer = _ldf.method(:serialize_binary)
  elsif format == "json"
    msg = "'json' serialization format of LazyFrame is deprecated"
    warn msg
    serializer = _ldf.method(:serialize_json)
  else
    # Single braces: the doubled form was a Python f-string escape that Ruby
    # would print literally.
    msg = "`format` must be one of {'binary', 'json'}, got #{format.inspect}"
    raise ArgumentError, msg
  end
  Utils.serialize_polars_object(serializer, file)
end

def set_sorted(

Returns:
  • (LazyFrame) -

Parameters:
  • nulls_last (Boolean) --
  • descending (Boolean) --
  • more_columns (Array) --
  • column (Object) --

Other tags:
    Note: -
# Passes a sortedness hint for one or more columns to the native `hint_sorted`.
#
# Boolean `descending` / `nulls_last` values are wrapped in a one-element
# array; anything else is forwarded as given.
#
# @param column [Object] must satisfy `Utils.strlike?`
# @param more_columns [Array] additional column names
# @param descending [Boolean, Array] sort direction flag(s)
# @param nulls_last [Boolean, Array] null placement flag(s)
# @return [LazyFrame]
# @raise [TypeError] when `column` is not string-like
def set_sorted(column, *more_columns, descending: false, nulls_last: false)
  unless Utils.strlike?(column)
    raise TypeError, "expected a 'str' for argument 'column' in 'set_sorted'"
  end

  descending_flags = Utils.bool?(descending) ? [descending] : descending
  nulls_last_flags = Utils.bool?(nulls_last) ? [nulls_last] : nulls_last
  hinted = _ldf.hint_sorted([column, *more_columns], descending_flags, nulls_last_flags)
  _from_rbldf(hinted)
end

def shift(n = 1, fill_value: nil)

Returns:
  • (LazyFrame) -

Parameters:
  • fill_value (Object) --
  • n (Integer) --
# Wraps the native `shift` call.
#
# @param n [Integer] shift amount, parsed into an expression
# @param fill_value [Object] optional fill value; when present it is parsed
#   with `str_as_lit: true`, when nil it is forwarded as nil
# @return [LazyFrame]
def shift(n = 1, fill_value: nil)
  fill_expr = fill_value.nil? ? nil : Utils.parse_into_expression(fill_value, str_as_lit: true)
  n_expr = Utils.parse_into_expression(n)
  _from_rbldf(_ldf.shift(n_expr, fill_expr))
end

def show_graph(

Returns:
  • (Object) -

Parameters:
  • optimizations (Object) --
  • plan_stage ('ir', 'physical') --
  • engine (String) --
  • raw_output (Boolean) --
  • output_path (String) --
  • show (Boolean) --
  • optimized (Boolean) --
# Renders the query plan as a DOT graph and hands it to `Utils.display_dot_graph`.
#
# The streaming physical renderer is used only when `plan_stage` is
# "physical" and the resolved engine is "streaming"; every other valid
# combination uses `to_dot`.
#
# NOTE(review): `figsize` is accepted but never read in this method.
# NOTE(review): `optimizations.dup` is followed by a write to
# `_rboptflags.streaming`; confirm that `dup` copies the inner flags so the
# shared default is not modified.
#
# @param optimized [Boolean] forwarded to the DOT renderer
# @param show [Boolean] forwarded to the display helper
# @param output_path [String] forwarded to the display helper
# @param raw_output [Boolean] forwarded to the display helper
# @param engine [String] resolved through `_select_engine`
# @param plan_stage ['ir', 'physical']
# @param optimizations [Object] optimization flags holder exposing `_rboptflags`
# @return [Object] result of `Utils.display_dot_graph`
# @raise [TypeError] when `plan_stage` is neither "ir" nor "physical"
def show_graph(
  optimized: true,
  show: true,
  output_path: nil,
  raw_output: false,
  figsize: [16.0, 12.0],
  engine: "auto",
  plan_stage: "ir",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  streaming = engine == "streaming"
  issue_unstable_warning("streaming mode is considered unstable.") if streaming

  optimizations = optimizations.dup
  optimizations._rboptflags.streaming = streaming
  planned = _ldf.with_optimizations(optimizations._rboptflags)

  unless plan_stage == "ir" || plan_stage == "physical"
    raise TypeError, "invalid plan stage '#{plan_stage}'"
  end

  dot =
    if plan_stage == "physical" && streaming
      planned.to_dot_streaming_phys(optimized)
    else
      planned.to_dot(optimized)
    end

  Utils.display_dot_graph(dot: dot, show: show, output_path: output_path, raw_output: raw_output)
end

def sink_batches(

Returns:
  • (Object) -

Parameters:
  • optimizations (Object) --
  • engine (String) --
  • lazy (Boolean) --
  • maintain_order (Boolean) --
  • chunk_size (Integer) --

Other tags:
    Note: -
    Note: -
# Streams result batches into the given block.
#
# Each native batch is wrapped with `Utils.wrap_df` before the block is
# called, and the block's result is coerced to a strict boolean for the
# native side.
#
# @param chunk_size [Integer] forwarded to the native sink
# @param maintain_order [Boolean] forwarded to the native sink
# @param lazy [Boolean] when truthy, return the sink as a LazyFrame instead of running it
# @param engine [String] forwarded to `collect` when running eagerly
# @param optimizations [Object] its `_rboptflags` are applied before collecting
# @return [LazyFrame, nil] a LazyFrame when `lazy` is truthy, otherwise nil
def sink_batches(
  chunk_size: nil,
  maintain_order: true,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS,
  &function
)
  batch_callback = lambda do |rbdf|
    !!function.(Utils.wrap_df(rbdf))
  end
  sink = _ldf.sink_batches(batch_callback, maintain_order, chunk_size)
  return LazyFrame._from_rbldf(sink) if lazy

  sink = sink.with_optimizations(optimizations._rboptflags)
  LazyFrame._from_rbldf(sink).collect(engine: engine)
  nil
end

def sink_csv(

Returns:
  • (LazyFrame, nil) -

Parameters:
  • optimizations () --
  • engine () --
  • lazy (Boolean) --
  • mkdir (Boolean) --
  • sync_on_close ('data', 'all') --
  • retries (Integer) --
  • storage_options (Object) --
  • maintain_order (Boolean) --
  • quote_style ("necessary", "always", "non_numeric", "never") --
  • null_value (String) --
  • decimal_comma (Boolean) --
  • float_precision (Integer) --
  • float_scientific (Integer) --
  • time_format (String) --
  • date_format (String) --
  • datetime_format (String) --
  • batch_size (Integer) --
  • quote_char (String) --
  • line_terminator (String) --
  • separator (String) --
  • include_header (Boolean) --
  • include_bom (Boolean) --
  • path (String) --
# Builds a CSV sink on top of this lazy query and, unless `lazy` is set, runs it.
#
# `separator` and `quote_char` are validated with `Utils._check_arg_is_1byte`
# and sent to the native layer as byte ordinals; the remaining CSV formatting
# options are forwarded positionally as given.
#
# @param path [String] destination, converted with `_to_sink_target`
# @param storage_options [Object] forwarded via `SinkOptions`
# @param credential_provider [Object] passed to the credential-provider builder
# @param retries [Integer] deprecated; when given it is copied into a new
#   options hash under "max_retries" (the caller's hash is never modified)
# @param sync_on_close ['data', 'all'] forwarded via `SinkOptions`
# @param mkdir [Boolean] forwarded via `SinkOptions`
# @param maintain_order [Boolean] forwarded via `SinkOptions`
# @param lazy [Boolean] when truthy, return the sink as a LazyFrame instead of running it
# @param engine [String] resolved through `_select_engine`, used by `collect`
# @param optimizations [Object] its `_rboptflags` are applied before collecting
# @return [LazyFrame, nil] a LazyFrame when `lazy` is truthy, otherwise nil
def sink_csv(
  path,
  include_bom: false,
  compression: "uncompressed",
  compression_level: nil,
  check_extension: true,
  include_header: true,
  separator: ",",
  line_terminator: "\n",
  quote_char: '"',
  batch_size: 1024,
  datetime_format: nil,
  date_format: nil,
  time_format: nil,
  float_scientific: nil,
  float_precision: nil,
  decimal_comma: false,
  null_value: nil,
  quote_style: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  Utils._check_arg_is_1byte("separator", separator, false)
  Utils._check_arg_is_1byte("quote_char", quote_char, false)
  engine = _select_engine(engine)
  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_csv"
  )
  target = _to_sink_target(path)
  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # merge into a fresh hash so the caller's storage_options is not mutated
    storage_options = (storage_options || {}).merge("max_retries" => retries)
  end
  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )
  ldf_rb = _ldf.sink_csv(
    target,
    sink_options,
    include_bom,
    compression,
    compression_level,
    check_extension,
    include_header,
    separator.ord,
    line_terminator,
    quote_char.ord,
    batch_size,
    datetime_format,
    date_format,
    time_format,
    float_scientific,
    float_precision,
    decimal_comma,
    null_value,
    quote_style
  )
  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end
  LazyFrame._from_rbldf(ldf_rb)
end

def sink_delta(

Other tags:
    Example: Sink a larger-than-memory dataset to a Delta Lake table. -

Returns:
  • (Object) -

Parameters:
  • optimizations (Object) --
  • delta_merge_options (Hash) --
  • delta_write_options (Hash) --
  • credential_provider (Object) --
  • storage_options (Object) --
  • mode ('error', 'append', 'overwrite', 'ignore', 'merge') --
  • target (Object) --

Other tags:
    Note: -
# Streams this query into a Delta Lake table through the DeltaLake library.
#
# The data is produced by `collect_batches` using the "streaming" engine and
# then either merged (`mode: "merge"`) or written with `DeltaLake.write`.
#
# @param target [Object] table location or a `DeltaLake::Table`; path-like
#   values are resolved with `_resolve_delta_lake_uri`
# @param mode ['error', 'append', 'overwrite', 'ignore', 'merge']
# @param storage_options [Object] forwarded to DeltaLake
# @param credential_provider [Object] only nil or "auto" is accepted together
#   with a `DeltaLake::Table` target
# @param delta_write_options [Hash] extra keyword arguments for `DeltaLake.write`
# @param delta_merge_options [Hash] keyword arguments for `merge`; required in merge mode
# @param optimizations [Object] forwarded to `collect_batches`
# @return [Object] the result of `merge` in merge mode, otherwise nil
# @raise [ArgumentError] for a credential provider combined with a table
#   object, or for merge mode without `delta_merge_options`
# @raise [Todo] when a credential provider builder is produced (not supported yet)
def sink_delta(
  target,
  mode: "error",
  storage_options: nil,
  credential_provider: "auto",
  delta_write_options: nil,
  delta_merge_options: nil,
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  Polars.send(:_check_if_delta_available)
  # TODO
  # _check_for_unsupported_types(collect_schema.dtypes)
  target = Polars.send(:_resolve_delta_lake_uri, target.to_s, strict: false) if Utils.pathlike?(target)

  builder_factory = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder = nil
  if target.is_a?(DeltaLake::Table)
    unless credential_provider.nil? || credential_provider == "auto"
      raise ArgumentError, "cannot use credential_provider when passing a DeltaTable object"
    end
  else
    credential_provider_builder = builder_factory.(
      credential_provider, target, storage_options, "sink_delta"
    )
  end

  provider_creds = {}
  raise Todo if credential_provider_builder

  # We aren't calling into polars-native write functions so we just update
  # the storage_options here.
  unless storage_options.nil? && credential_provider_builder.nil?
    storage_options = (storage_options || {}).merge(provider_creds)
  end

  stream = collect_batches(
    engine: "streaming",
    maintain_order: true,
    chunk_size: nil,
    lazy: true,
    optimizations: optimizations
  )

  unless mode == "merge"
    delta_write_options = {} if delta_write_options.nil?
    DeltaLake.write(
      target,
      stream,
      mode: mode,
      storage_options: storage_options,
      **delta_write_options
    )
    return nil
  end

  if delta_merge_options.nil?
    raise ArgumentError,
      "you need to pass delta_merge_options with at least a given predicate for `MERGE` to work."
  end
  table = target.is_a?(::String) ? DeltaLake::Table.new(target, storage_options: storage_options) : target
  table.merge(stream, **delta_merge_options)
end

def sink_ipc(

Returns:
  • (LazyFrame, nil) -

Parameters:
  • optimizations () --
  • engine () --
  • lazy (Boolean) --
  • mkdir (Boolean) --
  • sync_on_close ('data', 'all') --
  • retries (Integer) --
  • storage_options (String) --
  • maintain_order (Boolean) --
  • compression ("lz4", "zstd") --
  • path (String) --
# Builds an IPC sink on top of this lazy query and, unless `lazy` is set, runs it.
#
# @param path [String] destination, converted with `_to_sink_target`
# @param compression [String] nil is treated as "uncompressed"
# @param compat_level [Object] only nil is supported; any other value raises Todo
# @param record_batch_size [Object] forwarded to the native sink
# @param storage_options [Object] forwarded via `SinkOptions`
# @param credential_provider [Object] passed to the credential-provider builder
# @param retries [Integer] deprecated; when given it is copied into a new
#   options hash under "max_retries" (the caller's hash is never modified)
# @param sync_on_close ['data', 'all'] forwarded via `SinkOptions`
# @param mkdir [Boolean] forwarded via `SinkOptions`
# @param maintain_order [Boolean] forwarded via `SinkOptions`
# @param lazy [Boolean] when truthy, return the sink as a LazyFrame instead of running it
# @param engine [String] resolved through `_select_engine`, used by `collect`
# @param optimizations [Object] its `_rboptflags` are applied before collecting
# @return [LazyFrame, nil] a LazyFrame when `lazy` is truthy, otherwise nil
# @raise [Todo] when `compat_level` is not nil
def sink_ipc(
  path,
  compression: "uncompressed",
  compat_level: nil,
  record_batch_size: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS,
  _record_batch_statistics: false
)
  engine = _select_engine(engine)
  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # merge into a fresh hash so the caller's storage_options is not mutated
    storage_options = (storage_options || {}).merge("max_retries" => retries)
  end
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_ipc"
  )
  target = _to_sink_target(path)
  compat_level_rb = nil
  if compat_level.nil?
    compat_level_rb = true
  else
    raise Todo
  end
  if compression.nil?
    compression = "uncompressed"
  end
  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )
  ldf_rb = _ldf.sink_ipc(
    target,
    sink_options,
    compression,
    compat_level_rb,
    record_batch_size,
    _record_batch_statistics
  )
  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end
  LazyFrame._from_rbldf(ldf_rb)
end

def sink_ndjson(

Returns:
  • (LazyFrame, nil) -

Parameters:
  • optimizations () --
  • engine () --
  • lazy (Boolean) --
  • mkdir (Boolean) --
  • sync_on_close ('data', 'all') --
  • retries (Integer) --
  • storage_options (String) --
  • maintain_order (Boolean) --
  • path (String) --
# Builds an NDJSON sink on top of this lazy query and, unless `lazy` is set, runs it.
#
# @param path [String] destination, converted with `_to_sink_target`
# @param compression [String] forwarded to the native sink
# @param compression_level [Object] forwarded to the native sink
# @param check_extension [Boolean] forwarded to the native sink
# @param storage_options [Object] forwarded via `SinkOptions`
# @param credential_provider [Object] passed to the credential-provider builder
# @param retries [Integer] deprecated; when given it is copied into a new
#   options hash under "max_retries" (the caller's hash is never modified)
# @param sync_on_close ['data', 'all'] forwarded via `SinkOptions`
# @param mkdir [Boolean] forwarded via `SinkOptions`
# @param maintain_order [Boolean] forwarded via `SinkOptions`
# @param lazy [Boolean] when truthy, return the sink as a LazyFrame instead of running it
# @param engine [String] resolved through `_select_engine`, used by `collect`
# @param optimizations [Object] its `_rboptflags` are applied before collecting
# @return [LazyFrame, nil] a LazyFrame when `lazy` is truthy, otherwise nil
def sink_ndjson(
  path,
  compression: "uncompressed",
  compression_level: nil,
  check_extension: true,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # merge into a fresh hash so the caller's storage_options is not mutated
    storage_options = (storage_options || {}).merge("max_retries" => retries)
  end
  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_ndjson"
  )
  target = _to_sink_target(path)
  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )
  ldf_rb = _ldf.sink_ndjson(
    target,
    compression,
    compression_level,
    check_extension,
    sink_options
  )
  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end
  LazyFrame._from_rbldf(ldf_rb)
end

def sink_parquet(

Returns:
  • (LazyFrame, nil) -

Parameters:
  • optimizations () --
  • engine () --
  • lazy (Boolean) --
  • mkdir (Boolean) --
  • sync_on_close ('data', 'all') --
  • retries (Integer) --
  • storage_options (String) --
  • maintain_order (Boolean) --
  • data_page_size (Integer) --
  • row_group_size (Integer) --
  • statistics (Boolean) --
  • compression_level (Integer) --
  • compression ("lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd") --
  • path (String) --
# Builds a Parquet sink on top of this lazy query and, unless `lazy` is set, runs it.
#
# `statistics` shorthand values are expanded before the native call:
# `true` enables min/max/null_count, `false` becomes an empty hash, and
# "full" additionally enables distinct_count. Any other value is forwarded as is.
#
# @param path [String] destination, converted with `_to_sink_target`
# @param compression [String] forwarded to the native sink
# @param compression_level [Integer] forwarded to the native sink
# @param statistics [Boolean, String, Hash] see above
# @param row_group_size [Integer] forwarded to the native sink
# @param data_page_size [Integer] forwarded to the native sink
# @param storage_options [Object] forwarded via `SinkOptions`
# @param credential_provider [Object] passed to the credential-provider builder
# @param retries [Integer] deprecated; when given it is copied into a new
#   options hash under "max_retries" (the caller's hash is never modified)
# @param sync_on_close ['data', 'all'] forwarded via `SinkOptions`
# @param metadata [Object] forwarded to the native sink
# @param mkdir [Boolean] forwarded via `SinkOptions`
# @param maintain_order [Boolean] forwarded via `SinkOptions`
# @param lazy [Boolean] when truthy, return the sink as a LazyFrame instead of running it
# @param arrow_schema [Object] forwarded to the native sink
# @param engine [String] resolved through `_select_engine`, used by `collect`
# @param optimizations [Object] its `_rboptflags` are applied before collecting
# @return [LazyFrame, nil] a LazyFrame when `lazy` is truthy, otherwise nil
def sink_parquet(
  path,
  compression: "zstd",
  compression_level: nil,
  statistics: true,
  row_group_size: nil,
  data_page_size: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  metadata: nil,
  mkdir: false,
  lazy: false,
  arrow_schema: nil,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  if statistics == true
    statistics = {
      min: true,
      max: true,
      distinct_count: false,
      null_count: true
    }
  elsif statistics == false
    statistics = {}
  elsif statistics == "full"
    statistics = {
      min: true,
      max: true,
      distinct_count: true,
      null_count: true
    }
  end
  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # merge into a fresh hash so the caller's storage_options is not mutated
    storage_options = (storage_options || {}).merge("max_retries" => retries)
  end
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_parquet"
  )
  target = _to_sink_target(path)
  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )
  ldf_rb = _ldf.sink_parquet(
    target,
    sink_options,
    compression,
    compression_level,
    statistics,
    row_group_size,
    data_page_size,
    metadata,
    arrow_schema
  )
  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end
  LazyFrame._from_rbldf(ldf_rb)
end

def slice(offset, length = nil)

Returns:
  • (LazyFrame) -

Parameters:
  • length (Integer) --
  • offset (Integer) --
# Wraps the native `slice` call after rejecting negative lengths.
#
# @param offset [Integer] forwarded to the native call
# @param length [Integer] optional; nil is forwarded as nil
# @return [LazyFrame]
# @raise [ArgumentError] when `length` is given and below zero
def slice(offset, length = nil)
  negative_length = length && length < 0
  raise ArgumentError, "Negative slice lengths (#{length}) are invalid for LazyFrame" if negative_length

  _from_rbldf(_ldf.slice(offset, length))
end

def sort(by, *more_by, descending: false, nulls_last: false, maintain_order: false, multithreaded: true)

Returns:
  • (LazyFrame) -

Parameters:
  • multithreaded (Boolean) --
  • maintain_order (Boolean) --
  • nulls_last (Boolean) --
  • descending (Boolean) --
  • more_by (Array) --
  • by (Object) --
# Sorts the frame.
#
# A single String key with no extra keys goes straight to the native `sort`
# with the flags as given. Every other input is parsed into expressions, the
# boolean flags are expanded to one entry per key with `Utils.extend_bool`,
# and the native `sort_by_exprs` is used.
#
# @param by [Object] first sort key
# @param more_by [Array] additional sort keys
# @param descending [Boolean] sort direction flag(s)
# @param nulls_last [Boolean] null placement flag(s)
# @param maintain_order [Boolean] forwarded to the native call
# @param multithreaded [Boolean] forwarded to the native call
# @return [LazyFrame]
def sort(by, *more_by, descending: false, nulls_last: false, maintain_order: false, multithreaded: true)
  single_column_name = by.is_a?(::String) && more_by.empty?
  sorted =
    if single_column_name
      _ldf.sort(by, descending, nulls_last, maintain_order, multithreaded)
    else
      keys = Utils.parse_into_list_of_expressions(by, *more_by)
      descending_flags = Utils.extend_bool(descending, keys.length, "descending", "by")
      nulls_last_flags = Utils.extend_bool(nulls_last, keys.length, "nulls_last", "by")
      _ldf.sort_by_exprs(keys, descending_flags, nulls_last_flags, maintain_order, multithreaded)
    end
  _from_rbldf(sorted)
end

def sql(query, table_name: "self")

Other tags:
    Example: Apply SQL transforms (aliasing "self" to "frame") then filter natively (you can freely mix SQL and native operations): -
    Example: Query the LazyFrame using SQL: -

Other tags:
    Note: -
    Note: -

Returns:
  • (Expr) -

Parameters:
  • table_name (String) --
  • query (String) --
# Runs a SQL query against this frame.
#
# The frame is registered in a fresh `Polars::SQLContext` under `table_name`
# (falling back to "self" when that is nil or false) and the query is executed there.
#
# @param query [String] SQL text
# @param table_name [String] name the frame is registered under
# @return [Object] whatever `SQLContext#execute` returns
def sql(query, table_name: "self")
  context = Polars::SQLContext.new
  context.register(table_name || "self", self)
  context.execute(query)
end

def std(ddof: 1)

Returns:
  • (LazyFrame) -
# Wraps the native `std` call.
#
# @param ddof [Integer] forwarded unchanged to the native call
# @return [LazyFrame]
def std(ddof: 1)
  native = _ldf.std(ddof)
  _from_rbldf(native)
end

def sum

Returns:
  • (LazyFrame) -
# Wraps the native `sum` call.
#
# @return [LazyFrame]
def sum
  native = _ldf.sum
  _from_rbldf(native)
end

def tail(n = 5)

Returns:
  • (LazyFrame) -

Parameters:
  • n (Integer) --
# Wraps the native `tail` call.
#
# @param n [Integer] forwarded unchanged to the native call
# @return [LazyFrame]
def tail(n = 5)
  native = _ldf.tail(n)
  _from_rbldf(native)
end

def to_s

Returns:
  • (String) -
# Human-readable form: a fixed hint line followed by the unoptimized plan
# from `explain(optimized: false)` and a trailing newline.
#
# @return [String]
def to_s
  hint = "naive plan: (run LazyFrame#explain(optimized: true) to see the optimized plan)"
  "#{hint}\n#{explain(optimized: false)}\n"
end

def top_k(

Other tags:
    Example: Get the rows which contain the 4 largest values when sorting on column b and a. -
    Example: Get the rows which contain the 4 largest values in column b. -

Returns:
  • (LazyFrame) -

Parameters:
  • reverse (Object) --
  • by (Object) --
  • k (Integer) --
# Wraps the native `top_k` call.
#
# `by` is parsed into a list of expressions and `reverse` is expanded to one
# flag per expression with `Utils.extend_bool`.
#
# @param k [Integer] forwarded unchanged to the native call
# @param by [Object] column(s) / expression(s) to rank on
# @param reverse [Object] flag or per-key flags
# @return [LazyFrame]
def top_k(k, by:, reverse: false)
  by_exprs = Utils.parse_into_list_of_expressions(by)
  reverse_flags = Utils.extend_bool(reverse, by_exprs.length, "reverse", "by")
  _from_rbldf(_ldf.top_k(k, by_exprs, reverse_flags))
end

def unique(maintain_order: false, subset: nil, keep: "any")

Returns:
  • (LazyFrame) -

Parameters:
  • keep ("first", "last") --
  • subset (Object) --
  • maintain_order (Boolean) --
# Wraps the native `unique` call.
#
# A non-nil `subset` is parsed with `__require_selectors: true`; nil is
# forwarded as nil.
#
# @param maintain_order [Boolean] forwarded unchanged
# @param subset [Object] optional column subset
# @param keep [String] forwarded unchanged
# @return [LazyFrame]
def unique(maintain_order: false, subset: nil, keep: "any")
  subset_exprs =
    subset.nil? ? nil : Utils.parse_into_list_of_expressions(subset, __require_selectors: true)
  _from_rbldf(_ldf.unique(maintain_order, subset_exprs, keep))
end

def unnest(columns, *more_columns, separator: nil)

Returns:
  • (LazyFrame) -

Parameters:
  • separator (String) --
  • more_columns (Array) --
  • columns (Object) --
# Wraps the native `unnest` call.
#
# `columns` and `more_columns` are each turned into a selector and combined
# with `|` before being passed down.
#
# @param columns [Object] column(s) to unnest
# @param more_columns [Array] additional columns
# @param separator [String] forwarded unchanged
# @return [LazyFrame]
def unnest(columns, *more_columns, separator: nil)
  primary = Utils.parse_list_into_selector(columns)
  extra = Utils.parse_list_into_selector(more_columns)
  combined = primary | extra
  _from_rbldf(_ldf.unnest(combined._rbselector, separator))
end

def unpivot(

Returns:
  • (LazyFrame) -

Parameters:
  • streamable (Boolean) --
  • value_name (String) --
  • variable_name (String) --
  • index (Object) --
  • on (Object) --
# Wraps the native `unpivot` call.
#
# `on` and `index` become selectors (an empty selector when nil). The native
# call receives `value_name` before `variable_name`.
#
# @param on [Object] columns to unpivot
# @param index [Object] identifier columns
# @param variable_name [String] forwarded unchanged
# @param value_name [String] forwarded unchanged
# @param streamable [Boolean] deprecated; a falsy value only triggers a warning
# @return [LazyFrame]
def unpivot(on = nil, index: nil, variable_name: nil, value_name: nil, streamable: true)
  warn "The `streamable` parameter for `LazyFrame.unpivot` is deprecated" unless streamable

  on_selector = on.nil? ? Selectors.empty : Utils.parse_list_into_selector(on)
  index_selector = index.nil? ? Selectors.empty : Utils.parse_list_into_selector(index)
  unpivoted = _ldf.unpivot(
    on_selector._rbselector,
    index_selector._rbselector,
    value_name,
    variable_name
  )
  _from_rbldf(unpivoted)
end

def update(

Other tags:
    Example: Update `df` values including null values in `new_df`, using a full outer join strategy that defines explicit join columns in each frame: -
    Example: Update `df` values with the non-null values in `new_df`, using a full outer join strategy that defines explicit join columns in each frame: -
    Example: Update `df` values with the non-null values in `new_df`, by row index, but only keeping those rows that are common to both frames: -
    Example: Update `df` values with the non-null values in `new_df`, by row index: -

Other tags:
    Note: -
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • maintain_order ('none', 'left', 'right', 'left_right', 'right_left') --
  • include_nulls (Boolean) --
  • right_on (Object) --
  • left_on (Object) --
  • how ('left', 'inner', 'full') --
  • on (Object) --
  • other (LazyFrame) --
# Updates the values of this frame with the values of `other`.
#
# The two frames are joined on the given keys (or on a temporary row index
# when no keys are supplied) and every non-key column that exists in both
# frames is rewritten from the right-hand side.
#
# @param other [LazyFrame] frame providing the new values
# @param on [Object] join column(s) used for both frames
# @param how ['left', 'inner', 'full'] join strategy; "outer" and
#   "outer_coalesce" are mapped to "full"
# @param left_on [Object] join column(s) of this frame
# @param right_on [Object] join column(s) of `other`
# @param include_nulls [Boolean] when truthy, null values from matched rows
#   of `other` overwrite values in this frame
# @param maintain_order ['none', 'left', 'right', 'left_right', 'right_left']
#   forwarded to `join`
# @return [LazyFrame]
# @raise [ArgumentError] for an unknown `how`, when only one of
#   `left_on` / `right_on` is given, or when a join column is missing from
#   the corresponding schema
def update(
  other,
  on: nil,
  how: "left",
  left_on: nil,
  right_on: nil,
  include_nulls: false,
  maintain_order: "left"
)
  Utils.require_same_type(self, other)
  if ["outer", "outer_coalesce"].include?(how)
    how = "full"
  end
  if !["left", "inner", "full"].include?(how)
    # Single braces: the doubled form was a Python f-string escape that Ruby
    # would print literally.
    msg = "`how` must be one of {'left', 'inner', 'full'}; found #{how.inspect}"
    raise ArgumentError, msg
  end
  slf = self
  row_index_used = false
  if on.nil?
    if left_on.nil? && right_on.nil?
      # no keys provided--use row index
      row_index_used = true
      row_index_name = "__POLARS_ROW_INDEX"
      slf = slf.with_row_index(name: row_index_name)
      other = other.with_row_index(name: row_index_name)
      left_on = right_on = [row_index_name]
    else
      # one of left or right is missing, raise error
      if left_on.nil?
        msg = "missing join columns for left frame"
        raise ArgumentError, msg
      end
      if right_on.nil?
        msg = "missing join columns for right frame"
        raise ArgumentError, msg
      end
    end
  else
    # move on into left/right_on to simplify logic
    left_on = right_on = on
  end
  # a single column name is treated as a one-element list
  if left_on.is_a?(::String)
    left_on = [left_on]
  end
  if right_on.is_a?(::String)
    right_on = [right_on]
  end
  left_schema = slf.collect_schema
  left_on.each do |name|
    if !left_schema.include?(name)
      msg = "left join column #{name.inspect} not found"
      raise ArgumentError, msg
    end
  end
  right_schema = other.collect_schema
  right_on.each do |name|
    if !right_schema.include?(name)
      msg = "right join column #{name.inspect} not found"
      raise ArgumentError, msg
    end
  end
  # no need to join if *only* join columns are in other (inner/left update only)
  if how != "full" && right_schema.length == right_on.length
    if row_index_used
      return slf.drop(row_index_name)
    end
    return slf
  end
  # only use non-idx right columns present in left frame
  right_other = Set.new(right_schema.to_h.keys).intersection(left_schema.to_h.keys) - Set.new(right_on)
  # When include_nulls is true, we need to distinguish records after the join that
  # were originally null in the right frame, as opposed to records that were null
  # because the key was missing from the right frame.
  # Add a validity column to track whether row was matched or not.
  if include_nulls
    validity = ["__POLARS_VALIDITY"]
    other = other.with_columns(F.lit(true).alias(validity[0]))
  else
    validity = []
  end
  tmp_name = "__POLARS_RIGHT"
  # suffixed right-hand copies (and the validity marker) are removed after the merge
  drop_columns = right_other.map { |name| "#{name}#{tmp_name}" } + validity
  result = (
    slf.join(
      other.select(*right_on, *right_other, *validity),
      left_on: left_on,
      right_on: right_on,
      how: how,
      suffix: tmp_name,
      coalesce: true,
      maintain_order: maintain_order
    )
    .with_columns(
      right_other.map do |name|
        (
          if include_nulls
            # use left value only when right value failed to join
            F.when(F.col(validity).is_null)
            .then(F.col(name))
            .otherwise(F.col("#{name}#{tmp_name}"))
          else
            F.coalesce(["#{name}#{tmp_name}", F.col(name)])
          end
        ).alias(name)
      end
    )
    .drop(drop_columns)
  )
  if row_index_used
    result = result.drop(row_index_name)
  end
  _from_rbldf(result._ldf)
end

def var(ddof: 1)

Returns:
  • (LazyFrame) -
# Wraps the native `var` call.
#
# @param ddof [Integer] forwarded unchanged to the native call
# @return [LazyFrame]
def var(ddof: 1)
  native = _ldf.var(ddof)
  _from_rbldf(native)
end

def width

Returns:
  • (Integer) -
# Number of entries in the schema returned by the native `collect_schema`.
#
# @return [Integer]
def width
  current_schema = _ldf.collect_schema
  current_schema.length
end

def with_columns(*exprs, **named_exprs)

Returns:
  • (LazyFrame) -

Parameters:
  • named_exprs (Hash) --
  • exprs (Object) --
# Adds or replaces columns through the native `with_columns` call.
#
# Struct auto-wrapping is switched on whenever the POLARS_AUTO_STRUCTIFY
# environment variable is set to anything other than the exact string "0".
# NOTE(review): `with_columns_seq` parses the same variable with `to_i`, so
# the two agree for "0"/"1" but differ for values such as "00" - confirm
# which is intended.
#
# @param exprs [Object] positional expressions
# @param named_exprs [Hash] keyword expressions
# @return [LazyFrame]
def with_columns(*exprs, **named_exprs)
  env_flag = ENV.fetch("POLARS_AUTO_STRUCTIFY", "0")
  parsed = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: env_flag != "0"
  )
  _from_rbldf(_ldf.with_columns(parsed))
end

def with_columns_seq(

Returns:
  • (LazyFrame) -

Parameters:
  • named_exprs (Hash) --
  • exprs (Array) --
# Adds or replaces columns through the native `with_columns_seq` call.
#
# Struct auto-wrapping is enabled when POLARS_AUTO_STRUCTIFY, converted with
# `to_i`, is non-zero (unset counts as 0).
#
# @param exprs [Array] positional expressions
# @param named_exprs [Hash] keyword expressions
# @return [LazyFrame]
def with_columns_seq(*exprs, **named_exprs)
  auto_structify = !ENV.fetch("POLARS_AUTO_STRUCTIFY", 0).to_i.zero?
  parsed = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: auto_structify
  )
  _from_rbldf(_ldf.with_columns_seq(parsed))
end

def with_row_index(name: "index", offset: 0)

Other tags:
    Note: -

Returns:
  • (LazyFrame) -

Parameters:
  • offset (Integer) --
  • name (String) --
# Wraps the native `with_row_index` call.
#
# @param name [String] forwarded unchanged as the index column name
# @param offset [Integer] forwarded unchanged
# @return [LazyFrame]
def with_row_index(name: "index", offset: 0)
  indexed = _ldf.with_row_index(name, offset)
  _from_rbldf(indexed)
end