class Multiwoven::Integrations::Destination::Qdrant::Client
def write(sync_config, records, _action = "upsert", _identifier_key = nil)
def write(sync_config, records, _action = "upsert", _identifier_key = nil) connection_config = sync_config.destination.connection_specification.with_indifferent_access collection_name = sync_config.stream.name primary_key = sync_config.model.primary_key log_message_array = [] api_url = connection_config[:api_url] api_key = connection_config[:api_key] write_success = 0 write_failure = 0 records.each do |record| points = [] points.push({ id: record[primary_key], vector: JSON.parse(record["vector"]), payload: record["payload"] }) begin response = upsert_points(api_url, api_key, collection_name, { points: points }) if success?(response) write_success += 1 log_message_array << log_request_response("info", { points: points }, JSON.parse(response.body)) else # write_failure could be duplicated if JSON.parse errors. write_failure += 1 log_message_array << log_request_response("error", { points: points }, JSON.parse(response.body)) end rescue StandardError => e handle_exception(e, { context: "QDRANT:RECORD:WRITE:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) write_failure += 1 log_message_array << log_request_response("error", { points: points }, e.message) end end tracking_message(write_success, write_failure, log_message_array) rescue StandardError => e handle_exception(e, { context: "QDRANT:RECORD:WRITE:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) end