class Multiwoven::Integrations::Destination::MicrosoftExcel::Client
# Checks whether the supplied connection settings can be used to reach the
# Microsoft endpoint queried by #create_connection.
#
# @param connection_config [Hash] connection settings; read through
#   `with_indifferent_access`, so string or symbol keys both work.
# @return the result of `success_status` when #create_connection yields a
#   truthy id, otherwise the result of `failure_status` (called with nil when
#   no id came back, or with the rescued exception).
def check_connection(connection_config)
  config = connection_config.with_indifferent_access
  return success_status if create_connection(config)

  failure_status(nil)
rescue StandardError => e
  # Report the error first, then surface it to the caller as a failed status.
  handle_exception(e, { context: "MICROSOFT:EXCEL:CHECK_CONNECTION:EXCEPTION", type: "error" })
  failure_status(e)
end
# Issues a GET against MS_EXCEL_AUTH_ENDPOINT using the configured token and
# returns the "id" field of the JSON response (callers in this class use it
# as the drive id).
#
# @param connection_config [Hash] must respond to `[:token]`.
# @return [Object, nil] the value stored under "id", or nil when the parsed
#   response has no such key.
def create_connection(connection_config)
  response = Multiwoven::Integrations::Core::HttpClient.request(
    MS_EXCEL_AUTH_ENDPOINT,
    HTTP_GET,
    headers: auth_headers(connection_config[:token])
  )
  parsed_body = JSON.parse(response.body)
  parsed_body["id"]
end
# Builds one protocol Stream per worksheet found in +records+.
#
# @param records [Array<Hash>] workbook entries as consumed by #group_by_table
#   (each carrying :name and :worksheets).
# @param catalog_json [Hash] catalog spec; supplies the optional
#   "request_rate_limit", "request_rate_limit_unit" and
#   "request_rate_concurrency" overrides (defaults: 60, "minute", 1).
# @return [Array<Multiwoven::Integrations::Protocol::Stream>]
def create_streams(records, catalog_json)
  group_by_table(records).each_value.flat_map do |sheets_by_index|
    sheets_by_index.values.map do |sheet|
      # Rate-limit settings are looked up per stream, falling back to defaults.
      Multiwoven::Integrations::Protocol::Stream.new(
        name: sheet[:workbook],
        action: StreamAction["fetch"],
        json_schema: convert_to_json_schema(sheet[:columns]),
        request_rate_limit: catalog_json["request_rate_limit"] || 60,
        request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
        request_rate_concurrency: catalog_json["request_rate_concurrency"] || 1
      )
    end
  end
end
# Discovers the Excel workbooks reachable with the given connection and
# returns them as a catalog message.
#
# Steps: load the catalog spec, resolve the drive id, list the Excel files,
# attach each file's worksheet/header data under :worksheets, then wrap the
# resulting streams in a Catalog.
#
# @param connection_config [Hash] connection settings containing :token.
# @return the catalog converted with `to_multiwoven_message`; when any step
#   raises a StandardError, whatever `handle_exception` returns.
def discover(connection_config)
  catalog_json = read_json(CATALOG_SPEC_PATH)
  config = connection_config.with_indifferent_access
  token = config[:token]
  drive_id = create_connection(config)

  workbooks = get_file(token, drive_id)
  workbooks.each do |workbook|
    workbook[:worksheets] = get_file_data(token, drive_id, workbook[:id])
  end

  Catalog.new(streams: create_streams(workbooks, catalog_json)).to_multiwoven_message
rescue StandardError => e
  handle_exception(e, { context: "MICROSOFT:EXCEL:DISCOVER:EXCEPTION", type: "error" })
