From 7d29315c61af8123596f27b97f0e04da8baf678b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mar=C3=ADn=20Alcaraz?=
Date: Tue, 13 Aug 2024 14:02:39 -0700
Subject: [PATCH] [Google Sheets] Add SyncMode support (#2214)

Co-authored-by: Maryam Sharif
---
 .../__tests__/postSheet2.operations.test.ts   | 104 ++++++++
 .../src/destinations/google-sheets/index.ts   |   4 +-
 .../postSheet2/generated-types.ts             |  39 +++
 .../google-sheets/postSheet2/index.ts         | 106 ++++++++
 .../google-sheets/postSheet2/operations2.ts   | 248 ++++++++++++++++++
 5 files changed, 500 insertions(+), 1 deletion(-)
 create mode 100644 packages/destination-actions/src/destinations/google-sheets/__tests__/postSheet2.operations.test.ts
 create mode 100644 packages/destination-actions/src/destinations/google-sheets/postSheet2/generated-types.ts
 create mode 100644 packages/destination-actions/src/destinations/google-sheets/postSheet2/index.ts
 create mode 100644 packages/destination-actions/src/destinations/google-sheets/postSheet2/operations2.ts

diff --git a/packages/destination-actions/src/destinations/google-sheets/__tests__/postSheet2.operations.test.ts b/packages/destination-actions/src/destinations/google-sheets/__tests__/postSheet2.operations.test.ts
new file mode 100644
index 0000000000..25ecc585ee
--- /dev/null
+++ b/packages/destination-actions/src/destinations/google-sheets/__tests__/postSheet2.operations.test.ts
@@ -0,0 +1,104 @@
+import { ExecuteInput } from '@segment/actions-core'
+import { Settings } from '../generated-types'
+import { Payload } from '../postSheet2/generated-types'
+import PostSheet from '../postSheet2/index'
+import { GoogleSheets, GetResponse } from '../googleapis/index'
+import { CONSTANTS } from '../constants'
+
+jest.mock('../constants', () => ({
+  CONSTANTS: {
+    MAX_CELLS: 300000
+  }
+}))
+
+const mockGoogleSheets = {
+  get: jest.fn(),
+  batchUpdate: jest.fn(),
+  append: jest.fn()
+}
+
+jest.mock('../googleapis/index', () => {
+  const original = jest.requireActual('../googleapis/index')
+  return {
+    ...original,
+    GoogleSheets: jest.fn().mockImplementation(() => {
+      return mockGoogleSheets
+    })
+  }
+})
+
+describe('Google Sheets', () => {
+  describe('postSheet2', () => {
+    beforeEach(() => {
+      mockGoogleSheets.get.mockClear()
+      mockGoogleSheets.batchUpdate.mockClear()
+      mockGoogleSheets.append.mockClear()
+    })
+
+    const add_data: Partial<ExecuteInput<Settings, Payload[]>> = {
+      payload: [
+        {
+          record_identifier: 'record_id',
+          spreadsheet_id: 'spreadsheet_id',
+          spreadsheet_name: 'spreadsheet_name',
+          data_format: 'data_format',
+          fields: { column1: 'value1', column2: 'value2' }
+        }
+      ],
+      syncMode: 'add'
+    }
+
+    it('should call append if the new data is not found in get response', async () => {
+      const getResponse: Partial<GetResponse> = {
+        values: [['unknown_id']]
+      }
+
+      mockGoogleSheets.get.mockResolvedValue({
+        data: getResponse
+      })
+
+      await PostSheet.performBatch?.(jest.fn(), add_data as ExecuteInput<Settings, Payload[]>)
+
+      expect(GoogleSheets).toHaveBeenCalled()
+      expect(mockGoogleSheets.get).toHaveBeenCalled()
+      expect(mockGoogleSheets.append).toHaveBeenCalled()
+      expect(mockGoogleSheets.batchUpdate).toHaveBeenCalled() // batchUpdate always gets called to write columns
+    })
+
+    it('should call update (and not append) if the new data is found in get response', async () => {
+      // Make sure the spreadsheet contains the event from the payload
+      const getResponse: Partial<GetResponse> = {
+        values: [[add_data.payload?.[0].record_identifier as string]]
+      }
+
+      const update_data: Partial<ExecuteInput<Settings, Payload[]>> = { ...add_data, syncMode: 'update' }
+
+      mockGoogleSheets.get.mockResolvedValue({
+        data: getResponse
+      })
+
+      await PostSheet.performBatch?.(jest.fn(), update_data as ExecuteInput<Settings, Payload[]>)
+
+      expect(GoogleSheets).toHaveBeenCalled()
+      expect(mockGoogleSheets.get).toHaveBeenCalled()
+      expect(mockGoogleSheets.append).not.toHaveBeenCalled()
+      expect(mockGoogleSheets.batchUpdate).toHaveBeenCalled()
+    })
+
+    it('should fail because number of cells limit is reached', async () => {
+      // Lower the cell limit so the three rows returned by the sheet exceed it
+      CONSTANTS.MAX_CELLS = 1
+      const getResponse: Partial<GetResponse> = {
+        values: [['id'], ['1234'], ['12345']]
+      }
+
+      mockGoogleSheets.get.mockResolvedValue({
+        data: getResponse
+      })
+
+      await expect(
+        PostSheet.performBatch?.(jest.fn(), add_data as ExecuteInput<Settings, Payload[]>)
+      ).rejects.toThrowError('Sheet has reached maximum limit')
+    })
+  })
+})
diff --git a/packages/destination-actions/src/destinations/google-sheets/index.ts b/packages/destination-actions/src/destinations/google-sheets/index.ts
index 76cfd5f068..0e1f8bc177 100644
--- a/packages/destination-actions/src/destinations/google-sheets/index.ts
+++ b/packages/destination-actions/src/destinations/google-sheets/index.ts
@@ -2,6 +2,7 @@ import type { DestinationDefinition } from '@segment/actions-core'
 import type { Settings } from './generated-types'
 
 import postSheet from './postSheet'
+import postSheet2 from './postSheet2'
 interface RefreshTokenResponse {
   access_token: string
   scope: string
@@ -51,7 +52,8 @@ const destination: DestinationDefinition<Settings> = {
   // },
 
   actions: {
-    postSheet
+    postSheet,
+    postSheet2
   }
 }
 
diff --git a/packages/destination-actions/src/destinations/google-sheets/postSheet2/generated-types.ts b/packages/destination-actions/src/destinations/google-sheets/postSheet2/generated-types.ts
new file mode 100644
index 0000000000..ccf4a47b88
--- /dev/null
+++ b/packages/destination-actions/src/destinations/google-sheets/postSheet2/generated-types.ts
@@ -0,0 +1,39 @@
+// Generated file. DO NOT MODIFY IT BY HAND.
+
+export interface Payload {
+  /**
+   * Property which uniquely identifies each row in the spreadsheet.
+   */
+  record_identifier: string
+  /**
+   * The identifier of the spreadsheet. You can find this value in the URL of the spreadsheet. e.g. https://docs.google.com/spreadsheets/d/{SPREADSHEET_ID}/edit
+   */
+  spreadsheet_id: string
+  /**
+   * The name of the spreadsheet. You can find this value on the tab at the bottom of the spreadsheet. Please provide a valid name of a sheet that already exists.
+   */
+  spreadsheet_name: string
+  /**
+   * The way Google will interpret values. If you select raw, values will not be parsed and will be stored as-is. If you select user entered, values will be parsed as if you typed them into the UI. Numbers will stay as numbers, but strings may be converted to numbers, dates, etc. following the same rules that are applied when entering text into a cell via the Google Sheets UI.
+   */
+  data_format: string
+  /**
+   *
+   *   The fields to write to the spreadsheet.
+   *
+   *   On the left-hand side, input the name of the field as it will appear in the Google Sheet.
+   *
+   *   On the right-hand side, select the field from your data model that maps to the given field in your sheet.
+   *
+   *   ---
+   *
+   *
+   */
+  fields: {
+    [k: string]: unknown
+  }
+  /**
+   * Set as true to ensure Segment sends data to Google Sheets in batches. Please do not set to false.
+   */
+  enable_batching?: boolean
+}
diff --git a/packages/destination-actions/src/destinations/google-sheets/postSheet2/index.ts b/packages/destination-actions/src/destinations/google-sheets/postSheet2/index.ts
new file mode 100644
index 0000000000..72040717ad
--- /dev/null
+++ b/packages/destination-actions/src/destinations/google-sheets/postSheet2/index.ts
@@ -0,0 +1,106 @@
+import { ActionDefinition, IntegrationError } from '@segment/actions-core'
+import type { Settings } from '../generated-types'
+import type { Payload } from './generated-types'
+
+import { processData } from './operations2'
+
+const action: ActionDefinition<Settings, Payload> = {
+  title: 'Post Sheet (Simplified)',
+  description: 'Write values to a Google Sheets spreadsheet.',
+  defaultSubscription: 'event = "updated" or event = "new"',
+  syncMode: {
+    description: 'Define how the records from your destination will be synced.',
+    label: 'How to sync records',
+    default: 'upsert',
+    choices: [
+      {
+        label:
+          'If a record with the specified identifier is found, it will be updated. If not, a new row will be created.',
+        value: 'upsert'
+      },
+      {
+        label: "Add a new record when the specified identifier doesn't exist. If it does, it will be skipped.",
+        value: 'add'
+      },
+      {
+        label:
+          "Update a record if a match with the specified identifier is found. Do nothing if the row doesn't exist.",
+        value: 'update'
+      }
+    ]
+  },
+  // TODO: Hide record_identifier (operation_type was replaced by syncMode)
+  fields: {
+    record_identifier: {
+      label: 'Record Identifier',
+      description: 'Property which uniquely identifies each row in the spreadsheet.',
+      type: 'string',
+      required: true,
+      default: { '@path': '$.__segment_id' }
+    },
+    spreadsheet_id: {
+      label: 'Spreadsheet ID',
+      description:
+        'The identifier of the spreadsheet. You can find this value in the URL of the spreadsheet. e.g. https://docs.google.com/spreadsheets/d/{SPREADSHEET_ID}/edit',
+      type: 'string',
+      required: true,
+      default: ''
+    },
+    spreadsheet_name: {
+      label: 'Spreadsheet Name',
+      description:
+        'The name of the spreadsheet. You can find this value on the tab at the bottom of the spreadsheet. Please provide a valid name of a sheet that already exists.',
+      type: 'string',
+      required: true,
+      default: 'Sheet1'
+    },
+    data_format: {
+      label: 'Data Format',
+      description:
+        'The way Google will interpret values. If you select raw, values will not be parsed and will be stored as-is. If you select user entered, values will be parsed as if you typed them into the UI. Numbers will stay as numbers, but strings may be converted to numbers, dates, etc. following the same rules that are applied when entering text into a cell via the Google Sheets UI.',
+      type: 'string',
+      required: true,
+      default: 'RAW',
+      choices: [
+        { label: 'Raw', value: 'RAW' },
+        { label: 'User Entered', value: 'USER_ENTERED' }
+      ]
+    },
+    fields: {
+      label: 'Fields',
+      description: `
+  The fields to write to the spreadsheet.
+
+  On the left-hand side, input the name of the field as it will appear in the Google Sheet.
+
+  On the right-hand side, select the field from your data model that maps to the given field in your sheet.
+
+  ---
+
+  `,
+      type: 'object',
+      required: true,
+      defaultObjectUI: 'keyvalue:only'
+    },
+    enable_batching: {
+      type: 'boolean',
+      label: 'Batch Data to Google Sheets',
+      description: 'Set as true to ensure Segment sends data to Google Sheets in batches. Please do not set to false.',
+      default: true
+    }
+  },
+  perform: (request, { payload, syncMode }) => {
+    if (!syncMode) {
+      throw new IntegrationError('Sync mode is required for this action.', 'INVALID_REQUEST_DATA', 400)
+    }
+    return processData(request, [payload], syncMode)
+  },
+  performBatch: (request, { payload, syncMode }) => {
+    if (!syncMode) {
+      throw new IntegrationError('Sync mode is required for this action.', 'INVALID_REQUEST_DATA', 400)
+    }
+    return processData(request, payload, syncMode)
+  }
+}
+
+export default action
diff --git a/packages/destination-actions/src/destinations/google-sheets/postSheet2/operations2.ts b/packages/destination-actions/src/destinations/google-sheets/postSheet2/operations2.ts
new file mode 100644
index 0000000000..67c8393f2e
--- /dev/null
+++ b/packages/destination-actions/src/destinations/google-sheets/postSheet2/operations2.ts
@@ -0,0 +1,248 @@
+import type { Payload } from './generated-types'
+import { IntegrationError, RequestClient } from '@segment/actions-core'
+import { GoogleSheets, GetResponse } from '../googleapis/index'
+import { CONSTANTS } from '../constants'
+
+import A1 from '@segment/a1-notation'
+
+type Fields = {
+  [k: string]: string
+}
+
+interface Identifiable {
+  identifier: string
+}
+
+interface Indexable {
+  index: number
+}
+
+type UpdateBatch = Identifiable & Indexable & { event: Fields }
+type AppendBatch = Identifiable & { event: Fields }
+
+/**
+ * Invariant settings that are common to all events in the payload.
+ */
+export type MappingSettings = {
+  spreadsheetId: string
+  spreadsheetName: string
+  dataFormat: string
+  columns: string[]
+}
+
+// The data in the spreadsheet begins in row 2, because it's assumed that the first row will contain the column names.
+const DATA_ROW_OFFSET = 2
+
+/**
+ * Utility function that converts the event properties into an array of strings that Google Sheets API can understand.
+ * Note that the identifier is forced as the first column.
+ * @param identifier value used to imbue fields with a uniqueness constraint
+ * @param fields list of properties contained in the event
+ * @param columns list of properties that will be committed to the spreadsheet
+ * @returns an array of strings: the identifier followed by the `fields` values in `columns` order ('' when missing)
+ *
+ * @example
+ * fields:
+   {
+     "CLOSE_DATE": "2022-07-08T00:00:00Z",
+     "CLOSE_DATE_EOQ": "2022-07-08",
+     "ENTRY_POINT": "Website Demo Request",
+     "E_ARR_POST_LAUNCH_C": "100000.0",
+     "FINANCE_ENTRY_POINT": "Inbound High Intent"
+   }
+   columns: ["ENTRY_POINT", "MISSING_COLUMN", "CLOSE_DATE"]
+
+   return => ["<identifier>", "Website Demo Request", "", "2022-07-08T00:00:00Z"]
+
+ */
+const generateColumnValuesFromFields = (identifier: string, fields: Fields, columns: string[]) => {
+  const retVal = columns.map((col) => fields[col] ?? '')
+  retVal.unshift(identifier) // Write identifier as first column
+  return retVal
+}
+
+/**
+ * Processes the response of the Google Sheets GET call and parses the events into separate operation buckets.
+ * @param response result of the Google Sheets API get call
+ * @param events data to be written to the spreadsheet
+ * @param mappingSettings invariant settings; only the column count is read here, for the cell-limit check
+ * @param syncMode "upsert", "add" or "update"; controls which events are updated and which are appended
+ * @returns the events split into an `appendBatch` and an `updateBatch`
+ */
+function processGetSpreadsheetResponse(
+  response: GetResponse,
+  events: Payload[],
+  mappingSettings: MappingSettings,
+  syncMode: string
+) {
+  const numColumns = mappingSettings.columns.length
+  const numRows = response.values?.length ?? 0
+
+  if (numRows * numColumns > CONSTANTS.MAX_CELLS) {
+    throw new IntegrationError('Sheet has reached maximum limit', 'INVALID_REQUEST_DATA', 400)
+  }
+
+  const updateBatch: UpdateBatch[] = []
+  const appendBatch: AppendBatch[] = []
+
+  // Use a hashmap to efficiently find if the event already exists in the spreadsheet (update) or not (append).
+  const eventMap = new Map(events.map((e) => [e.record_identifier, e]))
+
+  // The operation type used to come from RETL as an event, and it was either "new" or "updated".
+  // Now it comes from Sync Mode, and it can be "upsert", "add" or "update".
+  // "upsert" and "update" overwrite rows whose identifier is already in the sheet.
+  // "upsert" and "add" append rows whose identifier is not in the sheet yet.
+  // Delete is not supported.
+
+  if (response.values && response.values.length > 0) {
+    for (let i = 0; i < response.values.length; i++) {
+      const targetIdentifier = response.values[i][0]
+      if (eventMap.has(targetIdentifier)) {
+        // The event being processed already exists in the spreadsheet.
+        // In here we will determine if we should update the event or not.
+        // SyncModes that are not "upsert" or "update" will not update the event.
+        const targetEvent = eventMap.get(targetIdentifier) as Payload
+        if (syncMode === 'upsert' || syncMode === 'update') {
+          updateBatch.push({
+            identifier: targetIdentifier,
+            event: targetEvent.fields as Fields,
+            index: i
+          })
+        }
+        eventMap.delete(targetIdentifier)
+      }
+    }
+  }
+
+  // At this point, eventMap contains all the rows we couldn't find in the spreadsheet.
+  // If the sync mode is "upsert" or "add", we should append the event to the sheet.
+  eventMap.forEach((value, key) => {
+    if (syncMode === 'upsert' || syncMode === 'add') {
+      appendBatch.push({
+        identifier: key,
+        event: value.fields as Fields
+      })
+    }
+  })
+
+  return { appendBatch, updateBatch }
+}
+
+/**
+ * Commits all passed events to the correct row in the spreadsheet, as well as the columns header row.
+ * @param mappingSettings configuration object detailing parameters for the call
+ * @param updateBatch array of events to commit to the spreadsheet
+ * @param gs interface object capable of interacting with Google Sheets API
+ */
+async function processUpdateBatch(mappingSettings: MappingSettings, updateBatch: UpdateBatch[], gs: GoogleSheets) {
+  // Utility function used to calculate which range an event should be written to
+  const getRange = (targetIndex: number, columnCount: number) => {
+    const targetRange = new A1(1, targetIndex)
+    targetRange.addX(columnCount)
+    return targetRange.toString()
+  }
+
+  const batchPayload = updateBatch.map(({ identifier, event, index }) => {
+    // Flatten event fields to be just the values
+    const values = generateColumnValuesFromFields(identifier, event, mappingSettings.columns)
+    return {
+      range: `${mappingSettings.spreadsheetName}!${getRange(index + DATA_ROW_OFFSET, values.length)}`,
+      values: [values]
+    }
+  })
+
+  // Always add to the payload a write to the first row (containing column names) in case that columns have been updated
+  const headerRowRange = new A1(1, 1, 1, mappingSettings.columns.length + 1)
+  batchPayload.push({
+    range: `${mappingSettings.spreadsheetName}!${headerRowRange.toString()}`,
+    values: [['id', ...mappingSettings.columns]]
+  })
+
+  return gs.batchUpdate(mappingSettings, batchPayload)
+}
+
+// TODO: Re-enable delete once locking is supported.
+/**
+ * Clears all passed events from the spreadsheet.
+ * @param mappingSettings configuration object detailing parameters for the call
+ * @param deleteBatch array of events to clear from the spreadsheet
+ * @param gs interface object capable of interacting with Google Sheets API
+ */
+// async function processDeleteBatch(
+//   mappingSettings: MappingSettings,
+//   deleteBatch: { identifier: string; targetIndex: number }[],
+//   gs: GoogleSheets
+// ) {
+//   if (deleteBatch.length <= 0) {
+//     return
+//   }
+
+//   // TODO: fix a1-notation package to support 1:1 notation
+//   const deletePayload = deleteBatch.map(({ targetIndex }) => {
+//     return {
+//       range: `${mappingSettings.spreadsheetName}!${targetIndex}:${targetIndex}`
+//     }
+//   })
+
+//   return gs
+//     .batchClear(mappingSettings, deletePayload)
+//     .then(() => {
+//       console.log('delete')
+//     })
+//     .catch((error) => {
+//       console.log(error)
+//     })
+// }
+
+/**
+ * Commits all passed events to the bottom of the spreadsheet.
+ * @param mappingSettings configuration object detailing parameters for the call
+ * @param appendBatch array of events to commit to the spreadsheet
+ * @param gs interface object capable of interacting with Google Sheets API
+ * @returns the result of the append call, or undefined when there is nothing to append
+ */
+async function processAppendBatch(mappingSettings: MappingSettings, appendBatch: AppendBatch[], gs: GoogleSheets) {
+  if (appendBatch.length <= 0) {
+    return
+  }
+
+  // Flatten event fields to be just the values
+  const values = appendBatch.map(({ identifier, event }) =>
+    generateColumnValuesFromFields(identifier, event, mappingSettings.columns)
+  )
+
+  return gs.append(mappingSettings, `A${DATA_ROW_OFFSET}`, values)
+}
+
+/**
+ * Takes an array of events and dynamically decides whether to append or update rows in the spreadsheet (delete is not supported yet).
+ * @param request request object used to perform HTTP calls
+ * @param events array of events to commit to the spreadsheet
+ * @param syncMode "upsert", "add" or "update", as selected in the action's Sync Mode setting
+ */
+async function processData(request: RequestClient, events: Payload[], syncMode: string) {
+  // These are assumed to be constant across all events
+  const mappingSettings = {
+    spreadsheetId: events[0].spreadsheet_id,
+    spreadsheetName: events[0].spreadsheet_name,
+    dataFormat: events[0].data_format,
+    columns: Object.getOwnPropertyNames(events[0].fields)
+  }
+
+  const gs: GoogleSheets = new GoogleSheets(request)
+
+  // Get all of the row identifiers (assumed to be in the first column A)
+  const response = await gs.get(mappingSettings, `A${DATA_ROW_OFFSET}:A`)
+
+  // Use the retrieved row identifiers along with the incoming events to decide which ones should be appended or updated.
+  const { appendBatch, updateBatch } = processGetSpreadsheetResponse(response.data, events, mappingSettings, syncMode)
+
+  const promises = [
+    processUpdateBatch(mappingSettings, updateBatch, gs),
+    processAppendBatch(mappingSettings, appendBatch, gs)
+  ]
+
+  return await Promise.all(promises)
+}
+
+export { processData }