Skip to content

Commit

Permalink
Fix MultiShapeStream to correctly refresh shape streams that are behi…
Browse files Browse the repository at this point in the history
…nd the max LSN
  • Loading branch information
samwillis committed Feb 25, 2025
1 parent 5085fcf commit cff02ec
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions packages/experimental/src/multi-shape-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ export class MultiShapeStream<
checkForUpdatesAfter?: number

#checkForUpdatesTimeout?: ReturnType<typeof setTimeout> | undefined
#shapesToSkipCheckForUpdates = new Set<keyof TShapeRows>()

// We keep track of the last lsn of data and up-to-date messages for each shape
// so that we can skip checkForUpdates if the lsn of the up-to-date message is
// greater than the last lsn of data.
#lastDataLsns: { [K in keyof TShapeRows]: number }
#lastUpToDateLsns: { [K in keyof TShapeRows]: number }

readonly #subscribers = new Map<
number,
Expand Down Expand Up @@ -149,6 +154,12 @@ export class MultiShapeStream<
}),
])
) as { [K in keyof TShapeRows]: ShapeStream<TShapeRows[K]> }
this.#lastDataLsns = Object.fromEntries(
Object.entries(shapes).map(([key]) => [key, -Infinity])
) as { [K in keyof TShapeRows]: number }
this.#lastUpToDateLsns = Object.fromEntries(
Object.entries(shapes).map(([key]) => [key, -Infinity])
) as { [K in keyof TShapeRows]: number }
if (start) this.#start()
}

Expand All @@ -162,7 +173,34 @@ export class MultiShapeStream<
}
shape.subscribe(
async (messages) => {
this.#scheduleCheckForUpdates(key)
// Whats the max lsn of the up-to-date messages?
const upToDateLsns = messages
.filter(isControlMessage)
.map(({ headers }) => (headers.global_last_seen_lsn as number) ?? 0)
if (upToDateLsns.length > 0) {
const maxUpToDateLsn = Math.max(...upToDateLsns)
const lastMaxUpToDateLsn = this.#lastUpToDateLsns[key]
if (maxUpToDateLsn > lastMaxUpToDateLsn) {
this.#lastUpToDateLsns[key] = maxUpToDateLsn
}
}

// Whats the max lsn of the data messages?
const dataLsns = messages
.filter(isChangeMessage)
.map(({ headers }) => (headers.lsn as number) ?? 0)
if (dataLsns.length > 0) {
const maxDataLsn = Math.max(...dataLsns)
const lastMaxDataLsn = this.#lastDataLsns[key]
if (maxDataLsn > lastMaxDataLsn) {
this.#lastDataLsns[key] = maxDataLsn
}
// There is new data, so we need to schedule a check for updates on
// other shapes
this.#scheduleCheckForUpdates()
}

// Publish the messages to the multi-shape stream subscribers
const multiShapeMessages = messages.map(
(message) =>
({
Expand All @@ -178,21 +216,25 @@ export class MultiShapeStream<
this.#started = true
}

#scheduleCheckForUpdates(fromShape: keyof TShapeRows) {
this.#shapesToSkipCheckForUpdates.add(fromShape)
#scheduleCheckForUpdates() {
this.#checkForUpdatesTimeout ??= setTimeout(() => {
this.#checkForUpdates()
this.#checkForUpdatesTimeout = undefined
}, this.checkForUpdatesAfter)
}

async #checkForUpdates() {
const maxDataLsn = Math.max(...Object.values(this.#lastDataLsns))
const refreshPromises = this.#shapeEntries()
.filter(([key]) => !this.#shapesToSkipCheckForUpdates.has(key))
.filter(([key]) => {
// We only need to refresh shapes that have not seen an up-to-date message
// lower than the max lsn of the data messages we have received.
const lastUpToDateLsn = this.#lastUpToDateLsns[key]
return lastUpToDateLsn < maxDataLsn
})
.map(([_, shape]) => {
return shape.forceDisconnectAndRefresh()
})
this.#shapesToSkipCheckForUpdates.clear()
await Promise.all(refreshPromises)
}

Expand Down

0 comments on commit cff02ec

Please sign in to comment.