end
# Fetches the worksheet listing for one workbook.
#
# @param token [String] token passed to `auth_headers`.
# @param drive_id [Object] substituted into MS_EXCEL_WORKSHEETS_API.
# @param item_id [Object] substituted into MS_EXCEL_WORKSHEETS_API.
# @return [Object, nil] the "value" entry of the parsed JSON response
#   (callers iterate it and read "name" from each element).
def get_all_sheets(token, drive_id, item_id)
  worksheets_url = format(MS_EXCEL_WORKSHEETS_API, drive_id: drive_id, item_id: item_id)
  response = Multiwoven::Integrations::Core::HttpClient.request(
    worksheets_url,
    HTTP_GET,
    headers: auth_headers(token)
  )
  parsed_body = JSON.parse(response.body)
  parsed_body["value"]
end
# Lists the Excel workbooks returned by the drive's file listing endpoint.
#
# A file counts as an Excel workbook when its name ends in .xlsx, .xls or
# .xlsm. The check is case-insensitive (so "REPORT.XLSX" is included) and
# anchored to the end of the whole name with \z, so a name that merely
# contains such an extension before a line break is not accepted.
#
# @param token [String] token passed to `auth_headers`.
# @param drive_id [Object] substituted into MS_EXCEL_FILES_API.
# @return [Array<Hash>] one `{ name:, id: }` hash per matching file, in the
#   order the API returned them.
def get_file(token, drive_id)
  url = format(MS_EXCEL_FILES_API, drive_id: drive_id)
  response = Multiwoven::Integrations::Core::HttpClient.request(
    url,
    HTTP_GET,
    headers: auth_headers(token)
  )
  files = JSON.parse(response.body)["value"]

  # match? avoids building MatchData; to_s tolerates an entry without a name.
  excel_files = files.select { |file| file["name"].to_s.match?(/\.(xlsx|xls|xlsm)\z/i) }
  excel_files.map { |file| { name: file["name"], id: file["id"] } }
end
# Collects, for every worksheet of a workbook, the sheet name and the first
# row of the range returned by MS_EXCEL_SHEET_RANGE_API (used as column
# names).
#
# When the range response contains an "error" key, the sheet is reported
# with the single placeholder column "Column A".
#
# @param token [String] token passed to `auth_headers`.
# @param drive_id [Object] substituted into the range URL.
# @param item_id [Object] substituted into the range URL.
# @return [Array<Hash>] `{ sheet_name:, column_names: }` per worksheet.
def get_file_data(token, drive_id, item_id)
  get_all_sheets(token, drive_id, item_id).map do |sheet|
    sheet_name = sheet["name"]
    range_url = format(MS_EXCEL_SHEET_RANGE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name)
    range_response = Multiwoven::Integrations::Core::HttpClient.request(
      range_url,
      HTTP_GET,
      headers: auth_headers(token)
    )
    range_data = JSON.parse(range_response.body)

    header_row = range_data.key?("error") ? ["Column A"] : range_data["values"].first
    { sheet_name: sheet_name, column_names: header_row }
  end
end
# Looks up the tables of one worksheet and returns the first entry.
#
# @param token [String] token passed to `auth_headers`.
# @param drive_id [Object] substituted into MS_EXCEL_TABLE_API.
# @param item_id [Object] substituted into MS_EXCEL_TABLE_API.
# @param sheet_name [String] substituted into MS_EXCEL_TABLE_API.
# @return [Object, nil] the first element of the response's "value" list, or
#   nil when that list is empty.
def get_table(token, drive_id, item_id, sheet_name)
  tables_url = format(MS_EXCEL_TABLE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name)
  response = Multiwoven::Integrations::Core::HttpClient.request(
    tables_url,
    HTTP_GET,
    headers: auth_headers(token)
  )
  tables = JSON.parse(response.body)["value"]
  tables.first
