module Polars
module IO
# Read into a DataFrame from Arrow IPC (Feather v2) file.
#
# @param source [Object]
# Path to a file or a file-like object.
# @param columns [Object]
# Columns to select. Accepts a list of column indices (starting at zero) or a list
# of column names.
# @param n_rows [Integer]
# Stop reading from IPC file after reading `n_rows`.
# @param memory_map [Boolean]
# Try to memory map the file. This can greatly improve performance on repeated
# queries as the OS may cache pages.
# Only uncompressed IPC files can be memory mapped.
# @param storage_options [Hash]
# Extra options that make sense for a particular storage connection.
# @param row_index_name [String]
# If not nil, this will insert a row count column with give name into the
# DataFrame.
# @param row_index_offset [Integer]
# Offset to start the row_count column (only use if the name is set).
# @param rechunk [Boolean]
# Make sure that all data is contiguous.
#
# @return [DataFrame]
def read_ipc(
source,
columns: nil,
n_rows: nil,
memory_map: true,
storage_options: nil,
row_index_name: nil,
row_index_offset: 0,
rechunk: true
)
storage_options ||= {}
_prepare_file_arg(source, **storage_options) do |data|
_read_ipc_impl(
data,
columns: columns,
n_rows: n_rows,
row_index_name: row_index_name,
row_index_offset: row_index_offset,
rechunk: rechunk,
memory_map: memory_map
)
end
end
# @private
def _read_ipc_impl(
file,
columns: nil,
n_rows: nil,
row_index_name: nil,
row_index_offset: 0,
rechunk: true,
memory_map: true
)
if Utils.pathlike?(file)
file = Utils.normalize_filepath(file)
end
if columns.is_a?(::String)
columns = [columns]
end
if file.is_a?(::String) && file.include?("*")
raise Todo
end
projection, columns = Utils.handle_projection_columns(columns)
rbdf =
RbDataFrame.read_ipc(
file,
columns,
projection,
n_rows,
Utils.parse_row_index_args(row_index_name, row_index_offset),
memory_map
)
Utils.wrap_df(rbdf)
end
# Read into a DataFrame from Arrow IPC record batch stream.
#
# See "Streaming format" on https://arrow.apache.org/docs/python/ipc.html.
#
# @param source [Object]
# Path to a file or a file-like object.
# @param columns [Array]
# Columns to select. Accepts a list of column indices (starting at zero) or a list
# of column names.
# @param n_rows [Integer]
# Stop reading from IPC stream after reading `n_rows`.
# @param storage_options [Hash]
# Extra options that make sense for a particular storage connection.
# @param row_index_name [String]
# Insert a row index column with the given name into the DataFrame as the first
# column. If set to `nil` (default), no row index column is created.
# @param row_index_offset [Integer]
# Start the row index at this offset. Cannot be negative.
# Only used if `row_index_name` is set.
# @param rechunk [Boolean]
# Make sure that all data is contiguous.
#
# @return [DataFrame]
def read_ipc_stream(
source,
columns: nil,
n_rows: nil,
storage_options: nil,
row_index_name: nil,
row_index_offset: 0,
rechunk: true
)
storage_options ||= {}
_prepare_file_arg(source, **storage_options) do |data|
_read_ipc_stream_impl(
data,
columns: columns,
n_rows: n_rows,
row_index_name: row_index_name,
row_index_offset: row_index_offset,
rechunk: rechunk
)
end
end
# @private
def _read_ipc_stream_impl(
source,
columns: nil,
n_rows: nil,
row_index_name: nil,
row_index_offset: 0,
rechunk: true
)
if Utils.pathlike?(source)
source = Utils.normalize_filepath(source)
end
if columns.is_a?(String)
columns = [columns]
end
projection, columns = Utils.handle_projection_columns(columns)
pydf = RbDataFrame.read_ipc_stream(
source,
columns,
projection,
n_rows,
Utils.parse_row_index_args(row_index_name, row_index_offset),
rechunk
)
Utils.wrap_df(pydf)
end
# Get a schema of the IPC file without reading data.
#
# @param source [Object]
# Path to a file or a file-like object.
#
# @return [Hash]
def read_ipc_schema(source)
if Utils.pathlike?(source)
source = Utils.normalize_filepath(source)
end
Plr.ipc_schema(source)
end
# Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.
#
# This allows the query optimizer to push down predicates and projections to the scan
# level, thereby potentially reducing memory overhead.
#
# @param source [String]
# Path to a IPC file.
# @param n_rows [Integer]
# Stop reading from IPC file after reading `n_rows`.
# @param cache [Boolean]
# Cache the result after reading.
# @param rechunk [Boolean]
# Reallocate to contiguous memory when all chunks/ files are parsed.
# @param row_index_name [String]
# If not nil, this will insert a row count column with give name into the
# DataFrame.
# @param row_index_offset [Integer]
# Offset to start the row_count column (only use if the name is set).
# @param glob [Boolean]
# Expand path given via globbing rules.
# @param storage_options [Hash]
# Extra options that make sense for a particular storage connection.
# @param credential_provider [Object]
# Provide a function that can be called to provide cloud storage
# credentials. The function is expected to return a hash of
# credential keys along with an optional credential expiry time.
# @param retries [Integer]
# Number of retries if accessing a cloud instance fails.
# @param file_cache_ttl [Integer]
# Amount of time to keep downloaded cloud files since their last access time,
# in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable
# (which defaults to 1 hour) if not given.
# @param hive_partitioning [Boolean]
# Infer statistics and schema from Hive partitioned URL and use them
# to prune reads. This is unset by default (i.e. `nil`), meaning it is
# automatically enabled when a single directory is passed, and otherwise
# disabled.
# @param hive_schema [Hash]
# The column names and data types of the columns by which the data is partitioned.
# If set to `nil` (default), the schema of the Hive partitions is inferred.
# @param try_parse_hive_dates [Boolean]
# Whether to try parsing hive values as date/datetime types.
# @param include_file_paths [String]
# Include the path of the source file(s) as a column with this name.
#
# @return [LazyFrame]
def scan_ipc(
source,
n_rows: nil,
cache: true,
rechunk: false,
row_index_name: nil,
row_index_offset: 0,
glob: true,
storage_options: nil,
credential_provider: "auto",
retries: nil,
file_cache_ttl: nil,
hive_partitioning: nil,
hive_schema: nil,
try_parse_hive_dates: true,
include_file_paths: nil,
_record_batch_statistics: false
)
sources = get_sources(source)
if !retries.nil?
msg = "the `retries` parameter was deprecated in 0.25.0; specify 'max_retries' in `storage_options` instead."
Utils.issue_deprecation_warning(msg)
storage_options = storage_options || {}
storage_options["max_retries"] = retries
end
if !file_cache_ttl.nil?
msg = "the `file_cache_ttl` parameter was deprecated in 0.25.0; specify 'file_cache_ttl' in `storage_options` instead."
Utils.issue_deprecation_warning(msg)
storage_options = storage_options || {}
storage_options["file_cache_ttl"] = file_cache_ttl
end
credential_provider_builder = _init_credential_provider_builder(
credential_provider, sources, storage_options, "scan_parquet"
)
rblf =
RbLazyFrame.new_from_ipc(
sources,
_record_batch_statistics,
ScanOptions.new(
row_index: !row_index_name.nil? ? [row_index_name, row_index_offset] : nil,
pre_slice: !n_rows.nil? ? [0, n_rows] : nil,
include_file_paths: include_file_paths,
glob: glob,
hive_partitioning: hive_partitioning,
hive_schema: hive_schema,
try_parse_hive_dates: try_parse_hive_dates,
rechunk: rechunk,
cache: cache,
storage_options: storage_options,
credential_provider: credential_provider_builder
)
)
Utils.wrap_ldf(rblf)
end
end
end