class Multiwoven::Integrations::Destination::Qdrant::Client
def build_catalog(data)
def build_catalog(data) streams = data["result"]["collections"].map { |collection| build_stream(collection) } Multiwoven::Integrations::Protocol::Catalog.new( streams: streams, request_rate_limit: 60, request_rate_limit_unit: "minute", request_rate_concurrency: 10 ) end
def build_stream(collection)
def build_stream(collection) response = Multiwoven::Integrations::Core::HttpClient.request( "#{@api_url}/collections/#{collection["name"]}", HTTP_GET, headers: auth_headers(@api_key) ) payload = { "type" => "object", "properties" => {} } if success?(response) data = JSON.parse(response.body) payload_schema = data["result"]["payload_schema"] payload_schema.each { |key, value| payload["properties"][key] = map_qdrant_types(value) } unless payload_schema.empty? end Multiwoven::Integrations::Protocol::Stream.new( name: collection["name"], action: "update", method: HTTP_PUT, supported_sync_modes: %w[incremental], json_schema: { "type" => "object", "required" => %w[id vector payload], "properties" => { "id" => { "type" => "string" }, "payload" => payload, "vector" => { "type" => "vector" } } } ) end
def check_connection(connection_config)
def check_connection(connection_config) connection_config = connection_config.with_indifferent_access api_url = connection_config[:api_url] api_key = connection_config[:api_key] response = Multiwoven::Integrations::Core::HttpClient.request( api_url, HTTP_GET, headers: auth_headers(api_key) ) if success?(response) success_status else failure_status(nil) end rescue StandardError => e handle_exception(e, { context: "QDRANT:CHECK_CONNECTION:EXCEPTION", type: "error" }) failure_status(e) end
def discover(connection_config = nil)
def discover(connection_config = nil) connection_config = connection_config.with_indifferent_access @api_url = connection_config[:api_url] @api_key = connection_config[:api_key] response = Multiwoven::Integrations::Core::HttpClient.request( "#{@api_url}/collections", HTTP_GET, headers: auth_headers(@api_key) ) data = JSON.parse(response.body) catalog = build_catalog(data) catalog.to_multiwoven_message rescue StandardError => e handle_exception(e, { context: "QDRANT:DISCOVER:EXCEPTION", type: "error" }) end
def map_qdrant_types(value)
def map_qdrant_types(value) case value["data_type"] when "integer" { "type" => "integer" } when "float" { "type" => "number" } when "bool" { "type" => "boolean" } when "geo" { "type" => "object", "required" => %w[lon lat], "properties" => { "lon" => { "type" => "number" }, "lat" => { "type" => "number" } } } else # datetime, keyword, text, uuid { "type" => "string" } end end
def upsert_points(api_url, api_key, collection_name, payload)
def upsert_points(api_url, api_key, collection_name, payload) Multiwoven::Integrations::Core::HttpClient.request( api_url + "/collections/#{collection_name}/points", HTTP_PUT, payload: payload, headers: auth_headers(api_key) ) end
def write(sync_config, records, _action = "upsert")
def write(sync_config, records, _action = "upsert") connection_config = sync_config.destination.connection_specification.with_indifferent_access collection_name = sync_config.stream.name primary_key = sync_config.model.primary_key log_message_array = [] api_url = connection_config[:api_url] api_key = connection_config[:api_key] write_success = 0 write_failure = 0 records.each do |record| points = [] points.push({ id: record[primary_key], vector: JSON.parse(record["vector"]), payload: record["payload"] }) begin response = upsert_points(api_url, api_key, collection_name, { points: points }) if success?(response) write_success += 1 log_message_array << log_request_response("info", { points: points }, JSON.parse(response.body)) else # write_failure could be duplicated if JSON.parse errors. write_failure += 1 log_message_array << log_request_response("error", { points: points }, JSON.parse(response.body)) end rescue StandardError => e handle_exception(e, { context: "QDRANT:RECORD:WRITE:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) write_failure += 1 log_message_array << log_request_response("error", { points: points }, e.message) end end tracking_message(write_success, write_failure, log_message_array) rescue StandardError => e handle_exception(e, { context: "QDRANT:RECORD:WRITE:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) end