class Multiwoven::Integrations::Source::SalesforceConsumerGoodsCloud::Client
rubocop:disable Metrics/ClassLength
def authenticate_client
def authenticate_client @client.authenticate! end
def check_connection(connection_config)
def check_connection(connection_config) connection_config = connection_config.with_indifferent_access initialize_client(connection_config) authenticate_client success_status rescue StandardError => e failure_status(e) end
def create_connection(connection_config)
def create_connection(connection_config) initialize_client(connection_config) end
def create_json_schema_for_object(metadata)
def create_json_schema_for_object(metadata) fields_schema = metadata["fields"].map do |field| { "#{field[:name]}": salesforce_field_to_json_schema_type(field) } end.reduce(:merge) json_schema = { "$schema": "http://json-schema.org/draft-07/schema#", "title": metadata["name"], "type": "object", "additionalProperties": true, "properties": fields_schema } required = metadata["fields"].map do |field| field["name"] if field["nillable"] == false end.compact primary_key = metadata["fields"].map do |field| field["name"] if field["nillable"] == false && field["unique"] == true end.compact { "name": metadata["name"], "action": "create", "json_schema": json_schema, "required": required, "supported_sync_modes": %w[full_refresh incremental], "source_defined_cursor": true, "default_cursor_field": ["updated"], "source_defined_primary_key": [primary_key] } end
def discover(connection_config)
def discover(connection_config) connection_config = connection_config.with_indifferent_access initialize_client(connection_config) catalog = build_catalog(load_catalog.with_indifferent_access) streams = catalog[:streams] SALESFORCE_OBJECTS.each do |object| object_description = @client.describe(object) streams << JSON.parse(create_json_schema_for_object(object_description).to_json) end catalog.to_multiwoven_message rescue StandardError => e handle_exception("SALESFORCE:CRM:DISCOVER:EXCEPTION", "error", e) end
def failure_status(error)
def failure_status(error) ConnectionStatus.new(status: ConnectionStatusType["failed"], message: error.message).to_multiwoven_message end
def initialize_client(config)
def initialize_client(config) config = config.with_indifferent_access @client = Restforce.new(username: config[:username], password: config[:password] + config[:security_token], host: config[:host], client_id: config[:client_id], client_secret: config[:client_secret], api_version: API_VERSION) end
def load_catalog
def load_catalog read_json(CATALOG_SPEC_PATH) end
def log_debug(message)
def log_debug(message) Multiwoven::Integrations::Service.logger.debug(message) end
def query(connection, query)
def query(connection, query) exclude_keys = ["attributes"] queried_data = connection.query(query) results = queried_data.map do |record| record.reject { |key, _| exclude_keys.include?(key) } end results.map do |row| RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message end end
def read(sync_config)
def read(sync_config) connection_config = sync_config.source.connection_specification.with_indifferent_access initialize_client(connection_config) return [] if sync_config.offset&.> 2000 # TODO: Salesforce imposes a limit on the use of OFFSET in SOQL queries, where you cannot skip(offset) more than 2000 records. # This limitation can hinder the retrieval of large datasets in a single query. # To overcome this, we need a cursor-based pagination strategy instead of relying on OFFSET. # query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil? query = sync_config.model.query exclude_keys = ["attributes"] queried_data = @client.query(query) results = queried_data.map do |record| record.reject { |key, _| exclude_keys.include?(key) } end results.map do |row| RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message end rescue StandardError => e handle_exception("SALESFORCE:CRM:WRITE:EXCEPTION", "error", e) end
def salesforce_field_to_json_schema_type(sf_field) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/PerceivedComplexity
def salesforce_field_to_json_schema_type(sf_field) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/PerceivedComplexity case sf_field["type"] when "string", "Email", "Phone", "Text", "TextArea", "TextEncrypted", "URL", "Picklist (Single)" if sf_field["nillable"] { "type": %w[string null] } else { "type": "string" } end when "double", "Currency", "Percent" if sf_field["nillable"] { "type": %w[number null] } else { "type": "number" } end when "boolean", "Checkbox" if sf_field["nillable"] { "type": %w[boolean null] } else { "type": "boolean" } end when "int", "AutoNumber" if sf_field["nillable"] { "type": %w[integer null] } else { "type": "integer" } end when "date" if sf_field["nillable"] { "type": %w[string null], "format": "date" } else { "type": "string", "format": "date" } end when "datetime", "DateTime" if sf_field["nillable"] { "type": %w[string null], "format": "date-time" } else { "type": "string", "format": "date-time" } end when "time" if sf_field["nillable"] { "type": %w[string null], "format": "time" } else { "type": "string", "format": "time" } end when "textarea", "Text Area (Long)", "Text Area (Rich)" if sf_field["nillable"] { "type": %w[string null] } else { "type": "string" } end when "picklist", "multipicklist", "Picklist (Multi-select)" if sf_field[:picklistValues] && sf_field["nillable"] enum_values = sf_field[:picklistValues].map { |val| val["value"] } { "type": %w[array null], "items": { "type": "string" }, "enum": enum_values } elsif sf_field[:picklistValues] enum_values = sf_field[:picklistValues].map { |val| val["value"] } { "type": "array", "items": { "type": "string" }, "enum": enum_values } else { "type": "array", "items": { "type": "string" } } end when "reference", "Reference (Lookup & Master-Detail)" if sf_field["nillable"] { "type": %w[string null] } else { "type": "string" } end when "location", "Geolocation" if sf_field["nillable"] { "type": %w[object null], "properties": { "latitude": { "type": "number" }, "longitude": { "type": "number" } } } else { "type": "object", "properties": { "latitude": { "type": "number" }, "longitude": { "type": "number" } } } end else if sf_field["nillable"] { "type": %w[string null] } else { "type": "string" } end end end
def success_status
def success_status ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message end
def tracking_message(success, failure)
def tracking_message(success, failure) Multiwoven::Integrations::Protocol::TrackingMessage.new( success: success, failed: failure ).to_multiwoven_message end