class Multiwoven::Integrations::Source::Odoo::Client
def check_connection(connection_config)
def check_connection(connection_config) connection_config = connection_config.with_indifferent_access create_connection(connection_config) success_status rescue StandardError => e failure_status(e) end
def convert_to_json_schema(fields)
def convert_to_json_schema(fields) json_schema = { "type" => "object", "properties" => {} } fields.each do |field| column_name = field[1]["name"] type = field[1]["type"] json_schema["properties"][column_name] = { "type" => type } end json_schema end
def create_connection(connection_config)
def create_connection(connection_config) common = XMLRPC::Client.new2("#{connection_config[:url]}/xmlrpc/2/common") common.call("version") @uid = common.call("authenticate", connection_config[:database], connection_config[:username], connection_config[:password], { 'raise_exception': true }) @client = XMLRPC::Client.new2("#{connection_config[:url]}/xmlrpc/2/object").proxy connection_config end
def create_streams(connection_config, models)
def create_streams(connection_config, models) models.map do |model| fields = @client.execute_kw(connection_config[:database], @uid, connection_config[:password], model["model"], "fields_get", [], { 'attributes': %w[name type] }) Multiwoven::Integrations::Protocol::Stream.new(name: model["model"], action: StreamAction["fetch"], supported_sync_modes: %w[incremental], json_schema: convert_to_json_schema(fields)) end end
def discover(connection_config)
def discover(connection_config) connection_config = connection_config.with_indifferent_access create_connection(connection_config) models = @client.execute_kw(connection_config[:database], @uid, connection_config[:password], "ir.model", "search_read", [[["transient", "=", false], ["abstract", "=", false]]], { 'fields': %w[name model] }) catalog = Catalog.new(streams: create_streams(connection_config, models)) catalog.to_multiwoven_message rescue StandardError => e handle_exception(e, { context: "ODOO:DISCOVER:EXCEPTION", type: "error" }) end
def query(connection, query)
def query(connection, query) limit = 0 offset = 0 order = "id DESC" limit = query.match(/LIMIT (\d+)/)[1].to_i if query.include? "LIMIT" offset = query.match(/OFFSET (\d+)/)[1].to_i if query.include? "OFFSET" order = query.match(/ORDER BY (.*) LIMIT/)[1] if query.include? "ORDER BY" model = query.match(/FROM ([aA-zZ.aA-zZ]*)/i)[1] columns = if query.include? "SELECT *" [] else query.match(/SELECT (.*) FROM/)[1].strip.downcase.split(", ") end records = @client.execute_kw(connection[:database], @uid, connection[:password], model, "search_read", [], { limit: limit, offset: offset, order: order, 'fields': columns }) records.map do |row| RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message end end
def read(sync_config)
def read(sync_config) connection_config = sync_config.source.connection_specification.with_indifferent_access create_connection(connection_config) query = sync_config.model.query query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil? query(connection_config, query) rescue StandardError => e handle_exception(e, { context: "ODOO:READ:EXCEPTION", type: "error", sync_id: sync_config.sync_id, sync_run_id: sync_config.sync_run_id }) end