class Polars::LazyFrame
Representation of a lazy computation graph/query against a DataFrame.
def self._from_rbldf(rb_ldf)
- Private: -
# Wrap a native lazy-frame handle in a LazyFrame without running #initialize.
#
# @param rb_ldf [Object] native lazy frame handle
# @return [LazyFrame]
def self._from_rbldf(rb_ldf)
  LazyFrame.allocate.tap { |frame| frame._ldf = rb_ldf }
end
def self._select_engine(engine)
- Private: -
# Resolve the "auto" engine to the process-wide engine affinity; any other
# value is returned unchanged.
#
# @param engine [String]
# @return [String]
def self._select_engine(engine)
  return engine unless engine == "auto"

  Plr.get_engine_affinity
end
def self.deserialize(source, format: "binary")
- Note: -
Note: -
Returns:
-
(LazyFrame)-
Parameters:
-
format('binary', 'json') -- -
source(Object) --
# Read a serialized logical plan back into a LazyFrame.
#
# @param source [Object] path-like or IO source; path-likes are normalized first
# @param format ["binary", "json"] serialization format
# @return [LazyFrame]
# @raise [ArgumentError] for an unknown format
def self.deserialize(source, format: "binary")
  source = Utils.normalize_filepath(source) if Utils.pathlike?(source)

  deserializer =
    case format
    when "binary"
      raise Todo unless RbLazyFrame.respond_to?(:deserialize_binary)
      RbLazyFrame.method(:deserialize_binary)
    when "json"
      RbLazyFrame.method(:deserialize_json)
    else
      # Single braces: `{{` was a Python f-string escape printed literally in Ruby.
      msg = "`format` must be one of {'binary', 'json'}, got #{format.inspect}"
      raise ArgumentError, msg
    end

  _from_rbldf(deserializer.(source))
