module Multiwoven::Integrations::Core::Utils
# Builds a protocol Catalog from a parsed catalog definition.
#
# Each entry under the "streams" key is converted with build_stream. The
# rate-limit and schema-mode settings fall back to the defaults below when the
# corresponding key is missing or holds nil/false.
#
# @param catalog_json [Hash] catalog definition keyed by strings
# @return [Multiwoven::Integrations::Protocol::Catalog]
def build_catalog(catalog_json)
  stream_definitions = catalog_json["streams"]
  built_streams = stream_definitions.map { |definition| build_stream(definition) }

  Multiwoven::Integrations::Protocol::Catalog.new(
    streams: built_streams,
    request_rate_limit: catalog_json["request_rate_limit"] || 60,
    request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: catalog_json["request_rate_concurrency"] || 10,
    schema_mode: catalog_json["schema_mode"] || "schema"
  )
end
# Builds a protocol Stream from a single parsed stream definition.
#
# Missing batch settings default to no batch support with a batch size of 1,
# the rate-limit unit defaults to "minute", and the numeric rate-limit fields
# are coerced with #to_i (so a missing value becomes 0).
#
# @param stream_json [Hash] stream definition keyed by strings
# @return [Multiwoven::Integrations::Protocol::Stream]
def build_stream(stream_json)
  attributes = {
    name: stream_json["name"],
    url: stream_json["url"],
    action: stream_json["action"],
    # The definition calls this key "method"; the Stream attribute is request_method.
    request_method: stream_json["method"],
    batch_support: stream_json["batch_support"] || false,
    batch_size: stream_json["batch_size"] || 1,
    json_schema: stream_json["json_schema"],
    request_rate_limit: stream_json["request_rate_limit"].to_i,
    request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
    supported_sync_modes: stream_json["supported_sync_modes"]
  }

  Multiwoven::Integrations::Protocol::Stream.new(**attributes)
end
# Converts a list of column definitions into a JSON-schema-style hash.
#
# Each column contributes one entry under "properties", keyed by its
# :column_name, whose "type" comes from map_type_to_json_schema. Columns flagged
# :optional get a two-element type array that also allows "null".
#
# @param column_definitions [Array<Hash>] hashes read via :column_name, :type and :optional
# @return [Hash] { "type" => "object", "properties" => { ... } }
def convert_to_json_schema(column_definitions)
  properties = column_definitions.each_with_object({}) do |column, fields|
    mapped_type = map_type_to_json_schema(column[:type])
    fields[column[:column_name]] = {
      "type" => column[:optional] ? [mapped_type, "null"] : mapped_type
    }
  end

  { "type" => "object", "properties" => properties }
end
# Wraps an exception in a protocol LogMessage and returns it converted with
# #to_multiwoven_message.
#
# @param context [Object] value stored as the log message's name
# @param type [Object] value stored as the log message's level
# @param exception [#message] error whose message becomes the log text
# @return [Object] the result of LogMessage#to_multiwoven_message
def create_log_message(context, type, exception)
  log_entry = Integrations::Protocol::LogMessage.new(
    name: context,
    level: type,
    message: exception.message
  )

  log_entry.to_multiwoven_message
end
# Filters a record down to the attributes named in +properties+.
#
# The record is first converted with #with_indifferent_access, and only the
# entries whose key is present in +properties+ are kept.
#
# A property is treated as present when +properties+ holds the key either as a
# Symbol or as a String. Checking only the Symbol form would silently discard
# every attribute whenever a plain, string-keyed properties hash is passed.
#
# @param record_object [#with_indifferent_access] the source record
# @param properties [#key?] schema properties, keyed by Symbol and/or String
# @return [Hash] the subset of the record whose keys appear in +properties+
def extract_data(record_object, properties)
  data_attributes = record_object.with_indifferent_access

  data_attributes.select do |key, _|
    # Symbol lookup stays first so existing symbol-keyed callers behave exactly as before.
    properties.key?(key.to_sym) || properties.key?(key.to_s)
  end
end
# Logs an exception, forwards it to the exception reporter, and returns a log
# message built from it.
#
# The error log line is the metadata rendered by hash_to_string, followed by
# ": " and the exception message. The returned message uses meta[:context] and
# meta[:type] as its context and type.
#
# @param exception [#message] the error being handled
# @param meta [Hash] extra details; :context and :type are read for the log message
# @return [Object] the result of create_log_message
def handle_exception(exception, meta = {})
  error_logger = logger
  meta_summary = hash_to_string(meta)
  error_logger.error("#{meta_summary}: #{exception.message}")

  report_exception(exception, meta)

  create_log_message(meta[:context], meta[:type], exception)
end
# Renders a hash as a single "key = value" list separated by ", ".
#
# @param hash [Hash] the pairs to render
# @return [String] e.g. "a = 1, b = 2"; an empty string for an empty hash
def hash_to_string(hash)
  rendered_pairs = hash.each_with_object([]) do |(key, value), parts|
    parts << "#{key} = #{value}"
  end

  rendered_pairs.join(", ")
end
# Recursively converts hash keys to symbols.
#
# Hashes are rebuilt with every key passed through #to_sym and every value
# converted recursively; arrays are converted element by element; any other
# value is returned unchanged.
#
# @param hash [Hash, Array, Object] the structure to convert
# @return [Hash, Array, Object] a symbol-keyed copy, or the input itself for other values
def keys_to_symbols(hash)
  case hash
  when Hash
    hash.map { |key, value| [key.to_sym, keys_to_symbols(value)] }.to_h
  when Array
    hash.map { |element| keys_to_symbols(element) }
  else
    hash
  end
end
# Returns the logger exposed by Integrations::Service.
#
# @return [Object] whatever Integrations::Service.logger returns
def logger
  service = Integrations::Service
  service.logger
end
# Maps a column type name to a JSON schema type name.
#
# Only "NUMBER" is mapped to "integer"; every other value falls back to the
# default type, "string".
#
# @param type [String] the column type name
# @return [String] "integer" or "string"
def map_type_to_json_schema(type)
  return "integer" if type == "NUMBER"

  "string" # Default type
end
# Forwards an exception to the reporter exposed by Integrations::Service.
#
# Does nothing and returns nil when no reporter is set (the reporter is nil).
#
# @param exception [Exception] the error to report
# @param meta [Hash] extra details passed through to the reporter
# @return [Object, nil] the reporter's return value, or nil without a reporter
def report_exception(exception, meta = {})
  exception_reporter = Integrations::Service.exception_reporter
  return if exception_reporter.nil?

  exception_reporter.report(exception, meta)
end
# Tells whether a response carries a 200 or 201 status code.
#
# The code is compared as a string, so both "200" and 200 count. A nil or false
# response is returned as-is, so the result is falsy.
#
# @param response [#code, nil] the response to inspect
# @return [Boolean, nil] true for 200/201, false for other codes, or the falsy response itself
def success?(response)
  return response unless response

  status = response.code.to_s
  %w[200 201].include?(status)
end