lib/multiwoven/integrations/destination/google_sheets/client.rb



# frozen_string_literal: true

module Multiwoven
  module Integrations
    module Destination
      module GoogleSheets
        include Multiwoven::Integrations::Core

        class Client < DestinationConnector
          prepend Multiwoven::Integrations::Core::Fullrefresher
          prepend Multiwoven::Integrations::Core::RateLimiter
          MAX_CHUNK_SIZE = 10_000

          def check_connection(connection_config)
            connection_config = connection_config.with_indifferent_access
            authorize_client(connection_config)
            fetch_google_spread_sheets(connection_config)
            success_status
          rescue StandardError => e
            failure_status(e)
          end

          def discover(connection_config)
            connection_config = connection_config.with_indifferent_access
            authorize_client(connection_config)
            spreadsheets = fetch_google_spread_sheets(connection_config)
            catalog = build_catalog_from_spreadsheets(spreadsheets, connection_config)
            catalog.to_multiwoven_message
          rescue StandardError => e
            handle_exception(e, {
                               context: "GOOGLE_SHEETS:CRM:DISCOVER:EXCEPTION",
                               type: "error"
                             })
          end

          def write(sync_config, records, action = "create")
            setup_write_environment(sync_config, action)
            process_record_chunks(records, sync_config)
          rescue StandardError => e
            handle_exception(e, {
                               context: "GOOGLE_SHEETS:CRM:WRITE:EXCEPTION",
                               type: "error",
                               sync_id: sync_config.sync_id,
                               sync_run_id: sync_config.sync_run_id
                             })
          end

          def clear_all_records(sync_config)
            setup_write_environment(sync_config, "clear")
            connection_specification = sync_config.destination.connection_specification.with_indifferent_access
            spreadsheet = fetch_google_spread_sheets(connection_specification)
            sheet_ids = spreadsheet.sheets.map(&:properties).map(&:sheet_id)

            delete_extra_sheets(sheet_ids)

            unless sheet_ids.empty?
              clear_response = clear_sheet_data(spreadsheet.sheets.first.properties.title)
              return control_message("Successfully cleared data.", "succeeded") if clear_response&.cleared_range
            end

            control_message("Failed to clear data.", "failed")
          rescue StandardError => e
            control_message(e.message, "failed")
          end

          private

          # To define the level of access granted to your app, you need to identify and declare authorization scopes which is provided by google scopse https://developers.google.com/sheets/api/scopes
          def authorize_client(config)
            credentials = config[:credentials_json]
            @client = Google::Apis::SheetsV4::SheetsService.new
            @client.authorization = Google::Auth::ServiceAccountCredentials.make_creds(
              json_key_io: StringIO.new(credentials.to_json),
              scope: GOOGLE_SHEETS_SCOPE
            )
          end

          # Extract spreadsheet id from the spreadsheet link and return the metadata for all the sheets
          def fetch_google_spread_sheets(connection_config)
            spreadsheet_id = extract_spreadsheet_id(connection_config[:spreadsheet_link])
            @client.get_spreadsheet(spreadsheet_id)
          end

          # dynamically builds catalog based on spreadsheet metadata
          def build_catalog_from_spreadsheets(spreadsheet, connection_config)
            catalog = build_catalog(load_catalog)
            @spreadsheet_id = extract_spreadsheet_id(connection_config[:spreadsheet_link])

            spreadsheet.sheets.each do |sheet|
              process_sheet_for_catalog(sheet, catalog)
            end

            catalog
          end

          # Builds catalog for the single spreadsheet based on column name
          def process_sheet_for_catalog(sheet, catalog)
            sheet_name, last_column_index = extract_sheet_properties(sheet)
            column_names = fetch_column_names(sheet_name, last_column_index)
            catalog.streams << generate_json_schema(column_names, sheet_name) if column_names
          end

          def extract_sheet_properties(sheet)
            [sheet.properties.title, sheet.properties.grid_properties.column_count]
          end

          def fetch_column_names(sheet_name, last_column_index)
            header_range = generate_header_range(sheet_name, last_column_index)
            spread_sheet_value(header_range)&.flatten
          end

          def spread_sheet_value(header_range)
            @spread_sheet_value ||= @client.get_spreadsheet_values(@spreadsheet_id, header_range).values
          end

          def generate_header_range(sheet_name, last_column_index)
            "#{sheet_name}!A1:#{column_index_to_letter(last_column_index)}1"
          end

          def column_index_to_letter(index)
            ("A".."ZZZ").to_a[index - 1]
          end

          def generate_json_schema(column_names, sheet_name)
            {
              name: sheet_name,
              action: "create",
              batch_support: true,
              batch_size: 10_000,
              json_schema: generate_properties_schema(column_names),
              supported_sync_modes: %w[incremental full_refresh]
            }.with_indifferent_access
          end

          def generate_properties_schema(column_names)
            properties = column_names.each_with_object({}) do |field, props|
              props[field] = { "type" => "string" }
            end

            { "$schema" => JSON_SCHEMA_URL, "type" => "object", "properties" => properties }
          end

          def setup_write_environment(sync_config, action)
            @action = sync_config.stream.action || action
            connection_specification = sync_config.destination.connection_specification.with_indifferent_access
            @spreadsheet_id = extract_spreadsheet_id(connection_specification[:spreadsheet_link])
            authorize_client(connection_specification)
          end

          def extract_spreadsheet_id(link)
            link[GOOGLE_SPREADSHEET_ID_REGEX, 1] || link
          end

          # Batch has a limit of sending 2MB data. So creating a chunk of records to meet that limit
          def process_record_chunks(records, sync_config)
            log_message_array = []
            write_success = 0
            write_failure = 0

            records.each_slice(MAX_CHUNK_SIZE) do |chunk|
              values = prepare_chunk_values(chunk, sync_config.stream)
              request, response = *update_sheet_values(values, sync_config.stream.name)
              write_success += values.size
              log_message_array << log_request_response("info", request, response)
            rescue StandardError => e
              handle_exception(e, {
                                 context: "GOOGLE_SHEETS:RECORD:WRITE:EXCEPTION",
                                 type: "error",
                                 sync_id: sync_config.sync_id,
                                 sync_run_id: sync_config.sync_run_id
                               })
              write_failure += chunk.size
              log_message_array << log_request_response("error", request, e.message)
            end
            tracking_message(write_success, write_failure, log_message_array)
          end

          # We need to format the data to adhere to google sheets API format. This converts the sync mapped data to 2D array format expected by google sheets API
          def prepare_chunk_values(chunk, stream)
            last_column_index = spread_sheet_value(stream.name).count
            fields = fetch_column_names(stream.name, last_column_index)

            chunk.map do |row|
              row_values = Array.new(fields.size, nil)
              row.each do |key, value|
                index = fields.index(key.to_s)
                row_values[index] = value if index
              end
              row_values
            end
          end

          def update_sheet_values(values, stream_name)
            row_count = spread_sheet_value(stream_name).count
            range = "#{stream_name}!A#{row_count + 1}"
            value_range = Google::Apis::SheetsV4::ValueRange.new(range: range, values: values)

            batch_update_request = Google::Apis::SheetsV4::BatchUpdateValuesRequest.new(
              value_input_option: "RAW",
              data: [value_range]
            )

            # TODO: Remove & this is added for the test to pass we need
            response = @client&.batch_update_values(@spreadsheet_id, batch_update_request)
            [batch_update_request, response]
          end

          def load_catalog
            read_json(CATALOG_SPEC_PATH)
          end

          def delete_extra_sheets(sheet_ids)
            # Leave one sheet intact as a spreadsheet must have at least one sheet.
            # Delete all other sheets.
            (sheet_ids.length - 1).times do |i|
              request = Google::Apis::SheetsV4::BatchUpdateSpreadsheetRequest.new(
                requests: [{ delete_sheet: { sheet_id: sheet_ids[i + 1] } }]
              )
              @client.batch_update_spreadsheet(@spreadsheet_id, request)
            end
          end

          def clear_sheet_data(sheet_title)
            clear_request = Google::Apis::SheetsV4::ClearValuesRequest.new
            @client&.clear_values(@spreadsheet_id, "#{sheet_title}!A2:Z", clear_request)
          end

          def control_message(message, status)
            ControlMessage.new(
              type: "full_refresh",
              emitted_at: Time.now.to_i,
              status: ConnectionStatusType[status],
              meta: { detail: message }
            ).to_multiwoven_message
          end
        end
      end
    end
  end
end