lib/polars/iceberg_dataset.rb
module Polars
  # Reads one snapshot of an Apache Iceberg table as a Polars LazyFrame.
  #
  # NOTE(review): `source` is assumed to respond to `scan(snapshot_id:)` and to
  # return a scan exposing `plan_files`, `table` and `snapshot` (that is all this
  # class uses) — confirm against the caller.
  #
  # @private
  class IcebergDataset
    # @param source [Object] object responding to `scan(snapshot_id:)`
    # @param snapshot_id [Object, nil] snapshot to read; passed straight to `source.scan`
    # @param storage_options [Object, nil] forwarded unchanged to `Polars.scan_parquet`
    def initialize(
      source,
      snapshot_id:,
      storage_options:
    )
      @source = source
      @snapshot_id = snapshot_id
      @storage_options = storage_options
    end

    # Builds a LazyFrame for the configured snapshot.
    #
    # If the scan plans no data files, an empty LazyFrame carrying the Iceberg
    # schema is returned. Otherwise the planned Parquet data files are scanned,
    # with columns mapped by Iceberg field id and positional delete files applied.
    #
    # @return [LazyFrame]
    # @raise [Todo] if the schema contains a type that is not supported yet
    def to_lazyframe
      scan = @source.scan(snapshot_id: @snapshot_id)
      files = scan.plan_files
      table = scan.table
      snapshot = scan.snapshot
      # Prefer the schema recorded for the snapshot; fall back to the table's
      # current schema when the scan has no snapshot.
      schema = snapshot ? table.schema_by_id(snapshot[:schema_id]) : table.current_schema

      if files.empty?
        LazyFrame.new(schema: polars_schema(schema))
      else
        scan_data_files(files, schema)
      end
    end

    private

    # Scans the planned Parquet data files with Iceberg column mapping and deletes.
    def scan_data_files(files, schema)
      sources = files.map { |file| file[:data_file_path] }
      # Computed before the delete map so an unsupported type fails first.
      column_mapping = ["iceberg-column-mapping", arrow_schema(schema)]
      deletion_files = ["iceberg-position-delete", position_deletes(files)]

      scan_options = {
        storage_options: @storage_options,
        cast_options: Polars::ScanCastOptions._default_iceberg,
        missing_columns: "insert",
        extra_columns: "ignore",
        _column_mapping: column_mapping,
        _deletion_files: deletion_files
      }

      Polars.scan_parquet(sources, **scan_options)
    end

    # Maps each data file's index in `files` to the paths of its delete files,
    # omitting data files that have no deletes.
    def position_deletes(files)
      files.each_with_index
        .select { |file, _i| file[:deletes].any? }
        .to_h { |file, i| [i, file[:deletes].map { |d| d[:file_path] }] }
    end

    # Converts an Iceberg schema into a Polars schema hash (column name => dtype).
    #
    # Supports every type `arrow_schema` supports, plus string, date and
    # timestamp, so an empty table never fails on a type its non-empty
    # counterpart would accept.
    # NOTE(review): Iceberg "timestamp" has microsecond precision; Polars::Datetime
    # is used with its default time unit — confirm the two agree.
    # TODO: cover the remaining Iceberg types (decimal, time, timestamptz, ...).
    #
    # @raise [Todo] for any other type
    def polars_schema(schema)
      schema.fields.to_h do |field|
        dtype =
          case field[:type]
          when "boolean" then Polars::Boolean
          when "int" then Polars::Int32
          when "long" then Polars::Int64
          when "float" then Polars::Float32
          when "double" then Polars::Float64
          when "string" then Polars::String
          when "date" then Polars::Date
          when "timestamp" then Polars::Datetime
          else raise Todo
          end
        [field[:name], dtype]
      end
    end

    # Builds the Arrow-style schema description used for Iceberg column mapping.
    #
    # Each field carries its Iceberg field id under the "PARQUET:field_id"
    # metadata key; a field is nullable unless it is marked required.
    #
    # @return [Hash] `{fields: [{name:, type:, nullable:, metadata:}, ...]}`
    # @raise [Todo] for types that are not supported yet
    def arrow_schema(schema)
      fields = schema.fields.map do |field|
        type =
          case field[:type]
          when "boolean" then "boolean"
          when "int" then "int32"
          when "long" then "int64"
          when "float" then "float32"
          when "double" then "float64"
          else raise Todo
          end

        {
          name: field[:name],
          type: type,
          nullable: !field[:required],
          metadata: {"PARQUET:field_id" => field[:id].to_s}
        }
      end

      {fields: fields}
    end
  end
end