From b3640eee48959a32b316e7015160a260e2323509 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 12 Feb 2025 13:53:55 +0000 Subject: [PATCH] fix: update sync plugin to work with the latest Electric sync server (#531) * update sync plugin * docs stylecheck * Fit types in tests --------- Co-authored-by: tudor --- .changeset/gentle-ways-knock.md | 5 +++++ docs/docs/api.md | 3 ++- packages/pglite-sync/package.json | 2 +- packages/pglite-sync/src/index.ts | 23 ++++++++++++++++++----- packages/pglite-sync/test/sync.test.ts | 25 ------------------------- pnpm-lock.yaml | 11 ++++++----- 6 files changed, 32 insertions(+), 37 deletions(-) create mode 100644 .changeset/gentle-ways-knock.md diff --git a/.changeset/gentle-ways-knock.md b/.changeset/gentle-ways-knock.md new file mode 100644 index 00000000..7963a65c --- /dev/null +++ b/.changeset/gentle-ways-knock.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite-sync': patch +--- + +Update the sync plugin to work with the latest Electric sync server diff --git a/docs/docs/api.md b/docs/docs/api.md index df3b3d90..99c2d2f1 100644 --- a/docs/docs/api.md +++ b/docs/docs/api.md @@ -362,9 +362,10 @@ await pg.describeQuery('SELECT * FROM test WHERE name = $1', ['test']) ``` ### clone + `.clone(): Promise` -Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test. +Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test. ## Properties diff --git a/packages/pglite-sync/package.json b/packages/pglite-sync/package.json index 8ade39ad..1dcfd0dc 100644 --- a/packages/pglite-sync/package.json +++ b/packages/pglite-sync/package.json @@ -45,7 +45,7 @@ "dist" ], "dependencies": { - "@electric-sql/client": "~0.9.0" + "@electric-sql/client": "1.0.0-beta.3" }, "devDependencies": { "@electric-sql/pglite": "workspace:*", diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 4bd8ce58..0531fe5b 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -1,4 +1,4 @@ -import type { Offset, ShapeStreamOptions } from '@electric-sql/client' +import type { Offset, Row, ShapeStreamOptions } from '@electric-sql/client' import { ChangeMessage, ShapeStream, @@ -12,6 +12,10 @@ import type { Transaction, } from '@electric-sql/pglite' +interface LegacyChangeMessage> extends ChangeMessage { + offset?: Offset +} + export type MapColumnsMap = Record export type MapColumnsFn = (message: ChangeMessage) => Record export type MapColumns = MapColumnsMap | MapColumnsFn @@ -154,7 +158,7 @@ async function createPlugin( // _very_ large shapes - either we should commit batches to // a temporary table and copy over the transactional result // or use a separate connection to hold a long transaction - let messageAggregator: ChangeMessage[] = [] + let messageAggregator: LegacyChangeMessage[] = [] let truncateNeeded = false // let lastLSN: string | null = null // Removed until Electric has stabilised on LSN metadata let lastCommitAt: number = 0 @@ -248,8 +252,9 @@ async function createPlugin( metadataSchema, shapeKey: options.shapeKey, shapeId: shapeHandle, - lastOffset: - messageAggregator[messageAggregator.length - 1].offset, + lastOffset: getMessageOffset( + messageAggregator[messageAggregator.length - 1], + ), }) } }) @@ -359,7 +364,7 @@ async function createPlugin( return stream.isUpToDate }, get shapeId() { - return stream.shapeHandle + return stream.shapeHandle! }, stream, subscribe: (cb: () => void, error: (err: Error) => void) => { @@ -667,3 +672,11 @@ function subscriptionMetadataTableName(metadatSchema: string) { } const subscriptionTableName = `shape_subscriptions_metadata` + +function getMessageOffset(message: LegacyChangeMessage): Offset { + if (message.offset) { + return message.offset + } else { + return `${message.headers.lsn}_${message.headers.op_position}` as Offset + } +} diff --git a/packages/pglite-sync/test/sync.test.ts b/packages/pglite-sync/test/sync.test.ts index 5f6f4fe9..b2201d14 100644 --- a/packages/pglite-sync/test/sync.test.ts +++ b/packages/pglite-sync/test/sync.test.ts @@ -64,7 +64,6 @@ describe('pglite-sync', () => { // insert await feedMessage({ headers: { operation: 'insert' }, - offset: '-1', key: 'id1', value: { id: 1, @@ -83,7 +82,6 @@ describe('pglite-sync', () => { // update await feedMessage({ headers: { operation: 'update' }, - offset: '-1', key: 'id1', value: { id: 1, @@ -102,7 +100,6 @@ describe('pglite-sync', () => { // delete await feedMessage({ headers: { operation: 'delete' }, - offset: '-1', key: 'id1', value: { id: 1, @@ -315,7 +312,6 @@ describe('pglite-sync', () => { await feedMessages([ { headers: { operation: 'insert' }, - offset: `1_${numInserts}`, key: `id${numInserts}`, value: { id: numInserts, @@ -326,7 +322,6 @@ describe('pglite-sync', () => { { headers: { control: 'must-refetch' } }, { headers: { operation: 'insert' }, - offset: `2_1`, key: `id21`, value: { id: 21, @@ -466,7 +461,6 @@ describe('pglite-sync', () => { // insert await feedMessage({ headers: { operation: 'insert' }, - offset: '-1', key: 'id1', value: { id: 1, @@ -485,7 +479,6 @@ describe('pglite-sync', () => { // update with no columns to update await feedMessage({ headers: { operation: 'update' }, - offset: '-1', key: 'id1', value: { id: 1, @@ -555,7 +548,6 @@ describe('pglite-sync', () => { await feedMessage({ headers: { operation: 'insert' }, - offset: '-1', key: 'id1', value: { id: 'id1', @@ -619,7 +611,6 @@ describe('pglite-sync', () => { ), { headers: { operation: 'update' as const }, - offset: `1_${numInserts}`, key: `id0`, value: { id: 0, @@ -683,7 +674,6 @@ describe('pglite-sync', () => { const specialCharMessages: Message[] = [ { headers: { operation: 'insert' }, - offset: '1_0', key: 'id1', value: { id: 1, @@ -693,7 +683,6 @@ describe('pglite-sync', () => { }, { headers: { operation: 'insert' }, - offset: '2_0', key: 'id2', value: { id: 2, @@ -703,7 +692,6 @@ describe('pglite-sync', () => { }, { headers: { operation: 'insert' }, - offset: '3_0', key: 'id3', value: { id: 3, @@ -784,7 +772,6 @@ describe('pglite-sync', () => { (_, idx) => ({ headers: { operation: 'insert' }, - offset: `1_${idx}`, key: `id${idx}`, value: { id: idx, @@ -872,7 +859,6 @@ describe('pglite-sync', () => { // await feedMessages([ // { // headers: { operation: 'insert' }, - // offset: '1_1', // Transaction 1 // key: 'id1', // value: { // id: 1, @@ -882,7 +868,6 @@ describe('pglite-sync', () => { // }, // { // headers: { operation: 'insert' }, - // offset: '1_2', // Same transaction // key: 'id2', // value: { // id: 2, @@ -892,7 +877,6 @@ describe('pglite-sync', () => { // }, // { // headers: { operation: 'insert' }, - // offset: '2_1', // New transaction // key: 'id3', // value: { // id: 3, @@ -974,19 +958,16 @@ describe('pglite-sync', () => { await feedMessages([ { headers: { operation: 'insert' }, - offset: '1_1', key: 'id1', value: { id: 1, task: 'task1', done: false }, }, { headers: { operation: 'insert' }, - offset: '2_1', key: 'id2', value: { id: 2, task: 'task2', done: false }, }, { headers: { operation: 'insert' }, - offset: '3_1', key: 'id3', value: { id: 3, task: 'task3', done: false }, }, @@ -1054,19 +1035,16 @@ describe('pglite-sync', () => { await feedMessages([ { headers: { operation: 'insert' }, - offset: '1_1', key: 'id1', value: { id: 1, task: 'task1', done: false }, }, { headers: { operation: 'insert' }, - offset: '1_2', key: 'id2', value: { id: 2, task: 'task2', done: false }, }, { headers: { operation: 'insert' }, - offset: '1_3', key: 'id3', value: { id: 3, task: 'task3', done: false }, }, @@ -1230,7 +1208,6 @@ describe('pglite-sync', () => { await feedMessages([ { headers: { operation: 'insert' }, - offset: '1_1', key: 'id1', value: { id: 1, @@ -1240,7 +1217,6 @@ describe('pglite-sync', () => { }, { headers: { operation: 'insert' }, - offset: '1_2', key: 'id2', value: { id: 2, @@ -1257,7 +1233,6 @@ describe('pglite-sync', () => { await feedMessages([ { headers: { operation: 'insert' }, - offset: '1_3', key: 'id3', value: { id: 3, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5e012b26..ba0d87ef 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -249,8 +249,8 @@ importers: packages/pglite-sync: dependencies: '@electric-sql/client': - specifier: ~0.9.0 - version: 0.9.0 + specifier: 1.0.0-beta.3 + version: 1.0.0-beta.3 devDependencies: '@electric-sql/pglite': specifier: workspace:* @@ -636,8 +636,8 @@ packages: search-insights: optional: true - '@electric-sql/client@0.9.0': - resolution: {integrity: sha512-UL2Gep9wPdGMTE0oEWVi0HA8R293R2OzFfHeAsN2LABYYl/boXss7nseNEiIV5+RjHPH7Tm8NsjH9iJW2rZkrQ==} + '@electric-sql/client@1.0.0-beta.3': + resolution: {integrity: sha512-x3bzYlX+IRwBAILPxzu3ARkXzmrAQtVOuJCKCxlSqENuJa4zvLPF4f8vC6HMOiiJiHPAntJjfI3Hb0lrt2PTxA==} '@embedded-postgres/darwin-arm64@15.5.1-beta.11': resolution: {integrity: sha512-5m96qe7TFR/wzL05fyl1TRKfm+I73gIdDea+vXh60MQzUUfX9FXSiR8id6TI4aRhomUrd/l8hLTq8E2ymTCIFw==} @@ -1891,6 +1891,7 @@ packages: bun@1.1.30: resolution: {integrity: sha512-ysRL1pq10Xba0jqVLPrKU3YIv0ohfp3cTajCPtpjCyppbn3lfiAVNpGoHfyaxS17OlPmWmR67UZRPw/EueQuug==} + cpu: [arm64, x64] os: [darwin, linux, win32] hasBin: true @@ -4800,7 +4801,7 @@ snapshots: transitivePeerDependencies: - '@algolia/client-search' - '@electric-sql/client@0.9.0': + '@electric-sql/client@1.0.0-beta.3': optionalDependencies: '@rollup/rollup-darwin-arm64': 4.24.0