lib/deltalake.rb



# ext
begin
  require "deltalake/#{RUBY_VERSION.to_f}/deltalake"
rescue LoadError
  require "deltalake/deltalake"
end

# stdlib
require "json"
require "time"

# modules
require_relative "deltalake/field"
require_relative "deltalake/metadata"
require_relative "deltalake/schema"
require_relative "deltalake/table"
require_relative "deltalake/table_alterer"
require_relative "deltalake/table_merger"
require_relative "deltalake/table_optimizer"
require_relative "deltalake/utils"
require_relative "deltalake/version"

module DeltaLake
  # Library error hierarchy: every error raised by this library descends
  # from +Error+, so callers can rescue them all with one clause.
  class Error < StandardError; end
  # Rescued by +try_get_deltatable+ to mean "no table at this location yet".
  class TableNotFoundError < Error; end
  class DeltaProtocolError < Error; end
  class CommitFailedError < Error; end
  class SchemaMismatchError < Error; end

  # Raised for functionality that is not implemented yet.
  #
  # The default message is handed to +super+ (instead of overriding
  # +message+) so that +message+, +to_s+ and +inspect+ all agree, and an
  # explicitly supplied message is not silently discarded.
  class Todo < Error
    def initialize(message = "not implemented yet")
      super
    end
  end

  # Value object with the minimum reader/writer versions and the
  # reader/writer feature lists (positional members).
  ProtocolVersions =
    Struct.new(
      :min_reader_version,
      :min_writer_version,
      :writer_features,
      :reader_features
    )

  # Commit options; +write+ forwards them unchanged to the native layer.
  CommitProperties =
    Struct.new(
      :custom_metadata,
      :max_commit_retries,
      # TODO
      # :app_transactions,
      keyword_init: true
    )

  # Post-commit hook options; +write+ forwards them unchanged to the native layer.
  PostCommitHookProperties =
    Struct.new(
      :create_checkpoint,
      :cleanup_expired_logs,
      keyword_init: true
    )

  # Reopened here to add +arrow_c_stream+, which returns the stream itself.
  # NOTE(review): presumably this lets a stream be passed anywhere an object
  # exposing +arrow_c_stream+ is accepted -- confirm against Utils.convert_data.
  class ArrowArrayStream
    def arrow_c_stream
      self
    end
  end

  class << self
    # Writes +data+ to the Delta table identified by +table_or_uri+.
    #
    # If a table already exists, the write goes through that table's native
    # handle. Otherwise the native +write_deltalake_rust+ entry point is called
    # with the URI and +storage_options+. Except for the "ignore" short-circuit
    # below, every option is forwarded unchanged to the native layer.
    #
    # @param table_or_uri [String, Table] table location, or an already opened table
    # @param data [Object] input accepted by +Utils.convert_data+
    # @param partition_by [String, Array<String>, nil] partition column(s);
    #   a single String is wrapped in an Array
    # @param mode [String] write mode, default "error"; with "ignore", nothing
    #   is written when the table already exists
    # @param name [String, nil] forwarded to the native writer
    # @param description [String, nil] forwarded to the native writer
    # @param configuration [Object, nil] forwarded to the native writer
    # @param schema_mode [Object, nil] forwarded to the native writer
    # @param storage_options [Hash, nil] used to open the table from a String
    #   URI and forwarded when creating a new table
    # @param predicate [Object, nil] forwarded to the native writer
    # @param target_file_size [Integer, nil] forwarded to the native writer
    # @param writer_properties [Object, nil] forwarded to the native writer
    # @param commit_properties [CommitProperties, nil] forwarded to the native writer
    # @param post_commithook_properties [PostCommitHookProperties, nil]
    #   forwarded to the native writer
    # @return [nil, Object] nil when +mode+ is "ignore" and the table exists;
    #   otherwise whatever the native write returns
    # @raise [ArgumentError] if +table_or_uri+ is neither a String nor a Table
    def write(
      table_or_uri,
      data,
      partition_by: nil,
      mode: "error",
      name: nil,
      description: nil,
      configuration: nil,
      schema_mode: nil,
      storage_options: nil,
      predicate: nil,
      target_file_size: nil,
      writer_properties: nil,
      commit_properties: nil,
      post_commithook_properties: nil
    )
      table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)

      partition_by = [partition_by] if partition_by.is_a?(String)

      # An existing table plus "ignore" means there is nothing to do.
      # +table+ is either nil or a Table here, so truthiness is sufficient.
      return if table && mode == "ignore"

      data = Utils.convert_data(data)

      if table
        table._table.write(
          data,
          mode,
          schema_mode,
          partition_by,
          predicate,
          target_file_size,
          name,
          description,
          configuration,
          writer_properties,
          commit_properties,
          post_commithook_properties
        )
      else
        write_deltalake_rust(
          table_uri,
          data,
          mode,
          schema_mode,
          partition_by,
          predicate,
          target_file_size,
          name,
          description,
          configuration,
          storage_options,
          writer_properties,
          commit_properties,
          post_commithook_properties
        )
      end
    end

    private

    # Resolves +table_or_uri+ into an optional opened table plus its URI.
    #
    # @return [Array(Table, String), Array(nil, String)] the table is nil when
    #   a String URI was given and no table exists there yet
    # @raise [ArgumentError] if +table_or_uri+ is neither a String nor a Table
    def try_get_table_and_table_uri(table_or_uri, storage_options)
      case table_or_uri
      when String
        [try_get_deltatable(table_or_uri, storage_options), table_or_uri]
      when Table
        [table_or_uri, table_or_uri._table.table_uri]
      else
        raise ArgumentError, "table_or_uri must be a String or Table"
      end
    end

    # Opens the table at +table_uri+, or returns nil when opening it raises
    # TableNotFoundError (i.e. the write will create a new table).
    def try_get_deltatable(table_uri, storage_options)
      Table.new(table_uri, storage_options: storage_options)
    rescue TableNotFoundError
      nil
    end
  end
end