lib/multiwoven/integrations/destination/amazon_s3/client.rb



# frozen_string_literal: true

module Multiwoven::Integrations::Destination
  module AmazonS3
    include Multiwoven::Integrations::Core
    class Client < DestinationConnector
      def check_connection(connection_config)
        connection_config = connection_config.with_indifferent_access
        conn = create_connection(connection_config)
        conn.head_bucket(bucket: connection_config[:bucket_name])
        ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
      rescue StandardError => e
        ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
      end

      def discover(_connection_config = nil)
        catalog_json = read_json(CATALOG_SPEC_PATH)
        catalog = build_catalog(catalog_json)
        catalog.to_multiwoven_message
      rescue StandardError => e
        handle_exception(e, {
                           context: "AMAZONS3:DISCOVER:EXCEPTION",
                           type: "error"
                         })
      end

      def write(sync_config, records, _action = "destination_insert")
        records_size = records.size
        log_message_array = []
        write_success = upload_csv_content(sync_config, records)
        write_failure = records_size - write_success
        log_message_array << log_request_response("info", @args, @response)
        tracking_message(write_success, write_failure, log_message_array)
      rescue StandardError => e
        handle_exception(e, {
                           context: "AMAZONS3:WRITE:EXCEPTION",
                           type: "error",
                           sync_id: sync_config.sync_id,
                           sync_run_id: sync_config.sync_run_id
                         })
      end

      private

      def create_connection(connection_config)
        Aws::S3::Client.new(
          region: connection_config[:region],
          access_key_id: connection_config[:access_key_id],
          secret_access_key: connection_config[:secret_access_key]
        )
      end

      def upload_csv_content(sync_config, records)
        connection_config = sync_config.destination.connection_specification.with_indifferent_access
        conn = create_connection(connection_config)
        file_name = generate_local_file_name(connection_config)
        csv_content = generate_csv_content(records)
        begin
          @args = ["create", connection_config[:bucket_name], "#{connection_config[:file_path]}#{file_name}", csv_content]
          @response = conn.put_object(
            bucket: connection_config[:bucket_name],
            key: "#{connection_config[:file_path]}#{file_name}",
            body: csv_content
          )
          write_success = records.size
        rescue StandardError => e
          handle_exception(e, {
                             context: "AMAZONS3:RECORD:WRITE:EXCEPTION",
                             type: "error",
                             sync_id: sync_config.sync_id,
                             sync_run_id: sync_config.sync_run_id
                           })
          write_success = 0
        end
        write_success
      end

      def generate_csv_content(records)
        CSV.generate do |csv|
          headers = records.first.keys
          csv << headers
          records.each { |record| csv << record.values_at(*headers) }
        end
      end

      def generate_local_file_name(connection_config)
        timestamp = Time.now.strftime("%Y%m%d-%H%M%S")
        "#{connection_config[:file_name]}_#{timestamp}.#{connection_config[:format_type]}"
      end
    end
  end
end