# lib/multiwoven/integrations/destination/http/client.rb



# frozen_string_literal: true

module Multiwoven
  module Integrations
    module Destination
      module Http
        include Multiwoven::Integrations::Core
        # HTTP destination connector. Sends records to the URL found under
        # :destination_url in the connection configuration, batching them so that
        # each request carries at most MAX_CHUNK_SIZE records.
        class Client < DestinationConnector
          # Upper bound on the number of records placed in one request payload.
          MAX_CHUNK_SIZE = 10

          # Checks the destination by POSTing an empty payload to it.
          #
          # @param connection_config [Hash] read for :destination_url and :headers;
          #   string and symbol keys are both accepted
          # @return [Object] result of success_status when success? accepts the
          #   response, otherwise result of failure_status (called with nil for a
          #   rejected response, or with the exception if one was raised)
          def check_connection(connection_config)
            connection_config = connection_config.with_indifferent_access
            destination_url = connection_config[:destination_url]
            headers = connection_config[:headers]
            response = Multiwoven::Integrations::Core::HttpClient.request(
              destination_url,
              HTTP_POST,
              payload: {},
              headers: headers
            )
            return success_status if success?(response)

            failure_status(nil)
          rescue StandardError => e
            handle_exception(e, {
                               context: "HTTP:CHECK_CONNECTION:EXCEPTION",
                               type: "error"
                             })
            failure_status(e)
          end

          # Builds the catalog from the JSON file at CATALOG_SPEC_PATH.
          #
          # @param _connection_config [Object, nil] accepted but not used
          # @return [Object] the catalog converted with to_multiwoven_message; when
          #   an error is raised, whatever handle_exception returns
          def discover(_connection_config = nil)
            catalog_json = read_json(CATALOG_SPEC_PATH)
            build_catalog(catalog_json).to_multiwoven_message
          rescue StandardError => e
            handle_exception(e, {
                               context: "HTTP:DISCOVER:EXCEPTION",
                               type: "error"
                             })
          end

          # Sends the records to the destination in slices of MAX_CHUNK_SIZE and
          # tallies how many records were in accepted versus rejected slices.
          #
          # @param sync_config [Object] read for destination.connection_specification,
          #   stream.request_method, sync_id and sync_run_id
          # @param records [Enumerable] records to send; must respond to each_slice
          # @param _action [String] accepted but not used
          # @return [Object] a TrackingMessage converted with to_multiwoven_message;
          #   when an error escapes the per-slice handling, whatever
          #   handle_exception returns
          def write(sync_config, records, _action = "create")
            connection_config = sync_config.destination.connection_specification.with_indifferent_access
            url = connection_config[:destination_url]
            headers = connection_config[:headers]
            write_success = 0
            write_failure = 0
            records.each_slice(MAX_CHUNK_SIZE) do |chunk|
              payload = create_payload(chunk)
              response = Multiwoven::Integrations::Core::HttpClient.request(
                url,
                sync_config.stream.request_method,
                payload: payload,
                headers: headers
              )
              # A slice succeeds or fails as a unit: every record in it is counted
              # on the same side of the tally.
              if success?(response)
                write_success += chunk.size
              else
                write_failure += chunk.size
              end
            rescue StandardError => e
              # A raising slice is reported and counted as failed; the loop then
              # carries on with the next slice.
              handle_exception(e, {
                                 context: "HTTP:RECORD:WRITE:EXCEPTION",
                                 type: "error",
                                 sync_id: sync_config.sync_id,
                                 sync_run_id: sync_config.sync_run_id
                               })
              write_failure += chunk.size
            end

            tracker = Multiwoven::Integrations::Protocol::TrackingMessage.new(
              success: write_success,
              failed: write_failure
            )
            tracker.to_multiwoven_message
          rescue StandardError => e
            handle_exception(e, {
                               context: "HTTP:RECORD:WRITE:EXCEPTION",
                               type: "error",
                               sync_id: sync_config.sync_id,
                               sync_run_id: sync_config.sync_run_id
                             })
          end

          private

          # Wraps a slice of records in the request body shape
          # { "records" => [{ "fields" => record }, ...] }.
          #
          # @param records [Array] records belonging to one slice
          # @return [Hash] the request payload
          def create_payload(records)
            wrapped = records.map { |record| { "fields" => record } }
            { "records" => wrapped }
          end

          # Parses the JSON body of a response.
          #
          # @param response [#body] object exposing the raw body
          # @return [Object, nil] the parsed JSON, or nil when the body is absent,
          #   empty or whitespace-only
          # @raise [JSON::ParserError] when a non-blank body is not valid JSON
          def extract_body(response)
            raw_body = response.body
            return unless raw_body
            # JSON.parse raises on an empty document, so a blank body (e.g. from a
            # reply that carries no content) is treated the same as a missing one.
            return if raw_body.to_s.strip.empty?

            JSON.parse(raw_body)
          end
        end
      end
    end
  end
end