lib/polars/io/delta.rb



module Polars
  module IO
    # Reads into a DataFrame from a Delta lake table.
    #
    # @param source [Object]
    #   DeltaTable or a Path or URI to the root of the Delta lake table.
    # @param version [Object]
    #   Numerical version or timestamp version of the Delta lake table.
    # @param columns [Array]
    #   Columns to select. Accepts a list of column names.
    # @param rechunk [Boolean]
    #   Make sure that all columns are contiguous in memory by
    #   aggregating the chunks into a single array.
    # @param storage_options [Hash]
    #   Extra options for the storage backends supported by `deltalake-rb`.
    # @param delta_table_options [Hash]
    #   Additional keyword arguments while reading a Delta lake Table.
    #
    # @return [DataFrame]
    def read_delta(
      source,
      version: nil,
      columns: nil,
      rechunk: nil,
      storage_options: nil,
      delta_table_options: nil
    )
      df =
        scan_delta(
          source,
          version: version,
          storage_options: storage_options,
          delta_table_options: delta_table_options,
          rechunk: rechunk
        )

      if !columns.nil?
        df = df.select(columns)
      end
      df.collect
    end

    # Lazily read from a Delta lake table.
    #
    # @param source [Object]
    #   DeltaTable or a Path or URI to the root of the Delta lake table.
    # @param version [Object]
    #   Numerical version or timestamp version of the Delta lake table.
    # @param storage_options [Hash]
    #   Extra options for the storage backends supported by `deltalake-rb`.
    # @param delta_table_options [Hash]
    #   Additional keyword arguments while reading a Delta lake Table.
    # @param rechunk [Boolean]
    #   Make sure that all columns are contiguous in memory by
    #   aggregating the chunks into a single array.
    #
    # @return [LazyFrame]
    def scan_delta(
      source,
      version: nil,
      storage_options: nil,
      delta_table_options: nil,
      rechunk: nil
    )
      dl_tbl =
        _get_delta_lake_table(
          source,
          version: version,
          storage_options: storage_options,
          delta_table_options: delta_table_options
        )

      dl_tbl.to_polars(eager: false, rechunk: rechunk || false)
    end

    private

    def _resolve_delta_lake_uri(table_uri, strict: true)
      require "uri"

      parsed_result = URI(table_uri)

      resolved_uri =
        if parsed_result.scheme == ""
          Utils.normalize_filepath(table_uri)
        else
          table_uri
        end

      resolved_uri
    end

    def _get_delta_lake_table(
      table_path,
      version: nil,
      storage_options: nil,
      delta_table_options: nil
    )
      _check_if_delta_available

      if table_path.is_a?(DeltaLake::Table)
        return table_path
      end
      delta_table_options ||= {}
      resolved_uri = _resolve_delta_lake_uri(table_path)
      if !version.is_a?(::String) && !version.is_a?(::Time)
        dl_tbl =
          DeltaLake::Table.new(
            resolved_uri,
            version: version,
            storage_options: storage_options,
            **delta_table_options
          )
      else
        dl_tbl =
          DeltaLake::Table.new(
            resolved_uri,
            storage_options: storage_options,
            **delta_table_options
          )
        dl_tbl.load_as_version(version)
      end

      dl_tbl = DeltaLake::Table.new(table_path)
      dl_tbl
    end

    def _check_if_delta_available
      if !defined?(DeltaLake)
        raise Error, "Delta Lake not available"
      end
    end
  end
end