class Multiwoven::Integrations::Source::GoogleDrive::Client
# Builds the Drive search query that selects the non-folder children of the
# folder called +folder_name+.
#
# The folder is looked up by name through @google_drive (which must already
# be initialised); the first match is used as the parent.
#
# @param folder_name [String] name of the Drive folder to search in
# @return [String] query matching files whose parent is the resolved folder
# @raise [ArgumentError] if the name is blank or no such folder is found
def build_query(folder_name)
  raise ArgumentError, "Folder name is required" if folder_name.blank?

  # Escape backslashes as well as single quotes. Escaping only the quote lets
  # a name such as "a\\' or ..." terminate the quoted literal early, because
  # the user-supplied backslash would consume the backslash we add. The block
  # form of gsub avoids the replacement-string back-reference rules.
  escaped_folder = folder_name.gsub(/[\\']/) { |char| "\\#{char}" }
  folder_query = "#{MIMETYPE_GOOGLE_DRIVE_FOLDER} and (name = '#{escaped_folder}')"

  response = @google_drive.list_files(
    include_items_from_all_drives: true,
    supports_all_drives: true,
    q: folder_query,
    fields: FIELDS
  )
  raise ArgumentError, "Specified folder does not exist" if response.files.empty?

  parent_id = response.files.first.id
  "'#{parent_id}' in parents and mimeType != 'application/vnd.google-apps.folder'"
end
# Checks that the source described by +connection_config+ is usable.
#
# For unstructured or semi-structured data a Drive client is created and the
# configured folder is resolved via build_query (which raises when the folder
# is blank or missing). Any other configuration goes through
# create_connection.
#
# @param connection_config [Hash] connector settings
# @return the value of success_status, or failure_status(error) when
#   anything raises
def check_connection(connection_config)
  config = connection_config.with_indifferent_access
  drive_backed = unstructured_data?(config) || semistructured_data?(config)

  unless drive_backed
    create_connection(config)
    return success_status
  end

  create_drive_connection(config)
  build_query(config[:folder_name])
  success_status
rescue StandardError, NotImplementedError => e
  # NotImplementedError is not a StandardError, hence the explicit listing.
  failure_status(e)
end
# Placeholder for structured-data connections; always raises.
#
# @param connection_config [Hash] connector settings (currently unused)
# @raise [NotImplementedError] unconditionally
def create_connection(connection_config)
  message = "Connection failed: Structured data is not supported yet"
  raise NotImplementedError, message
end
# Creates a Drive service, stores it in @google_drive and authorises it with
# service-account credentials built from connection_config[:credentials_json].
#
# @param connection_config [Hash] connector settings containing
#   :credentials_json
# @return the credentials object assigned as the service's authorization
def create_drive_connection(connection_config)
  service_account_key = connection_config[:credentials_json]
  @google_drive = Google::Apis::DriveV3::DriveService.new

  # make_creds reads the key from an IO, so the config value is re-serialised.
  key_stream = StringIO.new(service_account_key.to_json)
  @google_drive.authorization = Google::Auth::ServiceAccountCredentials.make_creds(
    json_key_io: key_stream,
    scope: GOOGLE_SHEETS_SCOPE
  )
end
# Builds the catalog for the configured data type.
#
# The catalog holds a single stream: the unstructured stream or the
# semi-structured stream, depending on +connection_config+. Any other
# configuration raises NotImplementedError, which (like every other error
# here) is passed to handle_exception.
#
# @param connection_config [Hash] connector settings
# @return the catalog converted with to_multiwoven_message, or the result of
#   handle_exception on failure
def discover(connection_config)
  config = connection_config.with_indifferent_access

  stream =
    if unstructured_data?(config)
      create_unstructured_stream
    elsif semistructured_data?(config)
      create_semistructured_stream
    else
      raise NotImplementedError, "Discovery failed: Structured data is not supported yet"
    end

  Catalog.new(streams: [stream]).to_multiwoven_message
rescue StandardError, NotImplementedError => e
  handle_exception(e, { context: "GOOGLE_DRIVE:DISCOVER:EXCEPTION", type: "error" })
end
# Downloads the file called +file_name+ from the Drive folder +folder_name+
# to local disk and returns a single record describing it.
#
# Destination: when ENV["FILE_DOWNLOAD_PATH"] is set the file is written to
# <FILE_DOWNLOAD_PATH>/syncs/<sync_id>/<basename of file_name>; otherwise a
# uniquely named temporary file is used. The temporary file is NOT removed
# automatically on success, so whoever consumes +local_path+ owns its cleanup.
#
# @param folder_name [String] Drive folder that contains the file
# @param file_name [String] name of the file to download
# @param sync_id [#to_s] sync identifier used in the destination path
# @return [Array] one element: the RecordMessage converted with
#   to_multiwoven_message
# @raise [StandardError] "Failed to download file ..." wrapping any failure,
#   including the file not being found
def download_file_to_local(folder_name, file_name, sync_id)
  folder_query = build_query(folder_name)

  # Escape single quotes to prevent query injection (only quotes are handled
  # here).
  escaped_name = file_name.gsub("'", "\\\\'")
  records = get_files(@google_drive, "#{folder_query} and name = '#{escaped_name}'", 1, 0)
  raise StandardError, "File not found." if records.empty?

  remote_file = records.first
  download_root = ENV["FILE_DOWNLOAD_PATH"]
  local_path =
    if download_root
      # to_s: File.join raises TypeError for a non-String sync_id.
      File.join(download_root, "syncs", sync_id.to_s, File.basename(file_name))
    else
      # Tempfile.new(...).path would drop the Tempfile object, and its
      # finalizer may unlink the file on garbage collection, deleting the
      # download. Tempfile.create without a block has no such finalizer.
      # The file is created only after the lookup succeeded, so a missing
      # remote file does not leave a stray temp file behind.
      temp_file = Tempfile.create(["google_drive_file_syncs_#{sync_id}", File.extname(file_name)])
      temp_file.close
      temp_file.path
    end

  @google_drive.get_file(remote_file.id, download_dest: local_path)

  record = RecordMessage.new(
    data: {
      element_id: remote_file.id,
      local_path: local_path,
      file_name: file_name,
      file_path: file_name,
      size: remote_file.size,
      file_type: remote_file.file_extension,
      created_date: remote_file.created_time,
      modified_date: remote_file.modified_time,
      text: ""
    },
    emitted_at: Time.now.to_i
  )
  [record.to_multiwoven_message]
rescue StandardError => e
  # Do not leave a temp file behind when the download itself failed.
  File.delete(temp_file.path) if temp_file && File.exist?(temp_file.path)
  raise StandardError, "Failed to download file #{file_name}: #{e.message}"
end
# Lists files matching +query+, following pagination until +limit+ files have
# been collected or the API reports no further page.
#
# @param client [#list_files] Drive service used for the listing
# @param query [String] Drive search query
# @param limit [Integer] maximum number of files requested in total
# @param _offset [Integer] unused
# @return [Array] the collected file entries (empty when +limit+ <= 0)
def get_files(client, query, limit, _offset)
  collected = []
  page_token = nil

  while collected.size < limit
    request = {
      include_items_from_all_drives: true,
      supports_all_drives: true,
      q: query,
      fields: FIELDS,
      page_size: [MAX_PER_PAGE, limit - collected.size].min
    }
    # Only send page_token once a previous page has handed one out.
    request[:page_token] = page_token if page_token

    response = client.list_files(**request)

    # A page may come back empty (or without a files list) while still
    # carrying a next page token; stopping on the empty page would silently
    # drop the remaining results, so only a missing token ends the loop.
    collected.concat(Array(response.files))

    page_token = response.next_page_token
    break unless page_token
  end

  collected
end
# Runs the command held in the sync's model query against the configured
# Drive folder.
#
# Supported commands are LIST_FILES_CMD (list the folder) and
# DOWNLOAD_FILE_CMD followed by a file name (download that file). A Drive
# connection is created before the command is dispatched.
#
# @param sync_config sync configuration providing source, model and sync_id
# @return the result of list_files_in_folder or download_file_to_local
# @raise [ArgumentError] when the command is not one of the supported ones
def handle_unstructured_data(sync_config)
  spec = sync_config.source.connection_specification.with_indifferent_access
  target_folder = spec[:folder_name]
  requested_command = sync_config.model.query.strip
  create_drive_connection(spec)

  case requested_command
  when LIST_FILES_CMD
    list_files_in_folder(target_folder)
  when /^#{DOWNLOAD_FILE_CMD}\s+(.+)$/
    requested_file = ::Regexp.last_match(1)
                             .strip
                             .gsub(/^["']|["']$/, "") # Remove leading/trailing quotes
                             .gsub("\\", "\\\\\\") # Escape backslashes
    download_file_to_local(target_folder, requested_file, sync_config.sync_id)
  else
    raise ArgumentError, "Invalid command. Supported commands: #{LIST_FILES_CMD}, #{DOWNLOAD_FILE_CMD} <file_path>"
  end
end
# Lists the files in the Drive folder +folder_name+ as record messages.
#
# At most 10_000 files are requested. Each record carries the file's id,
# name, size, extension and timestamps; +text+ is always an empty string and
# no local path is included because nothing is downloaded.
#
# @param folder_name [String] name of the Drive folder to list
# @return [Array] one to_multiwoven_message result per file
def list_files_in_folder(folder_name)
  folder_query = build_query(folder_name)

  get_files(@google_drive, folder_query, 10_000, 0).map do |file|
    payload = {
      element_id: file.id,
      file_name: file.name,
      file_path: file.name,
      size: file.size,
      file_type: file.file_extension,
      created_date: file.created_time,
      modified_date: file.modified_time,
      text: ""
    }
    RecordMessage.new(data: payload, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end
# Entry point for reading from the source.
#
# Unstructured and semi-structured configurations are delegated to
# handle_unstructured_data; anything else raises NotImplementedError. Every
# error, including that one, is passed to handle_exception together with the
# sync identifiers.
#
# @param sync_config sync configuration providing source, sync_id and
#   sync_run_id
# @return the result of handle_unstructured_data, or of handle_exception on
#   failure
def read(sync_config)
  config = sync_config.source.connection_specification.with_indifferent_access

  unless unstructured_data?(config) || semistructured_data?(config)
    raise NotImplementedError, "Read failed: Structured data is not supported yet"
  end

  handle_unstructured_data(sync_config)
rescue StandardError, NotImplementedError => e
  handle_exception(e, {
                     context: "GOOGLE_DRIVE:READ:EXCEPTION",
                     type: "error",
                     sync_id: sync_config.sync_id,
                     sync_run_id: sync_config.sync_run_id
                   })
end