lib/multiwoven/integrations/source/google_drive/client.rb



# frozen_string_literal: true

module Multiwoven::Integrations::Source
  module GoogleDrive
    include Multiwoven::Integrations::Core

    FIELDS = "files(id, name, parents, mimeType, fileExtension, size, createdTime, modifiedTime), nextPageToken"
    MAX_PER_PAGE = 1000
    MIMETYPE_GOOGLE_DRIVE_FOLDER = "mimeType = 'application/vnd.google-apps.folder'"

    class Client < UnstructuredSourceConnector
      def check_connection(connection_config)
        connection_config = connection_config.with_indifferent_access

        if unstructured_data?(connection_config) || semistructured_data?(connection_config)
          create_drive_connection(connection_config)
          folder_name = connection_config[:folder_name]
          build_query(folder_name)
        else
          create_connection(connection_config)
        end
        success_status
      rescue StandardError, NotImplementedError => e
        failure_status(e)
      end

      def discover(connection_config)
        connection_config = connection_config.with_indifferent_access
        streams = if unstructured_data?(connection_config)
                    [create_unstructured_stream]
                  elsif semistructured_data?(connection_config)
                    [create_semistructured_stream]
                  else
                    raise NotImplementedError, "Discovery failed: Structured data is not supported yet"
                  end
        catalog = Catalog.new(streams: streams)
        catalog.to_multiwoven_message
      rescue StandardError, NotImplementedError => e
        handle_exception(e, {
                           context: "GOOGLE_DRIVE:DISCOVER:EXCEPTION",
                           type: "error"
                         })
      end

      def read(sync_config)
        connection_config = sync_config.source.connection_specification.with_indifferent_access

        return handle_unstructured_data(sync_config) if unstructured_data?(connection_config) || semistructured_data?(connection_config)

        raise NotImplementedError, "Read failed: Structured data is not supported yet"
      rescue StandardError, NotImplementedError => e
        handle_exception(e, {
                           context: "GOOGLE_DRIVE:READ:EXCEPTION",
                           type: "error",
                           sync_id: sync_config.sync_id,
                           sync_run_id: sync_config.sync_run_id
                         })
      end

      private

      def create_connection(connection_config)
        raise NotImplementedError, "Connection failed: Structured data is not supported yet"
      end

      def create_drive_connection(connection_config)
        credentials = connection_config[:credentials_json]
        @google_drive = Google::Apis::DriveV3::DriveService.new
        @google_drive.authorization = Google::Auth::ServiceAccountCredentials.make_creds(
          json_key_io: StringIO.new(credentials.to_json),
          scope: GOOGLE_SHEETS_SCOPE
        )
      end

      def handle_unstructured_data(sync_config)
        connection_config = sync_config.source.connection_specification.with_indifferent_access
        folder_name = connection_config[:folder_name]
        command = sync_config.model.query.strip
        create_drive_connection(connection_config)

        case command
        when LIST_FILES_CMD
          list_files_in_folder(folder_name)
        when /^#{DOWNLOAD_FILE_CMD}\s+(.+)$/
          file_name = ::Regexp.last_match(1).strip
          file_name = file_name.gsub(/^["']|["']$/, "") # Remove leading/trailing quotes
          file_name = file_name.gsub("\\", "\\\\\\") # Escape backslashes
          download_file_to_local(folder_name, file_name, sync_config.sync_id)
        else
          raise ArgumentError, "Invalid command. Supported commands: #{LIST_FILES_CMD}, #{DOWNLOAD_FILE_CMD} <file_path>"
        end
      end

      def list_files_in_folder(folder_name)
        query = build_query(folder_name)
        records = get_files(@google_drive, query, 10_000, 0)
        records.map do |row|
          RecordMessage.new(
            data: {
              element_id: row.id,
              file_name: row.name,
              file_path: row.name,
              size: row.size,
              file_type: row.file_extension,
              created_date: row.created_time,
              modified_date: row.modified_time,
              text: ""
            },
            emitted_at: Time.now.to_i
          ).to_multiwoven_message
        end
      end

      def download_file_to_local(folder_name, file_name, sync_id)
        query = build_query(folder_name)
        download_path = ENV["FILE_DOWNLOAD_PATH"]
        file = if download_path
                 File.join(download_path, "syncs", sync_id, File.basename(file_name))
               else
                 Tempfile.new(["google_drive_file_syncs_#{sync_id}", File.extname(file_name)]).path
               end

        # Escape single quotes to prevent query injection
        escaped_name = file_name.gsub("'", "\\\\'")
        query = "#{query} and name = '#{escaped_name}'"

        records = get_files(@google_drive, query, 1, 0)
        raise StandardError, "File not found." if records.empty?

        @google_drive.get_file(records.first.id, download_dest: file)

        [RecordMessage.new(
          data: {
            element_id: records.first.id,
            local_path: file,
            file_name: file_name,
            file_path: file_name,
            size: records.first.size,
            file_type: records.first.file_extension,
            created_date: records.first.created_time,
            modified_date: records.first.modified_time,
            text: ""
          },
          emitted_at: Time.now.to_i
        ).to_multiwoven_message]
      rescue StandardError => e
        raise StandardError, "Failed to download file #{file_name}: #{e.message}"
      end

      def build_query(folder_name)
        raise ArgumentError, "Folder name is required" if folder_name.blank?

        # Escape single quotes to prevent query injection
        escaped_folder = folder_name.gsub("'", "\\\\'")
        folder_query = "#{MIMETYPE_GOOGLE_DRIVE_FOLDER} and (name = '#{escaped_folder}')"
        response = @google_drive.list_files(include_items_from_all_drives: true, supports_all_drives: true, q: folder_query, fields: FIELDS)
        raise ArgumentError, "Specified folder does not exist" if response.files.empty?

        parent_id = response.files.first.id
        "'#{parent_id}' in parents and mimeType != 'application/vnd.google-apps.folder'"
      end

      def get_files(client, query, limit, _offset)
        next_page_token = nil
        total_fetched = 0
        result = []

        while total_fetched < limit
          batch_limit = [MAX_PER_PAGE, limit - total_fetched].min
          response = if next_page_token
                       client.list_files(include_items_from_all_drives: true, supports_all_drives: true, q: query, fields: FIELDS, page_size: batch_limit, page_token: next_page_token)
                     else
                       client.list_files(include_items_from_all_drives: true, supports_all_drives: true, q: query, fields: FIELDS, page_size: batch_limit)
                     end
          break if response.files.empty?

          result.push(*response.files)
          next_page_token = response.next_page_token
          break unless response.next_page_token

          total_fetched += response.files.size
        end

        result
      end
    end
  end
end