lib/multiwoven/integrations/core/utils.rb
# frozen_string_literal: true
module Multiwoven
module Integrations::Core
module Utils
def keys_to_symbols(hash)
if hash.is_a?(Hash)
hash.each_with_object({}) do |(key, value), result|
result[key.to_sym] = keys_to_symbols(value)
end
elsif hash.is_a?(Array)
hash.map { |item| keys_to_symbols(item) }
else
hash
end
end
def convert_to_json_schema(column_definitions)
json_schema = {
"type" => "object",
"properties" => {}
}
column_definitions.each do |column|
column_name = column[:column_name]
type = column[:type]
optional = column[:optional]
json_type = map_type_to_json_schema(type)
json_schema["properties"][column_name] = {
"type" => json_type
}
json_schema["properties"][column_name]["type"] = [json_type, "null"] if optional
end
json_schema
end
def map_type_to_json_schema(type)
case type
when "NUMBER"
"integer"
when "vector"
"vector"
else
"string" # Default type
end
end
def logger
Integrations::Service.logger
end
def report_exception(exception, meta = {})
reporter = Integrations::Service.exception_reporter
reporter&.report(exception, meta)
end
def log_request_response(level, request, response, record_identifier = nil)
Integrations::Protocol::LogMessage.new(
record_identifier: record_identifier,
name: self.class.name,
level: level,
message: { request: request.to_s, response: response.to_s, level: level }.to_json
)
end
def create_log_message(context, type, message, record_identifier = nil)
Integrations::Protocol::LogMessage.new(
record_identifier: record_identifier,
name: context,
level: type,
message: message
).to_multiwoven_message
end
def handle_exception(exception, meta = {})
logger.error(
"#{hash_to_string(meta)}: #{exception.message}"
)
report_exception(exception, meta)
create_log_message(meta[:context], meta[:type], exception.message)
end
def hash_to_string(hash)
hash.map { |key, value| "#{key} = #{value}" }.join(", ")
end
def extract_data(record_object, properties)
data_attributes = record_object.with_indifferent_access
data_attributes.select { |key, _| properties.key?(key.to_sym) }
end
def success?(response)
response && %w[200 201].include?(response.code.to_s)
end
def build_catalog(catalog_json)
streams = catalog_json["streams"].map { |stream_json| build_stream(stream_json) }
Multiwoven::Integrations::Protocol::Catalog.new(
streams: streams,
request_rate_limit: catalog_json["request_rate_limit"] || 60,
request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
request_rate_concurrency: catalog_json["request_rate_concurrency"] || 10,
schema_mode: catalog_json["schema_mode"] || "schema"
)
end
def build_stream(stream_json)
Multiwoven::Integrations::Protocol::Stream.new(
name: stream_json["name"],
url: stream_json["url"],
action: stream_json["action"],
request_method: stream_json["method"],
batch_support: stream_json["batch_support"] || false,
batch_size: stream_json["batch_size"] || 1,
json_schema: stream_json["json_schema"],
request_rate_limit: stream_json["request_rate_limit"].to_i,
request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
supported_sync_modes: stream_json["supported_sync_modes"]
)
end
end
end
end