lib/multiwoven/integrations/source/odoo/client.rb



# frozen_string_literal: true

module Multiwoven::Integrations::Source
  module Odoo
    include Multiwoven::Integrations::Core
    class Client < SourceConnector
      # Checks that a connection can be established with the given settings.
      #
      # @param connection_config [Hash] connection settings; converted with
      #   +with_indifferent_access+ before being handed to +create_connection+.
      # @return the value of +success_status+ when +create_connection+ finishes
      #   without raising, otherwise the value of +failure_status+ built from the
      #   rescued StandardError.
      def check_connection(connection_config)
        create_connection(connection_config.with_indifferent_access)
        success_status
      rescue StandardError => error
        failure_status(error)
      end

      # Builds a catalog with one stream per Odoo model returned by +ir.model+.
      #
      # Connects first, then runs +search_read+ on +ir.model+ filtered to records
      # whose +transient+ and +abstract+ flags are both false, requesting only the
      # +name+ and +model+ fields.
      #
      # @param connection_config [Hash] connection settings (+:database+ and
      #   +:password+ are read here; the rest is used by +create_connection+).
      # @return the catalog converted with +to_multiwoven_message+, or whatever
      #   +handle_exception+ returns when a StandardError is rescued.
      def discover(connection_config)
        config = connection_config.with_indifferent_access
        create_connection(config)

        domain = [["transient", "=", false], ["abstract", "=", false]]
        models = @client.execute_kw(config[:database], @uid, config[:password],
                                    "ir.model", "search_read", [domain], { fields: %w[name model] })

        Catalog.new(streams: create_streams(config, models)).to_multiwoven_message
      rescue StandardError => error
        handle_exception(error, { context: "ODOO:DISCOVER:EXCEPTION", type: "error" })
      end

      # Runs the sync's model query against Odoo and returns the resulting records.
      #
      # The query text is taken from +sync_config.model.query+. When either
      # +sync_config.limit+ or +sync_config.offset+ is set, the text is first
      # rewritten by +batched_query+.
      #
      # @param sync_config sync configuration exposing +source+, +model+, +limit+,
      #   +offset+, +sync_id+ and +sync_run_id+.
      # @return the result of the private +query+ helper, or whatever
      #   +handle_exception+ returns when a StandardError is rescued.
      def read(sync_config)
        connection_config = sync_config.source.connection_specification.with_indifferent_access
        create_connection(connection_config)

        sql = sync_config.model.query
        paginated = !sync_config.limit.nil? || !sync_config.offset.nil?
        sql = batched_query(sql, sync_config.limit, sync_config.offset) if paginated

        query(connection_config, sql)
      rescue StandardError => error
        exception_meta = {
          context: "ODOO:READ:EXCEPTION",
          type: "error",
          sync_id: sync_config.sync_id,
          sync_run_id: sync_config.sync_run_id
        }
        handle_exception(error, exception_meta)
      end

      private

      # Turns each discovered model record into a protocol stream.
      #
      # For every entry, +fields_get+ is called on the model (requesting the
      # +name+ and +type+ attributes) and the answer is converted to a JSON
      # schema with +convert_to_json_schema+.
      #
      # @param connection_config [Hash] supplies +:database+ and +:password+.
      # @param models [Array<Hash>] records carrying the technical model name
      #   under the "model" key.
      # @return [Array] one Stream per model, in the same order as +models+.
      def create_streams(connection_config, models)
        database = connection_config[:database]
        password = connection_config[:password]

        models.map do |model|
          model_name = model["model"]
          fields = @client.execute_kw(database, @uid, password,
                                      model_name, "fields_get", [], { attributes: %w[name type] })
          schema = convert_to_json_schema(fields)

          Multiwoven::Integrations::Protocol::Stream.new(
            name: model_name,
            action: StreamAction["fetch"],
            supported_sync_modes: %w[incremental],
            json_schema: schema
          )
        end
      end

      # Converts a +fields_get+ style result into a JSON-schema-shaped hash.
      #
      # Each entry of +fields+ is treated as a (key, attributes) pair. The
      # property name is the "name" attribute when present; otherwise the
      # entry's own key is used, so a response that omits the "name" attribute
      # no longer collapses every property under a +nil+ key.
      # NOTE(review): assumes the entry key is the field's technical name —
      # confirm against the Odoo versions in use.
      #
      # The "type" attribute is copied through unchanged.
      #
      # @param fields [Hash, Array] field key => attributes hash (or an array of
      #   such pairs).
      # @return [Hash] { "type" => "object", "properties" => { name => { "type" => ... } } }
      def convert_to_json_schema(fields)
        properties = fields.each_with_object({}) do |(field_key, attributes), props|
          column_name = attributes["name"] || field_key
          props[column_name] = { "type" => attributes["type"] }
        end

        { "type" => "object", "properties" => properties }
      end

      # Opens the XML-RPC endpoints and authenticates against Odoo.
      #
      # Calls "version" on the +/xmlrpc/2/common+ endpoint, then "authenticate"
      # with the configured database, username and password. The returned user
      # id is stored in +@uid+ and a proxy for +/xmlrpc/2/object+ in +@client+.
      #
      # @param connection_config [Hash] must provide +:url+, +:database+,
      #   +:username+ and +:password+.
      # @return [Hash] the +connection_config+ that was passed in.
      # @raise [StandardError] when the authenticate call yields a falsy user id,
      #   plus anything raised by the XML-RPC client itself.
      def create_connection(connection_config)
        common = XMLRPC::Client.new2("#{connection_config[:url]}/xmlrpc/2/common")
        common.call("version")
        @uid = common.call("authenticate", connection_config[:database], connection_config[:username],
                           connection_config[:password], { 'raise_exception': true })
        # A falsy uid means the credentials were not accepted; fail here so that
        # check_connection does not report success for an unusable connection.
        # NOTE(review): Odoo is expected to answer `false` instead of raising on
        # bad credentials — confirm for the targeted Odoo versions.
        raise StandardError, "Odoo authentication failed: invalid database, username or password" unless @uid

        @client = XMLRPC::Client.new2("#{connection_config[:url]}/xmlrpc/2/object").proxy
        connection_config
      end

      # Translates a SQL-like query string into an Odoo +search_read+ call and
      # wraps every returned row in a RecordMessage.
      #
      # Recognised pieces (keywords are matched case-insensitively):
      # * +SELECT a, b+ -> requested fields (+SELECT *+ sends an empty list)
      # * +FROM model+  -> Odoo model name (letters, digits, "_" and ".")
      # * +ORDER BY+    -> order string, defaults to "id DESC"
      # * +LIMIT n+ / +OFFSET n+ -> integers, 0 when absent
      #
      # @param connection [Hash] supplies +:database+ and +:password+.
      # @param query [String] the SQL-like query text.
      # @return [Array] one multiwoven message per record returned by Odoo.
      # @raise [ArgumentError] when no model can be found after FROM.
      def query(connection, query)
        params = parse_odoo_query(query)

        records = @client.execute_kw(connection[:database], @uid, connection[:password],
                                     params[:model], "search_read", [],
                                     { limit: params[:limit], offset: params[:offset],
                                       order: params[:order], fields: params[:columns] })
        records.map do |row|
          RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
        end
      end

      # Extracts model, columns, order, limit and offset from the query text.
      def parse_odoo_query(sql)
        model = sql[/\bFROM\s+([\w.]+)/i, 1]
        raise ArgumentError, "Unable to determine the Odoo model from query: #{sql}" unless model

        {
          model: model,
          columns: parse_odoo_columns(sql),
          # ORDER BY runs until LIMIT/OFFSET or the end of the query, so an
          # ordering clause without a trailing LIMIT is accepted as well.
          order: sql[/\bORDER\s+BY\s+(.+?)(?=\s+(?:LIMIT|OFFSET)\b|\s*;?\s*\z)/im, 1] || "id DESC",
          # nil.to_i is 0, which keeps the "no limit / no offset" defaults.
          limit: sql[/\bLIMIT\s+(\d+)/i, 1].to_i,
          offset: sql[/\bOFFSET\s+(\d+)/i, 1].to_i
        }
      end

      # Returns the lower-cased column list of the SELECT clause; empty for "*".
      def parse_odoo_columns(sql)
        selection = sql[/\bSELECT\s+(.*?)\s+FROM\b/im, 1].to_s.strip
        return [] if selection.empty? || selection == "*"

        selection.downcase.split(",").map(&:strip)
      end
    end
  end
end