class Multiwoven::Integrations::Destination::DatabricksLakehouse::Client
# Writes records to the destination table by POSTing one SQL statement per
# record to the "/api/2.0/sql/statements" endpoint.
#
# A record counts as a success only when the response status is 200. Any other
# status, or an exception raised while handling that record, counts as a
# failure; per-record exceptions are reported through handle_exception and the
# loop moves on to the next record.
#
# @param sync_config [Object] read here for
#   destination.connection_specification (keys :catalog, :schema and
#   :warehouse_id), stream.name, model.primary_key, sync_id and sync_run_id
# @param records [Enumerable] records passed one at a time to QueryBuilder.perform
# @param action [String] forwarded unchanged to QueryBuilder.perform
# @return the value of tracking_message(write_success, write_failure); if an
#   error escapes the per-record handling, the value of handle_exception
def write(sync_config, records, action = "destination_insert")
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  table_name = "#{connection_config[:catalog]}.#{connection_config[:schema]}.#{sync_config.stream.name}"
  primary_key = sync_config.model.primary_key
  db = create_connection(connection_config)
  endpoint = "/api/2.0/sql/statements"
  write_success = 0
  write_failure = 0
  # NOTE(review): entries are collected here but never handed to
  # tracking_message below — confirm whether they are meant to be forwarded.
  log_message_array = []

  records.each do |record|
    query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
    logger.debug(
      "DATABRICKS:LAKEHOUSE:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}"
    )
    begin
      # Build the request body once so the logged request is exactly the
      # object that was serialized and sent.
      body = generate_body(connection_config[:warehouse_id], query)
      response = db.post(endpoint, body.to_json)
      if response.status == 200
        write_success += 1
      else
        write_failure += 1
      end
      log_message_array << log_request_response("info", [endpoint, body], response)
    rescue StandardError => e
      handle_exception(e, {
                         context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
                         type: "error",
                         sync_id: sync_config.sync_id,
                         sync_run_id: sync_config.sync_run_id
                       })
      write_failure += 1
    end
  end

  tracking_message(write_success, write_failure)
rescue StandardError => e
  # Reached when setup (config lookup, connection creation) or
  # tracking_message itself raises.
  handle_exception(e, {
                     context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
                     type: "error",
                     sync_id: sync_config.sync_id,
                     sync_run_id: sync_config.sync_run_id
                   })
end