class DeltaLake::Table

def self.exists?(table_uri, storage_options: nil)

def self.exists?(table_uri, storage_options: nil)
  RawDeltaTable.is_deltatable(table_uri, storage_options)
end

def _stringify_partition_values(partition_filters)

private
def _stringify_partition_values(partition_filters)
  if partition_filters.nil?
    return partition_filters
  end
  raise Todo
end

def _table

private
def _table
  @table
end

def alter

def alter
  TableAlterer.new(self)
end

def delete(

def delete(
  predicate = nil,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  metrics =
    @table.delete(
      predicate,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  JSON.parse(metrics).transform_keys(&:to_sym)
end

def file_uris(partition_filters: nil)

def file_uris(partition_filters: nil)
  @table.file_uris(_stringify_partition_values(partition_filters))
end

def files(partition_filters: nil)

def files(partition_filters: nil)
  @table.files(_stringify_partition_values(partition_filters))
end

def history(limit: nil)

def history(limit: nil)
  backwards_enumerate = lambda do |iterable, start_end, &block|
    n = start_end
    iterable.each do |elem|
      block.call(n, elem)
      n -= 1
    end
  end
  commits = @table.history(limit)
  history = []
  backwards_enumerate.(commits, @table.get_latest_version) do |version, commit_info_raw|
    commit = JSON.parse(commit_info_raw)
    commit["version"] = version
    history << commit
  end
  history
end

def initialize(

def initialize(
  table_uri,
  version: nil,
  storage_options: nil,
  without_files: false,
  log_buffer_size: nil
)
  @storage_options = storage_options
  @table =
    RawDeltaTable.new(
      table_uri,
      version,
      storage_options,
      without_files,
      log_buffer_size
    )
end

def load_as_version(version)

def load_as_version(version)
  if version.is_a?(Integer)
    @table.load_version(version)
  elsif version.is_a?(Time)
    @table.load_with_datetime(version.utc.iso8601(9))
  elsif version.is_a?(String)
    @table.load_with_datetime(version)
  else
    raise TypeError, "Invalid datatype provided for version, only Integer, String, and Time are accepted."
  end
end

def load_cdf(

def load_cdf(
  starting_version: 0,
  ending_version: nil,
  starting_timestamp: nil,
  ending_timestamp: nil,
  columns: nil
)
  @table.load_cdf(
    starting_version,
    ending_version,
    starting_timestamp,
    ending_timestamp,
    columns
  )
end

def merge(

def merge(
  source,
  predicate,
  source_alias: nil,
  target_alias: nil,
  error_on_type_mismatch: true,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  source = Utils.convert_data(source)
  rb_merge_builder =
    @table.create_merge_builder(
      source,
      predicate,
      source_alias,
      target_alias,
      !error_on_type_mismatch,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  TableMerger.new(rb_merge_builder, @table)
end

def metadata

def metadata
  Metadata.new(@table)
end

def optimize

def optimize
  TableOptimizer.new(self)
end

def partitions

def partitions
  partitions = []
  @table.get_active_partitions.each do |partition|
    next unless partition
    partitions << partition.to_h
  end
  partitions
end

def protocol

def protocol
  ProtocolVersions.new(*@table.protocol_versions)
end

def repair(

def repair(
  dry_run: false,
  post_commithook_properties: nil,
  commit_properties: nil
)
  metrics =
    @table.repair(
      dry_run,
      commit_properties,
      post_commithook_properties
    )
  deserialized_metrics = JSON.parse(metrics)
  deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL] = JSON.parse(
    deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL]
  )
  deserialized_metrics.transform_keys(&:to_sym)
end

def restore(

def restore(
  target,
  ignore_missing_files: false,
  protocol_downgrade_allowed: false,
  commit_properties: nil
)
  if target.is_a?(Time)
    metrics =
      @table.restore(
        target.utc.iso8601(9),
        ignore_missing_files,
        protocol_downgrade_allowed,
        commit_properties
      )
  else
    metrics =
      @table.restore(
        target,
        ignore_missing_files,
        protocol_downgrade_allowed,
        commit_properties
      )
  end
  JSON.parse(metrics)
end

def schema

def schema
  @table.schema
end

def table_uri

def table_uri
  @table.table_uri
end

def to_polars(eager: true, rechunk: false, columns: nil)

def to_polars(eager: true, rechunk: false, columns: nil)
  require "polars-df"
  sources = file_uris
  if sources.empty?
    # TODO pass schema
    lf = Polars::LazyFrame.new
  else
    delta_keys = [
      "AWS_S3_ALLOW_UNSAFE_RENAME",
      "AWS_S3_LOCKING_PROVIDER",
      "CONDITIONAL_PUT",
      "DELTA_DYNAMO_TABLE_NAME"
    ]
    storage_options = @storage_options&.reject { |k, _| delta_keys.include?(k.to_s.upcase) }
    lf =
      Polars.scan_parquet(
        sources,
        hive_partitioning: true,
        storage_options: storage_options,
        rechunk: rechunk,
        allow_missing_columns: true
      )
    if columns
      # by_name requires polars-df > 0.15.0
      lf = lf.select(Polars.cs.by_name(*columns))
    end
  end
  eager ? lf.collect : lf
end

def transaction_version(app_id)

def transaction_version(app_id)
  @table.transaction_version(app_id)
end

def update_incremental

def update_incremental
  @table.update_incremental
end

def vacuum(

def vacuum(
  retention_hours: nil,
  dry_run: true,
  enforce_retention_duration: true,
  post_commithook_properties: nil,
  commit_properties: nil
)
  if retention_hours
    if retention_hours < 0
      raise ArgumentError, "The retention periods should be positive."
    end
  end
  @table.vacuum(
    dry_run,
    retention_hours,
    enforce_retention_duration,
    commit_properties,
    post_commithook_properties
  )
end

def version

def version
  @table.version
end