class Multiwoven::Integrations::Destination::DatabricksLakehouse::Client

def check_connection(connection_config)

def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  db = create_connection(connection_config)
  response = db.get("/api/2.0/clusters/list")
  if response.status == 200
    success_status
  else
    failure_status(nil)
  end
rescue StandardError => e
  handle_exception(e, {
                     context: "DATABRICKS:LAKEHOUSE:CHECK_CONNECTION:EXCEPTION",
                     type: "error"
                   })
  failure_status(e)
end

def create_connection(connection_config)

def create_connection(connection_config)
  Faraday.new(url: connection_config[:host]) do |conn|
    conn.headers["Authorization"] = "Bearer #{connection_config[:api_token]}"
    conn.headers["Content-Type"] = "application/json"
    conn.adapter Faraday.default_adapter
  end
end

def create_streams(records)

def create_streams(records)
  message = []
  group_by_table(records).each_value do |r|
    message << Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
  end
  message
end

def discover(connection_config)

def discover(connection_config)
  connection_config = connection_config.with_indifferent_access
  table_query = "SHOW TABLES IN #{connection_config[:catalog]}.#{connection_config[:schema]};"
  db = create_connection(connection_config)
  records = []
  table_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], table_query).to_json)
  table_response_body = JSON.parse(table_response.body)
  table_response_body["result"]["data_array"].each do |table|
    table_name = table[1]
    query = "DESCRIBE TABLE #{connection_config[:catalog]}.#{connection_config[:schema]}.#{table_name};"
    column_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
    column_response_body = JSON.parse(column_response.body)
    records << [table_name, column_response_body["result"]["data_array"]]
  end
  catalog = Catalog.new(streams: create_streams(records))
  catalog.to_multiwoven_message
rescue StandardError => e
  handle_exception(
    "DATABRICKS:LAKEHOUSE:DISCOVER:EXCEPTION",
    "error",
    e
  )
end

def generate_body(warehouse_id, query)

def generate_body(warehouse_id, query)
  {
    warehouse_id: warehouse_id,
    statement: query,
    wait_timeout: "15s"
  }
end

def group_by_table(records)

def group_by_table(records)
  result = {}
  records.each_with_index do |entries, index|
    table_name = records[index][0]
    column = []
    entry_data = entries[1]
    entry_data.each do |entry|
      column << {
        column_name: entry[0],
        data_type: entry[1],
        is_nullable: true
      }
    end
    result[index] ||= {}
    result[index][:tablename] = table_name
    result[index][:columns] = column
  end
  result
end

def tracking_message(success, failure)

def tracking_message(success, failure)
  Multiwoven::Integrations::Protocol::TrackingMessage.new(
    success: success, failed: failure
  ).to_multiwoven_message
end

def write(sync_config, records, action = "destination_insert")

def write(sync_config, records, action = "destination_insert")
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  table_name = "#{connection_config[:catalog]}.#{connection_config[:schema]}.#{sync_config.stream.name}"
  primary_key = sync_config.model.primary_key
  db = create_connection(connection_config)
  write_success = 0
  write_failure = 0
  log_message_array = []
  records.each do |record|
    query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
    logger.debug("DATABRICKS:LAKEHOUSE:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
    begin
      arg = ["/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query)]
      response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
      if response.status == 200
        write_success += 1
      else
        write_failure += 1
      end
      log_message_array << log_request_response("info", arg, response)
    rescue StandardError => e
      handle_exception(e, {
                         context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
                         type: "error",
                         sync_id: sync_config.sync_id,
                         sync_run_id: sync_config.sync_run_id
                       })
      write_failure += 1
    end
  end
  tracking_message(write_success, write_failure)
rescue StandardError => e
  handle_exception(e, {
                     context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
                     type: "error",
                     sync_id: sync_config.sync_id,
                     sync_run_id: sync_config.sync_run_id
                   })
end