end
# Indexes every worksheet of every workbook and describes its columns.
#
# The result is keyed by the workbook's position in +records+, then by the
# worksheet's position inside that workbook. Workbooks without worksheets
# produce no entry at all.
#
# Header cells are coerced with `to_s` before use because a header may be
# something other than a String (a number, a boolean or nil); calling
# `empty?` directly on such a value raises NoMethodError. Blank headers
# (including nil) are reported as "empty column".
#
# @param records [Array<Hash>] workbook hashes with :name and :worksheets,
#   where each worksheet has :sheet_name and :column_names.
# @return [Hash{Integer => Hash{Integer => Hash}}] per-sheet
#   `{ workbook: "<file name>, <sheet name>", columns: [...] }`, each column
#   being `{ column_name:, data_type: "String", is_nullable: true }`.
def group_by_table(records)
  records.each_with_index.each_with_object({}) do |(workbook, workbook_index), grouped|
    workbook[:worksheets].each_with_index do |sheet, sheet_index|
      columns = sheet[:column_names].map do |column_name|
        header = column_name.to_s
        header = "empty column" if header.empty?
        { column_name: header, data_type: "String", is_nullable: true }
      end

      grouped[workbook_index] ||= {}
      grouped[workbook_index][sheet_index] = {
        workbook: "#{workbook[:name]}, #{sheet[:sheet_name]}",
        columns: columns
      }
    end
  end
end
# Sends a single POST with +payload+ to +write_url+ and reports the outcome.
#
# The request counts as one success when `success?` accepts the response,
# otherwise as one failure. Any StandardError raised while sending or
# logging is passed to `handle_exception`, counted as a failure and logged
# at "error" level with the exception message.
#
# @param write_url [String] target URL.
# @param payload [Object] request payload forwarded to the HTTP client.
# @param token [String] token passed to `auth_headers`.
# @param sync_config [#sync_id, #sync_run_id] used to tag reported exceptions.
# @return the result of `tracking_message(successes, failures, logs)`.
def process_write_request(write_url, payload, token, sync_config)
  succeeded = 0
  failed = 0
  logs = []

  begin
    response = Multiwoven::Integrations::Core::HttpClient.request(
      write_url,
      HTTP_POST,
      payload: payload,
      headers: auth_headers(token)
    )
    accepted = success?(response)
    succeeded += 1 if accepted
    failed += 1 unless accepted
    logs << log_request_response("info", [HTTP_POST, write_url, payload], response)
  rescue StandardError => e
    handle_exception(e, {
                       context: "MICROSOFT:EXCEL:RECORD:WRITE:EXCEPTION",
                       type: "error",
                       sync_id: sync_config.sync_id,
                       sync_run_id: sync_config.sync_run_id
                     })
    failed += 1
    logs << log_request_response("error", [HTTP_POST, write_url, payload], e.message)
  end

  tracking_message(succeeded, failed, logs)
end
# Appends +records+ as rows to the table found on the target worksheet.
#
# The stream name is expected in the "<file name>, <sheet name>" form
# produced by #group_by_table: the part before the first ", " selects the
# workbook and the part after the last ", " selects the sheet.
#
# @param sync_config [Object] provides `destination.connection_specification`
#   (with :token) and `stream.name`; also forwarded to #process_write_request.
# @param records [Array<Hash>] rows to write; only each hash's values are
#   sent, in their existing order.
# @param _action [String] unused.
# @return the result of #process_write_request.
# @raise [RuntimeError] when no listed workbook has the requested file name,
#   or when the sheet has no table. Previously these cases surfaced as a
#   NoMethodError on nil.
def write(sync_config, records, _action = "destination_insert")
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  token = connection_config[:token]

  name_parts = sync_config.stream.name.split(", ")
  file_name = name_parts.first
  sheet_name = name_parts.last

  drive_id = create_connection(connection_config)
  workbook = get_file(token, drive_id).find { |file| file[:name] == file_name }
  raise "Workbook '#{file_name}' was not found in the drive" unless workbook

  item_id = workbook[:id]
  table = get_table(token, drive_id, item_id, sheet_name)
  raise "No table was found on sheet '#{sheet_name}' of workbook '#{file_name}'" unless table

  write_url = format(MS_EXCEL_TABLE_ROW_WRITE_API,
                     drive_id: drive_id,
                     item_id: item_id,
                     sheet_name: sheet_name,
                     table_name: table["name"])
  payload = { values: records.map(&:values) }
  process_write_request(write_url, payload, token, sync_config)
end