lib/multiwoven/integrations/core/source_connector.rb



# frozen_string_literal: true

module Multiwoven
  module Integrations::Core
    class SourceConnector < BaseConnector
      # accepts Protocol::SyncConfig
      def read(_sync_config)
        raise "Not implemented"
        # setup sync configs
        # call query(connection, query)
        # Returns list of RecordMessage
      end

      private

      # This needs to be implemented as private method
      # In every source connector. This will be used for model preview
      def create_connection(connector_config)
        # return a connection to the client's source
      end

      # This needs to be implemented as private method
      # In every source connector. This will be used for model preview
      def query(connection, query)
        # return list of RecordMessage
      end

      def batched_query(sql_query, limit, offset)
        offset = offset.to_i
        limit = limit.to_i
        raise ArgumentError, "Offset and limit must be non-negative" if offset.negative? || limit.negative?

        # Removing any trailing semicolons
        sql_query.chomp!(";")

        # Checking if the query already has a LIMIT clause
        raise ArgumentError, "Query already contains a LIMIT clause" if sql_query.match?(/LIMIT \d+/i)

        # Appending the LIMIT and OFFSET clauses to the SQL query
        "#{sql_query} LIMIT #{limit} OFFSET #{offset}"
      end

      def send_request(options = {})
        Multiwoven::Integrations::Core::HttpClient.request(
          options[:url],
          options[:http_method],
          payload: options[:payload],
          headers: options[:headers],
          config: options[:config]
        )
      end

      def send_streaming_request(options = {})
        Multiwoven::Integrations::Core::StreamingHttpClient.request(
          options[:url],
          options[:http_method],
          payload: options[:payload],
          headers: options[:headers],
          config: options[:config]
        ) do |chunk|
          yield chunk if block_given? # Pass each chunk for processing (streaming response)
        end
      end
    end
  end
end