end
def _filter(
# Shared implementation behind #filter (invert: false) and #remove (invert: true).
#
# Literal true/false predicates short-circuit; boolean sequences/Series are
# treated as row masks; anything else must be an expression, a sequence of
# expressions, or an existing column name. Keyword constraints become
# `col(name).eq(value)` predicates. All predicates are combined with
# `all_horizontal`, and any masks are AND-ed in front of them.
#
# @param predicates [Array] positional predicates
# @param constraints [Hash] column name => value equality constraints
# @param invert [Boolean] remove matching rows instead of keeping them
# @return [LazyFrame]
# @raise [TypeError] on an invalid predicate or when nothing was provided
def _filter(
  predicates:,
  constraints:,
  invert: false
)
  all_predicates = []
  boolean_masks = []

  predicates.each do |p|
    # quick exit/skip conditions
    if (p.is_a?(FalseClass) && invert) || (p.is_a?(TrueClass) && !invert)
      next # ignore; doesn't filter/remove anything
    end
    if (p.is_a?(TrueClass) && invert) || (p.is_a?(FalseClass) && !invert)
      return clear # discard all rows
    end

    # note: identify masks separately from predicates
    if Utils.is_bool_sequence(p, include_series: true)
      boolean_masks << Polars::Series.new(p, dtype: Boolean)
      next
    end

    invalid =
      if Utils.is_sequence(p)
        p.any? { |x| !x.is_a?(Expr) }
      else
        !p.is_a?(Expr) && !(p.is_a?(::String) && collect_schema.include?(p))
      end
    if invalid
      err = p.is_a?(Series) ? "Series(…, dtype: #{p.dtype})" : p.inspect
      msg = "invalid predicate for `filter`: #{err}"
      raise TypeError, msg
    end

    all_predicates.concat(
      Utils.parse_into_list_of_expressions(p).map { |x| Utils.wrap_expr(x) }
    )
  end

  # unpack equality constraints from kwargs
  all_predicates.concat(constraints.map { |name, value| F.col(name).eq(value) })

  if all_predicates.empty? && boolean_masks.empty?
    msg = "at least one predicate or constraint must be provided"
    raise TypeError, msg
  end

  # if multiple predicates, combine as 'horizontal' expression
  combined_predicate =
    case all_predicates.length
    when 0 then nil
    when 1 then all_predicates[0]
    else F.all_horizontal(*all_predicates)
    end

  # apply reduced boolean mask first, if applicable, then predicates.
  # Several masks are AND-ed element-wise into one (this used to raise Todo).
  unless boolean_masks.empty?
    mask_expr = F.lit(boolean_masks.reduce { |acc, mask| acc & mask })
    combined_predicate = combined_predicate.nil? ? mask_expr : mask_expr & combined_predicate
  end

  # defensive; not reachable given the emptiness check above
  return _from_rbldf(_ldf) if combined_predicate.nil?

  filter_method = invert ? _ldf.method(:remove) : _ldf.method(:filter)
  _from_rbldf(filter_method.(combined_predicate._rbexpr))
end
def _from_rbldf(rb_ldf)
# Instance-level shortcut that delegates to the class-level constructor, so
# subclasses wrap native handles in their own class.
#
# @param rb_ldf [Object] native lazy frame handle
# @return [LazyFrame]
def _from_rbldf(rb_ldf)
  klass = self.class
  klass._from_rbldf(rb_ldf)
end
def _select_engine(engine)
# Instance-level shortcut for the class-level engine resolution.
#
# @param engine [String]
# @return [String]
def _select_engine(engine)
  klass = self.class
  klass._select_engine(engine)
end
def _to_sink_target(path)
# Normalize path-like sink targets; any other target (e.g. an IO) is passed
# through unchanged.
#
# @param path [Object]
# @return [Object]
def _to_sink_target(path)
  return path unless Utils.pathlike?(path)

  Utils.normalize_filepath(path)
end
def bottom_k(
- Example: Get the rows which contain the 4 smallest values when sorting on column a and b. -
Example: Get the rows which contain the 4 smallest values in column b. -
Returns:
-
(LazyFrame)-
Parameters:
-
reverse(Object) -- -
by(Object) -- -
k(Integer) --
# Return the `k` rows with the smallest values of the `by` expressions.
#
# @param k [Integer] number of rows to keep
# @param by [Object] column name(s) / expression(s) to rank by
# @param reverse [Boolean, Array<Boolean>] per-key ordering flip; a scalar is
#   broadcast to every key
# @return [LazyFrame]
def bottom_k(
  k,
  by:,
  reverse: false
)
  key_exprs = Utils.parse_into_list_of_expressions(by)
  reverse_flags = Utils.extend_bool(reverse, key_exprs.length, "reverse", "by")
  _from_rbldf(_ldf.bottom_k(k, key_exprs, reverse_flags))
end
def cache
-
(LazyFrame)-
# Add a cache node to the query plan.
#
# @return [LazyFrame]
def cache
  cached = _ldf.cache
  _from_rbldf(cached)
end
def cast(dtypes, strict: true)
- Example: Cast all frame columns to the specified dtype: -
Example: Cast all frame columns matching one dtype (or dtype group) to another dtype: -
Example: Cast specific frame columns to the specified dtypes: -
Returns:
-
(LazyFrame)-
Parameters:
-
strict(Boolean) -- -
dtypes(Hash) --
# Cast columns to new dtypes.
#
# A non-Hash `dtypes` casts every column. For a Hash, String keys name a column
# directly; any other key (selector, dtype, …) is expanded to the columns it
# matches, and every one of them gets the mapped dtype.
#
# @param dtypes [Hash, Object]
# @param strict [Boolean] raise if a cast fails
# @return [LazyFrame]
def cast(dtypes, strict: true)
  return _from_rbldf(_ldf.cast_all(dtypes, strict)) unless dtypes.is_a?(Hash)

  cast_map = dtypes.each_with_object({}) do |(key, raw_dtype), acc|
    target = Utils.parse_into_dtype(raw_dtype)
    if key.is_a?(::String)
      acc[key] = target
    else
      Utils.expand_selector(self, key).each { |name| acc[name] = target }
    end
  end
  _from_rbldf(_ldf.cast(cast_map, strict))
end
def clear(n = 0)
-
(LazyFrame)-
# A LazyFrame with this frame's schema and `n` all-null rows (0 by default).
#
# @param n [Integer]
# @return [LazyFrame]
def clear(n = 0)
  empty_frame = DataFrame.new(schema: schema)
  empty_frame.clear(n).lazy
end
def collect(
-
(DataFrame, InProcessQuery)- an InProcessQuery is returned when `background` is true
Parameters:
-
optimizations() -- -
background(Boolean) -- -
engine() --
# Execute the query and materialize the result.
#
# @param engine [String] "auto" resolves via _select_engine
# @param background [Boolean] run concurrently and return an InProcessQuery handle
# @param optimizations [Object] query optimization flags
# @return [DataFrame, InProcessQuery]
def collect(
  engine: "auto",
  background: false,
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  Utils.issue_unstable_warning("streaming mode is considered unstable.") if engine == "streaming"

  optimized = _ldf.with_optimizations(optimizations._rboptflags)

  unless background
    return Utils.wrap_df(optimized.collect(engine))
  end

  Utils.issue_unstable_warning("background mode is considered unstable.")
  InProcessQuery.new(optimized.collect_concurrently)
end
def collect_batches(
-
(Object)-
Parameters:
-
optimizations(Object) -- -
engine(String) -- -
lazy(Boolean) -- -
maintain_order(Boolean) -- -
chunk_size(Integer) --
Other tags:
- Note: -
Note: -
# Collect the query as an iterable of DataFrame batches.
#
# @param chunk_size [Integer, nil]
# @param maintain_order [Boolean]
# @param lazy [Boolean]
# @param engine [String] forwarded to the native call as-is
# @param optimizations [Object] query optimization flags
# @return [CollectBatches]
def collect_batches(
  chunk_size: nil,
  maintain_order: true,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  optimized = _ldf.with_optimizations(optimizations._rboptflags)
  batches = optimized.collect_batches(engine, maintain_order, chunk_size, lazy)
  CollectBatches.new(batches)
end
def collect_schema
- Example: Access various properties of the schema. -
Example: Determine the schema. -
Returns:
-
(Schema)-
# Resolve the output schema of the query without executing it.
#
# @return [Schema]
def collect_schema
  native_schema = _ldf.collect_schema
  Schema.new(native_schema, check_dtypes: false)
end
def columns
-
(Array)-
# Output column names, in order.
#
# @return [Array]
def columns
  resolved = _ldf.collect_schema
  resolved.keys
end
def count
-
(LazyFrame)-
# Per-column count of non-null values.
#
# @return [LazyFrame]
def count
  counted = _ldf.count
  _from_rbldf(counted)
end
def describe(
- Example: Customize which percentiles are displayed, applying linear interpolation: -
Example: Show default frame statistics: -
Other tags:
- Note: -
Note: -
Note: -
Returns:
-
(DataFrame)-
Parameters:
-
interpolation('nearest', 'higher', 'lower', 'midpoint', 'linear', 'equiprobable') -- -
percentiles(Array) --
# Summary statistics for every column, computed in one collected pass.
#
# The result has a leading "statistic" column with the rows "count",
# "null_count", "mean", "std", "min", one row per requested percentile, and
# "max". Numeric, boolean, null and nested columns report floats; all other
# columns report strings.
#
# @param percentiles [Array] percentiles to report (parsed by Utils.parse_percentiles)
# @param interpolation [String] quantile interpolation method
# @return [DataFrame]
# @raise [TypeError] if the frame has no columns
def describe(
  percentiles: [0.25, 0.5, 0.75],
  interpolation: "nearest"
)
  schema = collect_schema.to_h
  if schema.empty?
    msg = "cannot describe a LazyFrame that has no columns"
    raise TypeError, msg
  end

  # create list of metrics
  metrics = ["count", "null_count", "mean", "std", "min"]
  if (quantiles = Utils.parse_percentiles(percentiles)).any?
    metrics.concat(quantiles.map { |q| "%g%%" % [q * 100] })
  end
  metrics.append("max")

  skip_minmax = lambda do |dt|
    dt.nested? || [Categorical, Enum, Null, Object, Unknown].include?(dt)
  end

  # determine which columns will produce std/mean/percentile/etc
  # statistics in a single pass over the frame schema
  has_numeric_result, sort_cols = Set.new, Set.new
  metric_exprs = []
  null = F.lit(nil)

  schema.each do |c, dtype|
    is_numeric = dtype.numeric?
    is_temporal = !is_numeric && dtype.temporal?

    # counts
    count_exprs = [
      F.col(c).count.name.prefix("count:"),
      F.col(c).null_count.name.prefix("null_count:")
    ]
    # mean
    mean_expr = (is_temporal || is_numeric || dtype == Boolean) ? F.col(c).mean : null

    # standard deviation, min, max
    expr_std = is_numeric ? F.col(c).std : null
    min_expr = !skip_minmax.(dtype) ? F.col(c).min : null
    max_expr = !skip_minmax.(dtype) ? F.col(c).max : null

    # percentiles
    pct_exprs = []
    quantiles.each do |p|
      if is_numeric || is_temporal
        pct_expr =
          if is_temporal
            F.col(c).to_physical.quantile(p, interpolation: interpolation).cast(dtype)
          else
            F.col(c).quantile(p, interpolation: interpolation)
          end
        sort_cols.add(c)
      else
        pct_expr = null
      end
      pct_exprs << pct_expr.alias("#{p}:#{c}")
    end

    if is_numeric || dtype.nested? || [Null, Boolean].include?(dtype)
      has_numeric_result.add(c)
    end

    # add column expressions (in end-state 'metrics' list order)
    metric_exprs.concat(
      [
        *count_exprs,
        mean_expr.alias("mean:#{c}"),
        expr_std.alias("std:#{c}"),
        min_expr.alias("min:#{c}"),
        *pct_exprs,
        max_expr.alias("max:#{c}")
      ]
    )
  end

  # calculate requested metrics in parallel, then collect the result.
  # Only pre-sort when some column needs quantiles: an empty Set is still
  # truthy in Ruby, so emptiness must be checked explicitly.
  # TODO: drop sort once we have efficient retrieval of multiple quantiles
  source = sort_cols.empty? ? self : with_columns(sort_cols.map { |c| F.col(c).sort })
  df_metrics = source.select(*metric_exprs).collect

  # reshape wide result: each input column owns a contiguous slice of n_metrics values
  n_metrics = metrics.length
  wide_row = df_metrics.row(0)
  column_metrics = schema.length.times.map do |n|
    wide_row[(n * n_metrics)...((n + 1) * n_metrics)]
  end
  summary = schema.keys.zip(column_metrics).to_h

  # cast by column type (numeric/bool -> float), (other -> string)
  schema.each_key do |c|
    summary[c] = summary[c].map do |v|
      if v.nil? || v.is_a?(Hash)
        nil
      elsif has_numeric_result.include?(c)
        if v == true
          1.0
        elsif v == false
          0.0
        else
          v.to_f
        end
      else
        "#{v}"
      end
    end
  end

  # return results as a DataFrame
  df_summary = Polars.from_hash(summary)
  df_summary.insert_column(0, Polars::Series.new("statistic", metrics))
  df_summary
end
def drop(*columns, strict: true)
- Example: Use positional arguments to drop multiple columns. -
Example: Drop multiple columns by passing a selector. -
Example: Drop a single column by passing the name of that column. -
Returns:
-
(LazyFrame)-
Parameters:
-
strict(Boolean) -- -
columns(Object) --
# Remove columns from the frame.
#
# Arguments are flattened one level, so drop("a", "b"), drop(["a", "b"]) and
# drop(Set["a", "b"]) are all accepted. `to_a` expands any Enumerable; the
# previous `Array#+` raised TypeError for non-Array enumerables such as Set.
#
# @param columns [Array] names, selectors, or enumerables of them
# @param strict [Boolean] raise if a named column does not exist
# @return [LazyFrame]
def drop(*columns, strict: true)
  selectors = columns.flat_map { |c| c.is_a?(Enumerable) ? c.to_a : [c] }
  drop_cols = Utils.parse_list_into_selector(selectors, strict: strict)
  _from_rbldf(_ldf.drop(drop_cols._rbselector))
end
def drop_nans(subset: nil)
-
(LazyFrame)-
Parameters:
-
subset(Object) --
# Drop rows containing NaN values, optionally only in `subset`.
#
# @param subset [Object, nil] columns/selectors to inspect; nil means all
# @return [LazyFrame]
def drop_nans(subset: nil)
  selector_subset = Utils.parse_list_into_selector(subset)._rbselector unless subset.nil?
  _from_rbldf(_ldf.drop_nans(selector_subset))
end
def drop_nulls(subset: nil)
-
(LazyFrame)-
Parameters:
-
subset(Object) --
# Drop rows containing null values, optionally only in `subset`.
#
# @param subset [Object, nil] columns/selectors to inspect; nil means all
# @return [LazyFrame]
def drop_nulls(subset: nil)
  selector_subset = Utils.parse_list_into_selector(subset)._rbselector unless subset.nil?
  _from_rbldf(_ldf.drop_nulls(selector_subset))
end
def dtypes
-
(Array)-
# Output column dtypes, in column order.
#
# @return [Array]
def dtypes
  resolved = _ldf.collect_schema
  resolved.values
end
def explain(
-
(String)-
# Render the (optionally optimized) query plan as text.
#
# @param format [String] "tree" for a tree rendering, anything else for plain text
# @param optimized [Boolean] describe the plan after applying `optimizations`
# @param engine [String] only used to emit the streaming-unstable warning
# @param optimizations [Object] query optimization flags
# @return [String]
def explain(
  format: "plain",
  optimized: true,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  Utils.issue_unstable_warning("streaming mode is considered unstable.") if engine == "streaming"

  as_tree = format == "tree"
  unless optimized
    return as_tree ? _ldf.describe_plan_tree : _ldf.describe_plan
  end

  plan = _ldf.with_optimizations(optimizations._rboptflags)
  as_tree ? plan.describe_optimized_plan_tree : plan.describe_optimized_plan
end
def explode(
-
(LazyFrame)-
Parameters:
-
keep_nulls(Boolean) -- -
empty_as_null(Boolean) -- -
more_columns(Array) -- -
columns(Object) --
# Explode list-like columns into one row per element.
#
# @param columns [Object] first column(s)/selector to explode
# @param more_columns [Array] additional columns
# @param empty_as_null [Boolean]
# @param keep_nulls [Boolean]
# @return [LazyFrame]
def explode(
  columns,
  *more_columns,
  empty_as_null: true,
  keep_nulls: true
)
  primary = Utils.parse_list_into_selector(columns)
  extra = Utils.parse_list_into_selector(more_columns)
  subset = primary | extra
  _from_rbldf(_ldf.explode(subset._rbselector, empty_as_null, keep_nulls))
end
def fill_nan(value)
- Note: -
Returns:
-
(LazyFrame)-
Parameters:
-
value(Object) --
# Replace floating-point NaN values with `value`.
#
# @param value [Object] literal (wrapped with F.lit) or Expr
# @return [LazyFrame]
def fill_nan(value)
  fill_expr = value.is_a?(Expr) ? value : F.lit(value)
  _from_rbldf(_ldf.fill_nan(fill_expr._rbexpr))
end
def fill_null(value = nil, strategy: nil, limit: nil, matches_supertype: true)
-
(LazyFrame)-
# Fill null values with a literal/expression or a fill strategy.
#
# For some literal types only columns of compatible dtypes are filled:
# booleans fill Boolean columns; numbers fill all numeric columns when
# `matches_supertype` is set (otherwise Int64 or Float64 only); Dates fill
# Date columns; Strings fill String and Categorical columns. Anything else
# (including Expr values and strategies) is applied to every column.
#
# @param value [Object, nil]
# @param strategy [String, nil]
# @param limit [Integer, nil]
# @param matches_supertype [Boolean]
# @return [LazyFrame]
def fill_null(value = nil, strategy: nil, limit: nil, matches_supertype: true)
  unless value.nil?
    target_dtypes =
      if value.is_a?(Expr)
        nil
      elsif value.is_a?(TrueClass) || value.is_a?(FalseClass)
        [Boolean]
      elsif value.is_a?(Integer) || value.is_a?(Float)
        if matches_supertype
          [
            Int8, Int16, Int32, Int64, Int128,
            UInt8, UInt16, UInt32, UInt64,
            Float32, Float64, Decimal.new
          ]
        elsif value.is_a?(Integer)
          [Int64]
        else
          [Float64]
        end
      elsif value.is_a?(::Date)
        [Date]
      elsif value.is_a?(::String)
        [String, Categorical]
      end

    if target_dtypes
      return with_columns(
        F.col(target_dtypes).fill_null(value, strategy: strategy, limit: limit)
      )
    end
  end

  select(Polars.all.fill_null(value, strategy: strategy, limit: limit))
end
def filter(*predicates, **constraints)
- Example: Filter by comparing two columns against each other -
Example: Filter on an OR condition: -
Example: Provide multiple filters using `**kwargs` syntax: -
Example: Provide multiple filters using `*args` syntax: -
Example: Filter on multiple conditions: -
Example: Filter on one condition: -
Returns:
-
(LazyFrame)-
Parameters:
-
constraints(Hash) -- -
predicates(Array) --
# Keep the rows that satisfy every predicate and keyword constraint.
#
# @param predicates [Array] expressions, boolean masks, or literal true/false
# @param constraints [Hash] column name => value equality constraints
# @return [LazyFrame]
def filter(*predicates, **constraints)
  if constraints.empty?
    # early-exit conditions (exclude/include all rows)
    sole = predicates.first if predicates.length == 1
    return dup if predicates.empty? || sole.is_a?(TrueClass)
    return clear if sole.is_a?(FalseClass)
  end

  _filter(predicates: predicates, constraints: constraints, invert: false)
end
def first
-
(LazyFrame)-
# The first row, as a one-row LazyFrame.
#
# @return [LazyFrame]
def first
  head(1) if false # (kept out of the call path; see below)
  slice(0, 1)
end
def gather_every(n, offset: 0)
-
(LazyFrame)-
Parameters:
-
offset(Integer) -- -
n(Integer) --
# Take every `n`-th row, starting at `offset`.
#
# @param n [Integer]
# @param offset [Integer]
# @return [LazyFrame]
def gather_every(n, offset: 0)
  every_nth = F.col("*").gather_every(n, offset)
  select(every_nth)
end
def group_by(*by, maintain_order: false, **named_by)
-
(LazyGroupBy)-
Parameters:
-
named_by(Hash) -- -
maintain_order(Boolean) -- -
by(Array) --
# Start a group-by operation.
#
# @param by [Array] grouping keys
# @param maintain_order [Boolean] keep groups in first-appearance order
# @param named_by [Hash] named grouping expressions
# @return [LazyGroupBy]
def group_by(*by, maintain_order: false, **named_by)
  key_exprs = Utils.parse_into_list_of_expressions(*by, **named_by)
  LazyGroupBy.new(_ldf.group_by(key_exprs, maintain_order))
end
def group_by_dynamic(
- Example: Dynamic group by on an index column. -
Example: Dynamic group bys can also be combined with grouping on normal keys. -
Example: When closed="both" the time values at the window boundaries belong to 2 groups. -
Example: When closed="left", should not include right end of interval. -
Example: The window boundaries can also be added to the aggregation result. -
Example: Group by windows of 1 hour starting at 2021-12-16 00:00:00. -
Returns:
-
(LazyGroupBy)-
Parameters:
-
start_by('window', 'datapoint', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday') -- -
group_by(Object) -- -
label('left', 'right', 'datapoint') -- -
closed("right", "left", "both", "none") -- -
include_boundaries(Boolean) -- -
offset(Object) -- -
period(Object) -- -
every(Object) -- -
index_column(Object) --
# Group rows into time/index windows.
#
# When `offset` is omitted it defaults to "-<every>" if `period` was also
# omitted, otherwise "0ns"; `period` defaults to `every`.
#
# @param index_column [Object] column (not a string literal) holding the index
# @param every [Object] window interval
# @param period [Object, nil] window length
# @param offset [Object, nil] window start offset
# @param include_boundaries [Boolean]
# @param closed [String]
# @param label [String]
# @param group_by [Object, nil] additional grouping keys
# @param start_by [String]
# @return [LazyGroupBy]
def group_by_dynamic(
  index_column,
  every:,
  period: nil,
  offset: nil,
  include_boundaries: false,
  closed: "left",
  label: "left",
  group_by: nil,
  start_by: "window"
)
  index_expr = Utils.parse_into_expression(index_column, str_as_lit: false)
  offset = (period.nil? ? "-#{every}" : "0ns") if offset.nil?
  period = every if period.nil?

  period_str = Utils.parse_as_duration_string(period)
  offset_str = Utils.parse_as_duration_string(offset)
  every_str = Utils.parse_as_duration_string(every)
  by_exprs = group_by.nil? ? [] : Utils.parse_into_list_of_expressions(group_by)

  native_gb = _ldf.group_by_dynamic(
    index_expr,
    every_str,
    period_str,
    offset_str,
    label,
    include_boundaries,
    closed,
    by_exprs,
    start_by
  )
  LazyGroupBy.new(native_gb)
end
def head(n = 5)
-
(LazyFrame)-
Parameters:
-
n(Integer) --
# The first `n` rows.
#
# @param n [Integer]
# @return [LazyFrame]
def head(n = 5)
  start = 0
  slice(start, n)
end
def include?(key)
-
(Boolean)-
# Whether `key` is one of the output column names.
#
# @param key [Object]
# @return [Boolean]
def include?(key)
  columns.any? { |name| name == key }
end
def initialize(
# Build a LazyFrame by constructing an eager DataFrame from the same
# arguments and taking its lazy plan.
def initialize(
  data = nil,
  schema: nil,
  schema_overrides: nil,
  strict: true,
  orient: nil,
  infer_schema_length: N_INFER_DEFAULT,
  nan_to_null: false,
  height: nil
)
  eager = DataFrame.new(
    data,
    schema: schema,
    schema_overrides: schema_overrides,
    strict: strict,
    orient: orient,
    infer_schema_length: infer_schema_length,
    nan_to_null: nan_to_null,
    height: height
  )
  self._ldf = eager.lazy._ldf
end
def initialize_copy(other)
# dup/clone hook: give the copy its own cloned native handle instead of
# sharing the original's.
def initialize_copy(other)
  super(other)
  self._ldf = _ldf._clone
end
def interpolate
-
(LazyFrame)-
# Interpolate intermediate nulls in every column.
#
# @return [LazyFrame]
def interpolate
  interpolated = F.col("*").interpolate
  select(interpolated)
end
def join(
-
(LazyFrame)-
Parameters:
-
maintain_order('none', 'left', 'right', 'left_right', 'right_left') -- -
coalesce(Boolean) -- -
force_parallel(Boolean) -- -
allow_parallel(Boolean) -- -
nulls_equal(Boolean) -- -
validate('m:m', 'm:1', '1:m', '1:1') -- -
suffix(String) -- -
how("inner", "left", "full", "semi", "anti", "cross") -- -
on(Object) -- -
right_on(Object) -- -
left_on(Object) -- -
other(LazyFrame) --
# Join with another LazyFrame.
#
# @param other [LazyFrame]
# @param left_on [Object, nil] left join keys (with `right_on`)
# @param right_on [Object, nil] right join keys (with `left_on`)
# @param on [Object, nil] keys shared by both sides
# @param how [String] "inner", "left", "full" ("outer" is accepted as an alias), "semi", "anti", "cross"
# @param suffix [String] suffix for duplicate right-side column names
# @param validate [String] "m:m", "m:1", "1:m", "1:1"
# @param nulls_equal [Boolean]
# @param allow_parallel [Boolean]
# @param force_parallel [Boolean]
# @param coalesce [Boolean, nil]
# @param maintain_order [String, nil] nil means "none"
# @return [LazyFrame]
# @raise [ArgumentError] for a non-LazyFrame `other` or missing keys
def join(
  other,
  left_on: nil,
  right_on: nil,
  on: nil,
  how: "inner",
  suffix: "_right",
  validate: "m:m",
  nulls_equal: false,
  allow_parallel: true,
  force_parallel: false,
  coalesce: nil,
  maintain_order: nil
)
  if !other.is_a?(LazyFrame)
    raise ArgumentError, "Expected a `LazyFrame` as join table, got #{other.class.name}"
  end

  maintain_order = "none" if maintain_order.nil?

  if how == "outer"
    how = "full"
  elsif how == "cross"
    # No key columns. The argument order must match the keyed call below
    # (allow_parallel, force_parallel, nulls_equal); these two were swapped.
    return _from_rbldf(
      _ldf.join(
        other._ldf,
        [],
        [],
        allow_parallel,
        force_parallel,
        nulls_equal,
        how,
        suffix,
        validate,
        maintain_order,
        coalesce
      )
    )
  end

  if !on.nil?
    rbexprs = Utils.parse_into_list_of_expressions(on)
    rbexprs_left = rbexprs
    rbexprs_right = rbexprs
  elsif !left_on.nil? && !right_on.nil?
    rbexprs_left = Utils.parse_into_list_of_expressions(left_on)
    rbexprs_right = Utils.parse_into_list_of_expressions(right_on)
  else
    raise ArgumentError, "must specify `on` OR `left_on` and `right_on`"
  end

  _from_rbldf(
    _ldf.join(
      other._ldf,
      rbexprs_left,
      rbexprs_right,
      allow_parallel,
      force_parallel,
      nulls_equal,
      how,
      suffix,
      validate,
      maintain_order,
      coalesce
    )
  )
end
def join_asof(
- Example: If we instead use `strategy: "forward"`, then each date from `population` which doesn't have an exact match is matched with the closest later date from `gdp`: -
Example: Note how the dates don't quite match. If we join them using `join_asof` and `strategy: "backward"`, then each date from `population` which doesn't have an exact match is matched with the closest earlier date from `gdp`: -
Returns:
-
(LazyFrame)-
Parameters:
-
check_sortedness(Boolean) -- -
allow_exact_matches(Boolean) -- -
coalesce(Boolean) -- -
force_parallel(Boolean) -- -
allow_parallel(Boolean) -- -
tolerance(Object) -- -
suffix(String) -- -
strategy("backward", "forward") -- -
by(Object) -- -
by_right(Object) -- -
by_left(Object) -- -
on(String) -- -
right_on(String) -- -
left_on(String) -- -
other(LazyFrame) --
# As-of join: match each left row to the nearest right row by key.
#
# `on`, `left_on` and `right_on` accept a column name or an Expr; names are
# wrapped with Polars.col, and expressions are used as-is. An Expr used to be
# ignored when passed as `on` and rejected by Polars.col when passed as
# `left_on`/`right_on`.
#
# @param other [LazyFrame]
# @param by_left [String, Expr, Array, nil]
# @param by_right [String, Expr, Array, nil]
# @param by [String, Array, nil] overrides by_left/by_right when a String or Array
# @param strategy [String] "backward" or "forward"
# @param tolerance [String, Object, nil] String tolerances are passed separately from numeric ones
# @return [LazyFrame]
# @raise [ArgumentError] for a non-LazyFrame `other` or a missing join column
def join_asof(
  other,
  left_on: nil,
  right_on: nil,
  on: nil,
  by_left: nil,
  by_right: nil,
  by: nil,
  strategy: "backward",
  suffix: "_right",
  tolerance: nil,
  allow_parallel: true,
  force_parallel: false,
  coalesce: true,
  allow_exact_matches: true,
  check_sortedness: true
)
  if !other.is_a?(LazyFrame)
    raise ArgumentError, "Expected a `LazyFrame` as join table, got #{other.class.name}"
  end

  if on.is_a?(::String) || on.is_a?(Expr)
    left_on = on
    right_on = on
  end
  if left_on.nil? || right_on.nil?
    raise ArgumentError, "You should pass the column to join on as an argument."
  end
  left_on_expr = left_on.is_a?(Expr) ? left_on : Polars.col(left_on)
  right_on_expr = right_on.is_a?(Expr) ? right_on : Polars.col(right_on)

  by_left_ = (by_left.is_a?(::String) || by_left.is_a?(Expr)) ? [by_left] : by_left
  by_right_ = (by_right.is_a?(::String) || by_right.is_a?(Expr)) ? [by_right] : by_right
  if by.is_a?(::String)
    by_left_ = [by]
    by_right_ = [by]
  elsif by.is_a?(::Array)
    by_left_ = by
    by_right_ = by
  end

  tolerance_str = nil
  tolerance_num = nil
  if tolerance.is_a?(::String)
    tolerance_str = tolerance
  else
    tolerance_num = tolerance
  end

  _from_rbldf(
    _ldf.join_asof(
      other._ldf,
      left_on_expr._rbexpr,
      right_on_expr._rbexpr,
      by_left_,
      by_right_,
      allow_parallel,
      force_parallel,
      suffix,
      strategy,
      tolerance_num,
      tolerance_str,
      coalesce,
      allow_exact_matches,
      check_sortedness
    )
  )
end
def join_where(
- Example: To OR them together, use a single expression and the `|` operator. -
Example: Join two lazyframes together based on two predicates which get AND-ed together. -
Returns:
-
(LazyFrame)-
Parameters:
-
suffix(String) -- -
predicates(Object) -- -
other(Object) --
Other tags:
- Note: -
Note: -
# Join on arbitrary predicate expressions (inequality joins, etc.).
#
# @param other [LazyFrame] must be the same type as self
# @param predicates [Array] join predicates; multiple predicates are passed together
# @param suffix [String] suffix for duplicate right-side column names
# @return [LazyFrame]
def join_where(
  other,
  *predicates,
  suffix: "_right"
)
  Utils.require_same_type(self, other)
  predicate_exprs = Utils.parse_into_list_of_expressions(*predicates)
  joined = _ldf.join_where(other._ldf, predicate_exprs, suffix)
  _from_rbldf(joined)
end
def last
-
(LazyFrame)-
# The last row, as a one-row LazyFrame.
#
# @return [LazyFrame]
def last
  rows = 1
  tail(rows)
end
def lazy
-
(LazyFrame)-
# A LazyFrame is already lazy; returns the receiver itself.
#
# @return [LazyFrame]
def lazy
  itself
end
def limit(n = 5)
-
(LazyFrame)-
Parameters:
-
n(Integer) --
# Alias for #head.
#
# @param n [Integer]
# @return [LazyFrame]
def limit(n = 5)
  head(n)
end
def map_batches(
- Note: -
Note: -
Note: -
Returns:
-
(LazyFrame)-
Parameters:
-
streamable(Boolean) -- -
validate_output_schema(Boolean) -- -
schema(Object) -- -
no_optimizations(Boolean) -- -
slice_pushdown(Boolean) -- -
projection_pushdown(Boolean) -- -
predicate_pushdown(Boolean) --
# Apply a user block to the materialized DataFrame inside the query plan.
#
# A non-nil `schema` is not supported yet (raises Todo). `no_optimizations`
# turns off all three pushdowns.
#
# @return [LazyFrame]
def map_batches(
  predicate_pushdown: true,
  projection_pushdown: true,
  slice_pushdown: true,
  no_optimizations: false,
  schema: nil,
  validate_output_schema: true,
  streamable: false,
  &function
)
  raise Todo unless schema.nil?

  if no_optimizations
    predicate_pushdown, projection_pushdown, slice_pushdown = false, false, false
  end

  mapped = _ldf.map_batches(
    function,
    predicate_pushdown,
    projection_pushdown,
    slice_pushdown,
    streamable,
    schema,
    validate_output_schema
  )
  _from_rbldf(mapped)
end
def match_to_schema(
- Example: Upcasting integers and floats -
Example: Removing extra columns -
Example: Adding missing columns -
Example: Ensuring the schema matches -
Returns:
-
(LazyFrame)-
Parameters:
-
float_cast(Object) -- -
integer_cast(Object) -- -
extra_struct_fields(Object) -- -
extra_columns(Object) -- -
missing_struct_fields(Object) -- -
missing_columns(Object) -- -
schema(Object) --
Other tags:
- Note: -
# Conform the frame to `schema`, with per-aspect policies for missing/extra
# columns and struct fields, and for integer/float casting.
#
# Hash schemas are wrapped in Schema. `missing_columns` may be a policy value,
# an Expr, or a Hash of column => value/Expr; Hash keys are stringified.
#
# @return [LazyFrame]
def match_to_schema(
  schema,
  missing_columns: "raise",
  missing_struct_fields: "raise",
  extra_columns: "raise",
  extra_struct_fields: "raise",
  integer_cast: "forbid",
  float_cast: "forbid"
)
  to_native = ->(value) { value.is_a?(Expr) ? value._rbexpr : value }

  schema_prep = schema.is_a?(Hash) ? Schema.new(schema) : schema

  missing_columns_rbexpr =
    case missing_columns
    when Hash
      missing_columns.to_h { |key, value| [key.to_s, to_native.(value)] }
    when Expr
      to_native.(missing_columns)
    else
      missing_columns
    end

  LazyFrame._from_rbldf(
    _ldf.match_to_schema(
      schema_prep,
      missing_columns_rbexpr,
      missing_struct_fields,
      extra_columns,
      extra_struct_fields,
      integer_cast,
      float_cast
    )
  )
end
def max
-
(LazyFrame)-
# Per-column maximum.
#
# @return [LazyFrame]
def max
  aggregated = _ldf.max
  _from_rbldf(aggregated)
end
def mean
-
(LazyFrame)-
# Per-column mean.
#
# @return [LazyFrame]
def mean
  aggregated = _ldf.mean
  _from_rbldf(aggregated)
end
def median
-
(LazyFrame)-
# Per-column median.
#
# @return [LazyFrame]
def median
  aggregated = _ldf.median
  _from_rbldf(aggregated)
end
def merge_sorted(other, key)
-
(LazyFrame)-
Parameters:
-
key(String) -- -
other(LazyFrame) --
# Merge two frames that are both sorted by `key`.
#
# @param other [LazyFrame] its native handle (`_ldf`) is used
# @param key [String]
# @return [LazyFrame]
def merge_sorted(other, key)
  merged = _ldf.merge_sorted(other._ldf, key)
  _from_rbldf(merged)
end
def min
-
(LazyFrame)-
# Per-column minimum.
#
# @return [LazyFrame]
def min
  aggregated = _ldf.min
  _from_rbldf(aggregated)
end
def null_count
-
(LazyFrame)-
# Per-column count of null values.
#
# @return [LazyFrame]
def null_count
  counted = _ldf.null_count
  _from_rbldf(counted)
end
def pipe(function, *args, **kwargs, &block)
-
(LazyFrame)-
Parameters:
-
kwargs(Object) -- -
args(Object) -- -
function(Object) --
# Call `function` with this frame as its first argument, forwarding any extra
# positional/keyword arguments and block.
#
# @param function [#call]
# @return [Object] whatever `function` returns
def pipe(function, *args, **kwargs, &block)
  function.call(self, *args, **kwargs, &block)
end
def pipe_with_schema(function)
-
(LazyFrame)-
Parameters:
-
function(Object) --
Other tags:
- Note: -
# Apply `function` to this frame together with its resolved schema, inside
# the query plan.
#
# @param function [#call] receives (LazyFrame, schema) and must return a LazyFrame
# @return [LazyFrame]
def pipe_with_schema(function)
  wrapper = lambda do |lf_and_schema|
    # Inputs arrive as lists (to support a plural pipe_with_schemas);
    # only the first frame and first schema are used here.
    frames = lf_and_schema[0]
    schemas = lf_and_schema[1]
    function.(_from_rbldf(frames[0]), schemas[0])._ldf
  end
  _from_rbldf(_ldf.pipe_with_schema(wrapper))
end
def pivot(
- Example: Using `pivot`, we can reshape so we have one row per student, with different subjects as columns, and their `test_1` scores as values: -
Other tags:
- Note: -
Returns:
-
(LazyFrame)-
Parameters:
-
separator(String) -- -
maintain_order(Boolean) -- -
aggregate_function(Object) -- -
values(Object) -- -
index(Object) -- -
on_columns(Object) -- -
on(Object) --
# Pivot: spread the values of `on` into new columns.
#
# Whichever of `index`/`values` is omitted defaults to every column not
# otherwise used. `aggregate_function` may be nil (expect at most one value
# per cell), one of the named aggregations, or an Expr built on F.element.
#
# @param on [Object] column(s) whose values become new column names
# @param on_columns [DataFrame, Series, Array] the distinct `on` values to emit
# @param index [Object, nil]
# @param values [Object, nil]
# @param aggregate_function [String, Expr, nil]
# @param maintain_order [Boolean]
# @param separator [String]
# @return [LazyFrame]
# @raise [InvalidOperationError] if neither index nor values is given
# @raise [ArgumentError] for an unknown aggregate_function name
def pivot(
  on,
  on_columns:,
  index: nil,
  values: nil,
  aggregate_function: nil,
  maintain_order: false,
  separator: "_"
)
  if index.nil? && values.nil?
    msg = "`pivot` needs either `index` or `values` to be specified"
    raise InvalidOperationError, msg
  end

  on_selector = Utils.parse_list_into_selector(on)
  values_selector = Utils.parse_list_into_selector(values) unless values.nil?
  index_selector = Utils.parse_list_into_selector(index) unless index.nil?
  values_selector = Polars.cs.all - on_selector - index_selector if values.nil?
  index_selector = Polars.cs.all - on_selector - values_selector if index.nil?

  agg =
    case aggregate_function
    when nil
      F.element.item(allow_empty: true)
    when ::String
      element = F.element
      case aggregate_function
      when "first" then element.first
      when "item" then element.item
      when "sum" then element.sum
      when "max" then element.max
      when "min" then element.min
      when "mean" then element.mean
      when "median" then element.median
      when "last" then element.last
      when "len" then element.len
      when "count"
        Utils.issue_deprecation_warning(
          "`aggregate_function='count'` input for `pivot` is deprecated." +
            " Please use `aggregate_function='len'`."
        )
        element.len
      else
        msg = "invalid input for `aggregate_function` argument: #{aggregate_function.inspect}"
        raise ArgumentError, msg
      end
    else
      aggregate_function
    end

  on_cols =
    if on_columns.is_a?(DataFrame)
      on_columns
    elsif on_columns.is_a?(Series)
      on_columns.to_frame
    else
      Series.new(on_columns).to_frame
    end

  _from_rbldf(
    _ldf.pivot(
      on_selector._rbselector,
      on_cols._df,
      index_selector._rbselector,
      values_selector._rbselector,
      agg._rbexpr,
      maintain_order,
      separator
    )
  )
end
def profile(
-
(Array)-
Parameters:
-
optimizations(Object) -- -
engine(String) --
# Run the query and also return per-node timing information.
#
# @param engine [String] resolved via _select_engine, but not forwarded:
#   the native profile call takes no engine argument
# @param optimizations [Object] query optimization flags
# @return [Array(DataFrame, DataFrame)] result frame and timings frame
def profile(
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  _select_engine(engine)
  optimized = _ldf.with_optimizations(optimizations._rboptflags)
  result_rb, timings_rb = optimized.profile
  [result_rb, timings_rb].map { |rbdf| Utils.wrap_df(rbdf) }
end
def quantile(quantile, interpolation: "nearest")
-
(LazyFrame)-
Parameters:
-
interpolation("nearest", "higher", "lower", "midpoint", "linear") -- -
quantile(Float) --
# Per-column quantile.
#
# @param quantile [Float, Expr] quantile in [0, 1]; a String is treated as a column
# @param interpolation [String]
# @return [LazyFrame]
def quantile(quantile, interpolation: "nearest")
  quantile_expr = Utils.parse_into_expression(quantile, str_as_lit: false)
  _from_rbldf(_ldf.quantile(quantile_expr, interpolation))
end
def remove(
- Example: Remove rows by comparing two columns against each other; in this case, we remove rows where the two columns are not equal (using `ne_missing` to ensure that null values compare equal): -
Example: Provide constraints(s) using `**kwargs` syntax: -
Example: Provide multiple constraints using `*args` syntax: -
Example: Discard rows based on multiple conditions, combined with and/or operators: -
Example: Remove rows matching a condition: -
Returns:
-
(LazyFrame)-
Parameters:
-
constraints(Hash) -- -
predicates(Array) --
# Drop the rows that satisfy every predicate and keyword constraint (the
# complement of #filter).
#
# @param predicates [Array] expressions, boolean masks, or literal true/false
# @param constraints [Hash] column name => value equality constraints
# @return [LazyFrame]
def remove(
  *predicates,
  **constraints
)
  if constraints.empty?
    # early-exit conditions (exclude/include all rows)
    sole = predicates.first if predicates.length == 1
    return clear if predicates.empty? || sole.is_a?(TrueClass)
    return dup if sole.is_a?(FalseClass)
  end

  _filter(predicates: predicates, constraints: constraints, invert: true)
end
def rename(mapping, strict: true)
-
(LazyFrame)-
Parameters:
-
strict(Boolean) -- -
mapping(Hash) --
# Rename columns via a Hash (old => new) or a callable applied to every name.
#
# @param mapping [Hash, #call]
# @param strict [Boolean] raise if a mapped column does not exist
# @return [LazyFrame]
def rename(mapping, strict: true)
  return select(F.all.name.map(&mapping)) if mapping.respond_to?(:call)

  _from_rbldf(_ldf.rename(mapping.keys, mapping.values, strict))
end
def reverse
-
(LazyFrame)-
# Reverse the row order.
#
# @return [LazyFrame]
def reverse
  reversed = _ldf.reverse
  _from_rbldf(reversed)
end
def rolling(
-
(LazyFrame)-
Parameters:
-
group_by(Object) -- -
closed("right", "left", "both", "none") -- -
offset(Object) -- -
period(Object) -- -
index_column(Object) --
# Rolling (look-back) windows over an index column.
#
# `offset` defaults to the negation of `period`.
#
# @param index_column [Object]
# @param period [Object] window length
# @param offset [Object, nil]
# @param closed [String]
# @param group_by [Object, nil] additional grouping keys
# @return [LazyGroupBy]
def rolling(
  index_column:,
  period:,
  offset: nil,
  closed: "right",
  group_by: nil
)
  index_expr = Utils.parse_into_expression(index_column)
  offset = Utils.negate_duration_string(Utils.parse_as_duration_string(period)) if offset.nil?
  by_exprs = group_by.nil? ? [] : Utils.parse_into_list_of_expressions(group_by)
  period_str = Utils.parse_as_duration_string(period)
  offset_str = Utils.parse_as_duration_string(offset)

  LazyGroupBy.new(_ldf.rolling(index_expr, period_str, offset_str, closed, by_exprs))
end
def schema
-
(Hash)-
# The resolved output schema (column name => dtype), as returned by the
# native collect_schema call.
#
# @return [Hash]
def schema
  native = _ldf
  native.collect_schema
end
def select(*exprs, **named_exprs)
-
(LazyFrame)-
Parameters:
-
named_exprs(Hash) -- -
exprs(Array) --
# Select/compute columns.
#
# Setting POLARS_AUTO_STRUCTIFY to anything other than "0" packs
# multi-output expressions into structs.
#
# @param exprs [Array]
# @param named_exprs [Hash]
# @return [LazyFrame]
def select(*exprs, **named_exprs)
  env_flag = ENV["POLARS_AUTO_STRUCTIFY"]
  structify = !env_flag.nil? && env_flag != "0"
  rbexprs = Utils.parse_into_list_of_expressions(*exprs, **named_exprs, __structify: structify)
  _from_rbldf(_ldf.select(rbexprs))
end
def select_seq(*exprs, **named_exprs)
-
(LazyFrame)-
Parameters:
-
named_exprs(Hash) -- -
exprs(Array) --
# Like #select, but evaluates the expressions sequentially.
#
# POLARS_AUTO_STRUCTIFY is parsed exactly as in #select: any value other than
# "0" enables it. The previous `.to_i` parsing made values like "true"
# disable structify here while enabling it in #select.
#
# @param exprs [Array]
# @param named_exprs [Hash]
# @return [LazyFrame]
def select_seq(*exprs, **named_exprs)
  structify = ENV.fetch("POLARS_AUTO_STRUCTIFY", "0") != "0"
  rbexprs = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: structify
  )
  _from_rbldf(_ldf.select_seq(rbexprs))
end
def serialize(file = nil, format: "binary")
- Example: Serialize the logical plan into a binary representation. -
Other tags:
- Note: -
Returns:
-
(Object)-
Parameters:
-
format('binary', 'json') -- -
file(Object) --
# Serialize the logical plan.
#
# @param file [Object, nil] destination, handled by Utils.serialize_polars_object
# @param format ["binary", "json"] "json" emits a deprecation warning
# @return [Object] whatever Utils.serialize_polars_object returns
# @raise [ArgumentError] for an unknown format
def serialize(file = nil, format: "binary")
  serializer =
    case format
    when "binary"
      raise Todo unless _ldf.respond_to?(:serialize_binary)
      _ldf.method(:serialize_binary)
    when "json"
      msg = "'json' serialization format of LazyFrame is deprecated"
      warn msg
      _ldf.method(:serialize_json)
    else
      # Single braces: `{{` was a Python f-string escape printed literally in Ruby.
      msg = "`format` must be one of {'binary', 'json'}, got #{format.inspect}"
      raise ArgumentError, msg
    end

  Utils.serialize_polars_object(serializer, file)
end
def set_sorted(
-
(LazyFrame)-
Parameters:
-
nulls_last(Boolean) -- -
descending(Boolean) -- -
more_columns(Array) -- -
column(Object) --
Other tags:
- Note: -
# Mark columns as already sorted (an optimizer hint; no data is checked).
#
# @param column [String]
# @param more_columns [Array]
# @param descending [Boolean, Array<Boolean>] scalars are wrapped in an Array
# @param nulls_last [Boolean, Array<Boolean>] scalars are wrapped in an Array
# @return [LazyFrame]
# @raise [TypeError] if `column` is not string-like
def set_sorted(
  column,
  *more_columns,
  descending: false,
  nulls_last: false
)
  unless Utils.strlike?(column)
    msg = "expected a 'str' for argument 'column' in 'set_sorted'"
    raise TypeError, msg
  end

  ds = Utils.bool?(descending) ? [descending] : descending
  nl = Utils.bool?(nulls_last) ? [nulls_last] : nulls_last
  _from_rbldf(_ldf.hint_sorted([column, *more_columns], ds, nl))
end
def shift(n = 1, fill_value: nil)
-
(LazyFrame)-
Parameters:
-
fill_value(Object) -- -
n(Integer) --
# Shift rows by `n` positions, optionally filling the vacated slots.
#
# @param n [Integer, Expr]
# @param fill_value [Object, nil] Strings are treated as literals
# @return [LazyFrame]
def shift(n = 1, fill_value: nil)
  fill_expr = fill_value.nil? ? nil : Utils.parse_into_expression(fill_value, str_as_lit: true)
  n_expr = Utils.parse_into_expression(n)
  _from_rbldf(_ldf.shift(n_expr, fill_expr))
end
def show_graph(
-
(Object)-
Parameters:
-
optimizations(Object) -- -
plan_stage('ir', 'physical') -- -
engine(String) -- -
raw_output(Boolean) -- -
output_path(String) -- -
show(Boolean) -- -
optimized(Boolean) --
# Render the query plan as a Graphviz dot graph.
#
# @param optimized [Boolean]
# @param show [Boolean]
# @param output_path [String, nil]
# @param raw_output [Boolean]
# @param figsize [Array] accepted for API compatibility; not used here
# @param engine [String]
# @param plan_stage ["ir", "physical"] the physical streaming plan is only
#   available with the streaming engine
# @param optimizations [Object] query optimization flags (copied, not mutated)
# @return [Object] result of Utils.display_dot_graph
# @raise [TypeError] for an unknown plan_stage
def show_graph(
  optimized: true,
  show: true,
  output_path: nil,
  raw_output: false,
  figsize: [16.0, 12.0],
  engine: "auto",
  plan_stage: "ir",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)
  if engine == "streaming"
    # must be qualified: a bare `issue_unstable_warning` raised NoMethodError
    Utils.issue_unstable_warning("streaming mode is considered unstable.")
  end

  optimizations = optimizations.dup
  optimizations._rboptflags.streaming = engine == "streaming"
  ldf = _ldf.with_optimizations(optimizations._rboptflags)

  dot =
    case plan_stage
    when "ir"
      ldf.to_dot(optimized)
    when "physical"
      engine == "streaming" ? ldf.to_dot_streaming_phys(optimized) : ldf.to_dot(optimized)
    else
      error_msg = "invalid plan stage '#{plan_stage}'"
      raise TypeError, error_msg
    end

  Utils.display_dot_graph(
    dot: dot,
    show: show,
    output_path: output_path,
    raw_output: raw_output
  )
end
def sink_batches(
-
(Object)-
Parameters:
-
optimizations(Object) -- -
engine(String) -- -
lazy(Boolean) -- -
maintain_order(Boolean) -- -
chunk_size(Integer) --
Other tags:
- Note: -
Note: -
# Stream the query result into a user block, batch by batch.
#
# The block receives each batch as a DataFrame; its truthiness is passed back
# to the native sink as a boolean. With `lazy: false` the query runs
# immediately and nil is returned; otherwise the un-run sink query is returned.
#
# @return [LazyFrame, nil]
def sink_batches(
  chunk_size: nil,
  maintain_order: true,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS,
  &function
)
  callback = ->(rbdf) { function.(Utils.wrap_df(rbdf)) ? true : false }
  sink_ldf = _ldf.sink_batches(callback, maintain_order, chunk_size)
  return LazyFrame._from_rbldf(sink_ldf) if lazy

  optimized = sink_ldf.with_optimizations(optimizations._rboptflags)
  LazyFrame._from_rbldf(optimized).collect(engine: engine)
  nil
end
# Write the query result to a CSV file through the native CSV sink.
#
# @param path [String] destination, converted with _to_sink_target
# @param include_bom [Boolean] forwarded to the native writer
# @param compression [String] forwarded to the native writer
# @param compression_level [Integer, nil] forwarded to the native writer
# @param check_extension [Boolean] forwarded to the native writer
# @param include_header [Boolean] forwarded to the native writer
# @param separator [String] single byte, validated and passed as its ordinal
# @param line_terminator [String] forwarded to the native writer
# @param quote_char [String] single byte, validated and passed as its ordinal
# @param batch_size [Integer] forwarded to the native writer
# @param datetime_format [String, nil] forwarded to the native writer
# @param date_format [String, nil] forwarded to the native writer
# @param time_format [String, nil] forwarded to the native writer
# @param float_scientific [Object, nil] forwarded to the native writer
# @param float_precision [Integer, nil] forwarded to the native writer
# @param decimal_comma [Boolean] forwarded to the native writer
# @param null_value [String, nil] forwarded to the native writer
# @param quote_style ["necessary", "always", "non_numeric", "never", nil]
#   forwarded to the native writer
# @param maintain_order [Boolean] part of the SinkOptions
# @param storage_options [Hash, nil] part of the SinkOptions; never mutated
# @param credential_provider [Object] used to build the credential provider
# @param retries [Integer, nil] deprecated; copied into
#   storage_options["max_retries"]
# @param sync_on_close ['data', 'all', nil] part of the SinkOptions
# @param mkdir [Boolean] part of the SinkOptions
# @param lazy [Boolean] when true, return a LazyFrame that performs the
#   write when collected; otherwise write immediately and return nil
# @param engine [String] resolved with _select_engine, used for collect
# @param optimizations [Object] optimization flags applied before collect
# @return [LazyFrame, nil]
def sink_csv(
  path,
  include_bom: false,
  compression: "uncompressed",
  compression_level: nil,
  check_extension: true,
  include_header: true,
  separator: ",",
  line_terminator: "\n",
  quote_char: '"',
  batch_size: 1024,
  datetime_format: nil,
  date_format: nil,
  time_format: nil,
  float_scientific: nil,
  float_precision: nil,
  decimal_comma: false,
  null_value: nil,
  quote_style: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  Utils._check_arg_is_1byte("separator", separator, false)
  Utils._check_arg_is_1byte("quote_char", quote_char, false)
  engine = _select_engine(engine)

  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_csv"
  )

  target = _to_sink_target(path)

  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # Work on a copy: the caller's hash may be reused elsewhere or frozen.
    storage_options = (storage_options || {}).dup
    storage_options["max_retries"] = retries
  end

  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )

  ldf_rb = _ldf.sink_csv(
    target,
    sink_options,
    include_bom,
    compression,
    compression_level,
    check_extension,
    include_header,
    separator.ord,
    line_terminator,
    quote_char.ord,
    batch_size,
    datetime_format,
    date_format,
    time_format,
    float_scientific,
    float_precision,
    decimal_comma,
    null_value,
    quote_style
  )

  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end

  LazyFrame._from_rbldf(ldf_rb)
end
# Stream the query result into a Delta Lake table.
#
# @param target [String, Pathname, DeltaLake::Table] table location (path-like
#   values are resolved to a Delta Lake URI) or an existing table object
# @param mode ['error', 'append', 'overwrite', 'ignore', 'merge'] write mode.
#   "merge" goes through DeltaLake::Table#merge; any other value goes
#   through DeltaLake.write.
# @param storage_options [Hash, nil] storage options for the Delta writer
# @param credential_provider [Object] must be nil or "auto" when +target+ is
#   a DeltaLake::Table
# @param delta_write_options [Hash, nil] extra keyword arguments for
#   DeltaLake.write
# @param delta_merge_options [Hash, nil] keyword arguments for
#   DeltaLake::Table#merge; required when +mode+ is "merge"
# @param optimizations [Object] forwarded to collect_batches
# @return [Object, nil] the result of DeltaLake::Table#merge in "merge" mode,
#   otherwise nil
# @raise [ArgumentError] when a custom credential_provider is combined with a
#   DeltaLake::Table target, or when "merge" is used without merge options
def sink_delta(
  target,
  mode: "error",
  storage_options: nil,
  credential_provider: "auto",
  delta_write_options: nil,
  delta_merge_options: nil,
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  Polars.send(:_check_if_delta_available)

  # TODO
  # _check_for_unsupported_types(collect_schema.dtypes)

  if Utils.pathlike?(target)
    target = Polars.send(:_resolve_delta_lake_uri, target.to_s, strict: false)
  end

  init_builder = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder =
    if target.is_a?(DeltaLake::Table)
      if !credential_provider.nil? && credential_provider != "auto"
        raise ArgumentError, "cannot use credential_provider when passing a DeltaTable object"
      end
      nil
    else
      init_builder.(credential_provider, target, storage_options, "sink_delta")
    end

  credential_provider_creds = {}
  raise Todo if credential_provider_builder

  # Native polars writers are not used here, so the storage options are
  # finalized directly.
  if !storage_options.nil? || !credential_provider_builder.nil?
    storage_options = (storage_options || {}).merge(credential_provider_creds)
  else
    storage_options = nil
  end

  stream = collect_batches(
    engine: "streaming",
    maintain_order: true,
    chunk_size: nil,
    lazy: true,
    optimizations: optimizations
  )

  if mode == "merge"
    if delta_merge_options.nil?
      raise ArgumentError, "you need to pass delta_merge_options with at least a given predicate for `MERGE` to work."
    end

    table = target.is_a?(::String) ? DeltaLake::Table.new(target, storage_options: storage_options) : target
    return table.merge(stream, **delta_merge_options)
  end

  write_options = delta_write_options.nil? ? {} : delta_write_options
  DeltaLake.write(
    target,
    stream,
    mode: mode,
    storage_options: storage_options,
    **write_options
  )
  nil
end
# Write the query result to an Arrow IPC file through the native IPC sink.
#
# @param path [String] destination, converted with _to_sink_target
# @param compression [String, nil] nil is treated as "uncompressed"
# @param compat_level [Object, nil] only nil (the default level) is
#   currently supported; any other value raises Todo
# @param record_batch_size [Integer, nil] forwarded to the native writer
# @param maintain_order [Boolean] part of the SinkOptions
# @param storage_options [Hash, nil] part of the SinkOptions; never mutated
# @param credential_provider [Object] used to build the credential provider
# @param retries [Integer, nil] deprecated; copied into
#   storage_options["max_retries"]
# @param sync_on_close ['data', 'all', nil] part of the SinkOptions
# @param mkdir [Boolean] part of the SinkOptions
# @param lazy [Boolean] when true, return a LazyFrame that performs the
#   write when collected; otherwise write immediately and return nil
# @param engine [String] resolved with _select_engine, used for collect
# @param optimizations [Object] optimization flags applied before collect
# @param _record_batch_statistics [Boolean] forwarded to the native writer
# @return [LazyFrame, nil]
def sink_ipc(
  path,
  compression: "uncompressed",
  compat_level: nil,
  record_batch_size: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS,
  _record_batch_statistics: false
)
  engine = _select_engine(engine)
  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)

  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # Work on a copy: the caller's hash may be reused elsewhere or frozen.
    storage_options = (storage_options || {}).dup
    storage_options["max_retries"] = retries
  end

  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_ipc"
  )

  target = _to_sink_target(path)

  # Only the default compatibility level is supported so far.
  raise Todo unless compat_level.nil?
  compat_level_rb = true

  compression = "uncompressed" if compression.nil?

  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )

  ldf_rb = _ldf.sink_ipc(
    target,
    sink_options,
    compression,
    compat_level_rb,
    record_batch_size,
    _record_batch_statistics
  )

  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end

  LazyFrame._from_rbldf(ldf_rb)
end
# Write the query result as newline-delimited JSON through the native NDJSON
# sink.
#
# @param path [String] destination, converted with _to_sink_target
# @param compression [String] forwarded to the native writer
# @param compression_level [Integer, nil] forwarded to the native writer
# @param check_extension [Boolean] forwarded to the native writer
# @param maintain_order [Boolean] part of the SinkOptions
# @param storage_options [Hash, nil] part of the SinkOptions; never mutated
# @param credential_provider [Object] used to build the credential provider
# @param retries [Integer, nil] deprecated; copied into
#   storage_options["max_retries"]
# @param sync_on_close ['data', 'all', nil] part of the SinkOptions
# @param mkdir [Boolean] part of the SinkOptions
# @param lazy [Boolean] when true, return a LazyFrame that performs the
#   write when collected; otherwise write immediately and return nil
# @param engine [String] resolved with _select_engine, used for collect
# @param optimizations [Object] optimization flags applied before collect
# @return [LazyFrame, nil]
def sink_ndjson(
  path,
  compression: "uncompressed",
  compression_level: nil,
  check_extension: true,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  mkdir: false,
  lazy: false,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)

  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # Work on a copy: the caller's hash may be reused elsewhere or frozen.
    storage_options = (storage_options || {}).dup
    storage_options["max_retries"] = retries
  end

  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)
  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_ndjson"
  )

  target = _to_sink_target(path)

  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )

  ldf_rb = _ldf.sink_ndjson(
    target,
    compression,
    compression_level,
    check_extension,
    sink_options
  )

  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end

  LazyFrame._from_rbldf(ldf_rb)
end
# Write the query result to a Parquet file through the native Parquet sink.
#
# @param path [String] destination, converted with _to_sink_target
# @param compression ["lz4", "uncompressed", "snappy", "gzip", "lzo",
#   "brotli", "zstd"] forwarded to the native writer
# @param compression_level [Integer, nil] forwarded to the native writer
# @param statistics [Boolean, String, Hash] true, false and "full" are
#   expanded into an explicit per-statistic Hash; other values pass through
# @param row_group_size [Integer, nil] forwarded to the native writer
# @param data_page_size [Integer, nil] forwarded to the native writer
# @param maintain_order [Boolean] part of the SinkOptions
# @param storage_options [Hash, nil] part of the SinkOptions; never mutated
# @param credential_provider [Object] used to build the credential provider
# @param retries [Integer, nil] deprecated; copied into
#   storage_options["max_retries"]
# @param sync_on_close ['data', 'all', nil] part of the SinkOptions
# @param metadata [Object, nil] forwarded to the native writer
# @param mkdir [Boolean] part of the SinkOptions
# @param lazy [Boolean] when true, return a LazyFrame that performs the
#   write when collected; otherwise write immediately and return nil
# @param arrow_schema [Object, nil] forwarded to the native writer
# @param engine [String] resolved with _select_engine, used for collect
# @param optimizations [Object] optimization flags applied before collect
# @return [LazyFrame, nil]
def sink_parquet(
  path,
  compression: "zstd",
  compression_level: nil,
  statistics: true,
  row_group_size: nil,
  data_page_size: nil,
  maintain_order: true,
  storage_options: nil,
  credential_provider: "auto",
  retries: nil,
  sync_on_close: nil,
  metadata: nil,
  mkdir: false,
  lazy: false,
  arrow_schema: nil,
  engine: "auto",
  optimizations: DEFAULT_QUERY_OPT_FLAGS
)
  engine = _select_engine(engine)

  # Expand the shorthand forms of `statistics` into an explicit Hash.
  if statistics == true
    statistics = {
      min: true,
      max: true,
      distinct_count: false,
      null_count: true
    }
  elsif statistics == false
    statistics = {}
  elsif statistics == "full"
    statistics = {
      min: true,
      max: true,
      distinct_count: true,
      null_count: true
    }
  end

  _init_credential_provider_builder = Polars.method(:_init_credential_provider_builder)

  if !retries.nil?
    msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
    Utils.issue_deprecation_warning(msg)
    # Work on a copy: the caller's hash may be reused elsewhere or frozen.
    storage_options = (storage_options || {}).dup
    storage_options["max_retries"] = retries
  end

  credential_provider_builder = _init_credential_provider_builder.(
    credential_provider, path, storage_options, "sink_parquet"
  )

  target = _to_sink_target(path)

  sink_options = SinkOptions.new(
    mkdir: mkdir,
    maintain_order: maintain_order,
    sync_on_close: sync_on_close,
    storage_options: storage_options,
    credential_provider: credential_provider_builder
  )

  ldf_rb = _ldf.sink_parquet(
    target,
    sink_options,
    compression,
    compression_level,
    statistics,
    row_group_size,
    data_page_size,
    metadata,
    arrow_schema
  )

  if !lazy
    ldf_rb = ldf_rb.with_optimizations(optimizations._rboptflags)
    ldf = LazyFrame._from_rbldf(ldf_rb)
    ldf.collect(engine: engine)
    return nil
  end

  LazyFrame._from_rbldf(ldf_rb)
end
# Select a contiguous range of rows.
#
# @param offset [Integer] start position, passed to the engine unchanged
# @param length [Integer, nil] number of rows. +nil+ is passed through
#   (presumably meaning "until the end"; confirm in the native slice).
# @return [LazyFrame]
# @raise [ArgumentError] if +length+ is negative
def slice(offset, length = nil)
  if length && length < 0
    raise ArgumentError, "Negative slice lengths (#{length}) are invalid for LazyFrame"
  end

  sliced = _ldf.slice(offset, length)
  _from_rbldf(sliced)
end
# Sort the frame by one or more columns or expressions.
#
# @param by [String, Object] first sort key
# @param more_by [Array] additional sort keys
# @param descending [Boolean, Array<Boolean>] broadcast across the keys
#   (except on the single-string-column fast path, where it is passed as is)
# @param nulls_last [Boolean, Array<Boolean>] broadcast like +descending+
# @param maintain_order [Boolean] forwarded to the engine
# @param multithreaded [Boolean] forwarded to the engine
# @return [LazyFrame]
def sort(by, *more_by, descending: false, nulls_last: false, maintain_order: false, multithreaded: true)
  # Fast path: a single column given by name goes straight to the engine.
  if more_by.empty? && by.is_a?(::String)
    return _from_rbldf(_ldf.sort(by, descending, nulls_last, maintain_order, multithreaded))
  end

  sort_exprs = Utils.parse_into_list_of_expressions(by, *more_by)
  key_count = sort_exprs.length
  descending_flags = Utils.extend_bool(descending, key_count, "descending", "by")
  nulls_last_flags = Utils.extend_bool(nulls_last, key_count, "nulls_last", "by")

  _from_rbldf(
    _ldf.sort_by_exprs(sort_exprs, descending_flags, nulls_last_flags, maintain_order, multithreaded)
  )
end
# Run a SQL query against this frame.
#
# A fresh Polars::SQLContext is created and this frame is registered in it
# under +table_name+ (or "self" when +table_name+ is nil/false), so the query
# can refer to the frame by that name.
#
# @param query [String] SQL to execute
# @param table_name [String, nil] name under which the frame is registered
# @return [Object] the result of SQLContext#execute. With the context's
#   default (non-eager) execution this is presumably a LazyFrame, not an
#   Expr; confirm against SQLContext.
def sql(query, table_name: "self")
  context = Polars::SQLContext.new
  context.register(table_name || "self", self)
  context.execute(query)
end
# Reduce the frame with the engine's standard-deviation aggregation.
#
# @param ddof [Integer] delta degrees of freedom, forwarded to the engine
# @return [LazyFrame]
def std(ddof: 1)
  reduced = _ldf.std(ddof)
  _from_rbldf(reduced)
end
# Reduce the frame with the engine's sum aggregation.
#
# @return [LazyFrame]
def sum
  totals = _ldf.sum
  _from_rbldf(totals)
end
# Take the last +n+ rows.
#
# @param n [Integer] number of rows (default 5)
# @return [LazyFrame]
def tail(n = 5)
  last_rows = _ldf.tail(n)
  _from_rbldf(last_rows)
end
# Human-readable form: the unoptimized ("naive") plan, preceded by a hint on
# how to see the optimized plan.
#
# @return [String]
def to_s
  header = "naive plan: (run LazyFrame#explain(optimized: true) to see the optimized plan)"
  "#{header}\n\n#{explain(optimized: false)}\n"
end
# Return the +k+ rows with the largest values in the +by+ columns.
#
# @param k [Integer] number of rows to keep
# @param by [Object] column(s)/expression(s) to rank by
# @param reverse [Boolean, Array<Boolean>] broadcast to one flag per key
# @return [LazyFrame]
def top_k(
  k,
  by:,
  reverse: false
)
  ranking_exprs = Utils.parse_into_list_of_expressions(by)
  reverse_flags = Utils.extend_bool(reverse, ranking_exprs.length, "reverse", "by")
  _from_rbldf(_ldf.top_k(k, ranking_exprs, reverse_flags))
end
# Drop duplicate rows.
#
# @param maintain_order [Boolean] forwarded to the engine
# @param subset [Object, nil] columns/selectors that define a duplicate.
#   Parsed with selectors required; +nil+ is passed through as nil.
# @param keep [String] which duplicate to keep (e.g. "first", "last"),
#   forwarded to the engine; defaults to "any"
# @return [LazyFrame]
def unique(maintain_order: false, subset: nil, keep: "any")
  parsed_subset =
    if subset.nil?
      nil
    else
      Utils.parse_into_list_of_expressions(subset, __require_selectors: true)
    end
  _from_rbldf(_ldf.unique(maintain_order, parsed_subset, keep))
end
# Expand struct columns into their fields.
#
# @param columns [Object] column name(s)/selector(s) to unnest
# @param more_columns [Array] further column names/selectors
# @param separator [String, nil] forwarded to the engine
# @return [LazyFrame]
def unnest(columns, *more_columns, separator: nil)
  selector = Utils.parse_list_into_selector(columns)
  selector |= Utils.parse_list_into_selector(more_columns)
  _from_rbldf(_ldf.unnest(selector._rbselector, separator))
end
# Unpivot the frame from wide to long format.
#
# @param on [Object, nil] columns to unpivot; nil becomes an empty selector
# @param index [Object, nil] identifier columns; nil becomes an empty selector
# @param variable_name [String, nil] name of the variable column
# @param value_name [String, nil] name of the value column
# @param streamable [Boolean] deprecated; passing false only emits a
#   deprecation warning
# @return [LazyFrame]
def unpivot(
  on = nil,
  index: nil,
  variable_name: nil,
  value_name: nil,
  streamable: true
)
  # Report through the library's deprecation helper (not Kernel#warn) so all
  # deprecations are emitted consistently.
  if !streamable
    Utils.issue_deprecation_warning("The `streamable` parameter for `LazyFrame.unpivot` is deprecated")
  end

  selector_on = on.nil? ? Selectors.empty : Utils.parse_list_into_selector(on)
  selector_index = index.nil? ? Selectors.empty : Utils.parse_list_into_selector(index)

  # The native call takes value_name before variable_name.
  _from_rbldf(
    _ldf.unpivot(
      selector_on._rbselector,
      selector_index._rbselector,
      value_name,
      variable_name
    )
  )
end
# Update this frame's values with the values from +other+.
#
# Rows are matched on +on+ (or +left_on+/+right_on+). When no keys are given,
# rows are matched by position through a temporary row index. Only columns
# of +other+ that also exist in this frame (excluding the join keys) are
# updated. By default the right value is used where it is non-null (via
# coalesce). With +include_nulls+, the right value is used whenever the row
# matched, even if that value is null.
#
# @param other [LazyFrame] frame providing the new values; must be the same
#   type as self
# @param on [String, Array<String>, nil] join column(s) for both frames
# @param how ['left', 'inner', 'full'] join strategy; 'outer' and
#   'outer_coalesce' are accepted as aliases for 'full'
# @param left_on [String, Array<String>, nil] join column(s) of self
# @param right_on [String, Array<String>, nil] join column(s) of +other+
# @param include_nulls [Boolean] let null values in +other+ overwrite
# @param maintain_order ['none', 'left', 'right', 'left_right', 'right_left']
#   forwarded to #join
# @return [LazyFrame]
# @raise [ArgumentError] for an invalid +how+, only one of
#   +left_on+/+right_on+ given, or a join column missing from a schema
def update(
  other,
  on: nil,
  how: "left",
  left_on: nil,
  right_on: nil,
  include_nulls: false,
  maintain_order: "left"
)
  Utils.require_same_type(self, other)

  how = "full" if ["outer", "outer_coalesce"].include?(how)
  unless ["left", "inner", "full"].include?(how)
    raise ArgumentError, "`how` must be one of {{'left', 'inner', 'full'}}; found #{how.inspect}"
  end

  slf = self
  row_index_name = "__POLARS_ROW_INDEX"
  row_index_used = false

  if !on.nil?
    # a shared key list keeps the remaining logic uniform
    left_on = right_on = on
  elsif left_on.nil? && right_on.nil?
    # no keys provided: match rows by position through a row index
    row_index_used = true
    slf = slf.with_row_index(name: row_index_name)
    other = other.with_row_index(name: row_index_name)
    left_on = right_on = [row_index_name]
  elsif left_on.nil?
    raise ArgumentError, "missing join columns for left frame"
  elsif right_on.nil?
    raise ArgumentError, "missing join columns for right frame"
  end

  left_on = [left_on] if left_on.is_a?(::String)
  right_on = [right_on] if right_on.is_a?(::String)

  left_schema = slf.collect_schema
  left_on.each do |name|
    raise ArgumentError, "left join column #{name.inspect} not found" unless left_schema.include?(name)
  end

  right_schema = other.collect_schema
  right_on.each do |name|
    raise ArgumentError, "right join column #{name.inspect} not found" unless right_schema.include?(name)
  end

  # If other holds *only* join columns, an inner/left update changes nothing.
  if how != "full" && right_schema.length == right_on.length
    return row_index_used ? slf.drop(row_index_name) : slf
  end

  # Update only non-key right columns that also exist in the left frame.
  right_other = Set.new(right_schema.to_h.keys).intersection(left_schema.to_h.keys) - Set.new(right_on)

  # With include_nulls, "right value is null" must be told apart from "no
  # matching right row". A validity column, set to true on every right row,
  # is null after the join exactly where no right row matched.
  validity = include_nulls ? ["__POLARS_VALIDITY"] : []
  other = other.with_columns(F.lit(true).alias(validity[0])) if include_nulls

  tmp_name = "__POLARS_RIGHT"
  drop_columns = right_other.map { |name| "#{name}#{tmp_name}" } + validity

  joined = slf.join(
    other.select(*right_on, *right_other, *validity),
    left_on: left_on,
    right_on: right_on,
    how: how,
    suffix: tmp_name,
    coalesce: true,
    maintain_order: maintain_order
  )

  updated_columns = right_other.map do |name|
    right_name = "#{name}#{tmp_name}"
    merged =
      if include_nulls
        # keep the left value only where the right row failed to join
        F.when(F.col(validity).is_null)
          .then(F.col(name))
          .otherwise(F.col(right_name))
      else
        F.coalesce([right_name, F.col(name)])
      end
    merged.alias(name)
  end

  result = joined.with_columns(updated_columns).drop(drop_columns)
  result = result.drop(row_index_name) if row_index_used

  _from_rbldf(result._ldf)
end
# Reduce the frame with the engine's variance aggregation.
#
# @param ddof [Integer] delta degrees of freedom, forwarded to the engine
# @return [LazyFrame]
def var(ddof: 1)
  reduced = _ldf.var(ddof)
  _from_rbldf(reduced)
end
# Number of columns, read from the frame's schema (via collect_schema).
#
# @return [Integer]
def width
  schema = _ldf.collect_schema
  schema.length
end
# Add or replace columns.
#
# @param exprs [Array] positional expressions
# @param named_exprs [Hash] expressions keyed by output name
# @return [LazyFrame]
def with_columns(*exprs, **named_exprs)
  # POLARS_AUTO_STRUCTIFY: any value other than "0" (unset counts as "0")
  # turns on the __structify option of the expression parser.
  auto_structify = ENV.fetch("POLARS_AUTO_STRUCTIFY", "0") != "0"
  parsed = Utils.parse_into_list_of_expressions(*exprs, **named_exprs, __structify: auto_structify)
  _from_rbldf(_ldf.with_columns(parsed))
end
# Add or replace columns, evaluating the expressions sequentially
# (through the engine's with_columns_seq).
#
# @param exprs [Array] positional expressions
# @param named_exprs [Hash] expressions keyed by output name
# @return [LazyFrame]
def with_columns_seq(
  *exprs,
  **named_exprs
)
  # Read POLARS_AUTO_STRUCTIFY exactly as #with_columns does: any value other
  # than "0" enables it. An integer parse (`.to_i`) would silently treat
  # values such as "true" as disabled and make the two methods disagree.
  structify = ENV.fetch("POLARS_AUTO_STRUCTIFY", "0") != "0"

  rbexprs = Utils.parse_into_list_of_expressions(
    *exprs, **named_exprs, __structify: structify
  )
  _from_rbldf(_ldf.with_columns_seq(rbexprs))
end
# Add a column holding a running row index.
#
# @param name [String] name of the new column
# @param offset [Integer] index value for the first row
# @return [LazyFrame]
def with_row_index(name: "index", offset: 0)
  indexed = _ldf.with_row_index(name, offset)
  _from_rbldf(indexed)
end