Skip to content

Commit

Permalink
PostgreSQL case sensitivity in listen (#530)
Browse files Browse the repository at this point in the history
* case sensitivity et all.


---------

Co-authored-by: tudor <[email protected]>
  • Loading branch information
copiltembel and tudor authored Feb 12, 2025
1 parent b3640ee commit 6bdd74e
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-spiders-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

listen and unlisten case sensitivity behaviour aligned to default PostgreSQL behaviour'
2 changes: 2 additions & 0 deletions docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ const unsub = await pg.listen('test', (payload) => {
await pg.query("NOTIFY test, 'Hello, world!'")
```

Channel names are case sensitive if double-quoted (`pg.listen('"TeST"')`). Otherwise channel name will be lower cased (`pg.listen('TeStiNG')` == `pg.listen('testing')`).

### unlisten

`.unlisten(channel: string, callback?: (payload: string) => void): Promise<void>`
Expand Down
39 changes: 27 additions & 12 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import type {
PGliteOptions,
} from './interface.js'
import PostgresModFactory, { type PostgresMod } from './postgresMod.js'
import { getFsBundle, instantiateWasm, startWasmDownload } from './utils.js'
import {
getFsBundle,
instantiateWasm,
startWasmDownload,
toPostgresName,
} from './utils.js'

// Importing the source as the built version is not ESM compatible
import { Parser as ProtocolParser, serialize } from '@electric-sql/pg-protocol'
Expand Down Expand Up @@ -716,13 +721,22 @@ export class PGlite
* @param callback The callback to call when a notification is received
*/
async listen(channel: string, callback: (payload: string) => void) {
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set())
const pgChannel = toPostgresName(channel)
if (!this.#notifyListeners.has(pgChannel)) {
this.#notifyListeners.set(pgChannel, new Set())
}
this.#notifyListeners.get(pgChannel)!.add(callback)
try {
await this.exec(`LISTEN ${channel}`)
} catch (e) {
this.#notifyListeners.get(pgChannel)!.delete(callback)
if (this.#notifyListeners.get(pgChannel)?.size === 0) {
this.#notifyListeners.delete(pgChannel)
}
throw e
}
this.#notifyListeners.get(channel)!.add(callback)
await this.exec(`LISTEN "${channel}"`)
return async () => {
await this.unlisten(channel, callback)
await this.unlisten(pgChannel, callback)
}
}

Expand All @@ -732,15 +746,16 @@ export class PGlite
* @param callback The callback to remove
*/
async unlisten(channel: string, callback?: (payload: string) => void) {
const pgChannel = toPostgresName(channel)
if (callback) {
this.#notifyListeners.get(channel)?.delete(callback)
if (this.#notifyListeners.get(channel)?.size === 0) {
await this.exec(`UNLISTEN "${channel}"`)
this.#notifyListeners.delete(channel)
this.#notifyListeners.get(pgChannel)?.delete(callback)
if (this.#notifyListeners.get(pgChannel)?.size === 0) {
await this.exec(`UNLISTEN ${channel}`)
this.#notifyListeners.delete(pgChannel)
}
} else {
await this.exec(`UNLISTEN "${channel}"`)
this.#notifyListeners.delete(channel)
await this.exec(`UNLISTEN ${channel}`)
this.#notifyListeners.delete(pgChannel)
}
}

Expand Down
17 changes: 17 additions & 0 deletions packages/pglite/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,20 @@ export function debounceMutex<A extends any[], R>(
return promise
}
}

/**
* Postgresql handles quoted names as CaseSensitive and unquoted as lower case.
* If input is quoted, returns an unquoted string (same casing)
* If input is unquoted, returns a lower-case string
*/
export function toPostgresName(input: string): string {
let output
if (input.startsWith('"') && input.endsWith('"')) {
// Postgres sensitive case
output = input.substring(1, input.length - 1)
} else {
// Postgres case insensitive - all to lower
output = input.toLowerCase()
}
return output
}
13 changes: 7 additions & 6 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
} from '../interface.js'
import type { PGlite } from '../pglite.js'
import { BasePGlite } from '../base.js'
import { uuid } from '../utils.js'
import { toPostgresName, uuid } from '../utils.js'

export type PGliteWorkerOptions<E extends Extensions = Extensions> =
PGliteOptions<E> & {
Expand Down Expand Up @@ -359,14 +359,15 @@ export class PGliteWorker
channel: string,
callback: (payload: string) => void,
): Promise<() => Promise<void>> {
await this.waitReady
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set())
const pgChannel = toPostgresName(channel)

if (!this.#notifyListeners.has(pgChannel)) {
this.#notifyListeners.set(pgChannel, new Set())
}
this.#notifyListeners.get(channel)?.add(callback)
this.#notifyListeners.get(pgChannel)!.add(callback)
await this.exec(`LISTEN ${channel}`)
return async () => {
await this.unlisten(channel, callback)
await this.unlisten(pgChannel, callback)
}
}

Expand Down
127 changes: 126 additions & 1 deletion packages/pglite/tests/notify.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, it, expect } from 'vitest'
import { describe, it, expect, vi } from 'vitest'
import { PGlite } from '../dist/index.js'
import { expectToThrowAsync } from './test-utils.js'

describe('notify API', () => {
it('notify', async () => {
Expand Down Expand Up @@ -41,4 +42,128 @@ describe('notify API', () => {

await new Promise((resolve) => setTimeout(resolve, 1000))
})

it('check notify case sensitivity + special chars as Postgresql', async () => {
const pg = new PGlite()

const allLower1 = vi.fn()
await pg.listen('postgresdefaultlower', allLower1)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)

const autoLowerTest1 = vi.fn()
await pg.listen('PostgresDefaultLower', autoLowerTest1)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)

const autoLowerTest2 = vi.fn()
await pg.listen('PostgresDefaultLower', autoLowerTest2)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)

const autoLowerTest3 = vi.fn()
await pg.listen('postgresdefaultlower', autoLowerTest3)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)

const caseSensitive1 = vi.fn()
await pg.listen('"tesT2"', caseSensitive1)
await pg.query(`NOTIFY "tesT2", 'paYloAd2'`)

const caseSensitive2 = vi.fn()
await pg.listen('"tesT3"', caseSensitive2)
await pg.query(`NOTIFY tesT3, 'paYloAd2'`)

const caseSensitive3 = vi.fn()
await pg.listen('testNotCalled2', caseSensitive3)
await pg.query(`NOTIFY "testNotCalled2", 'paYloAd2'`)

const quotedWithSpaces = vi.fn()
await pg.listen('"Quoted Channel With Spaces"', quotedWithSpaces)
await pg.query(`NOTIFY "Quoted Channel With Spaces", 'payload1'`)

const unquotedWithSpaces = vi.fn()
await expectToThrowAsync(
pg.listen('Unquoted Channel With Spaces', unquotedWithSpaces),
)
await expectToThrowAsync(
pg.query(`NOTIFY Unquoted Channel With Spaces, 'payload1'`),
)

const otherCharsWithQuotes = vi.fn()
await pg.listen('"test&me"', otherCharsWithQuotes)
await pg.query(`NOTIFY "test&me", 'paYloAd2'`)

const otherChars = vi.fn()
await expectToThrowAsync(pg.listen('test&me', otherChars))
await expectToThrowAsync(pg.query(`NOTIFY test&me, 'payload1'`))

expect(allLower1).toHaveBeenCalledTimes(4)
expect(autoLowerTest1).toHaveBeenCalledTimes(3)
expect(autoLowerTest2).toHaveBeenCalledTimes(2)
expect(autoLowerTest3).toHaveBeenCalledTimes(1)
expect(caseSensitive1).toHaveBeenCalledOnce()
expect(caseSensitive2).not.toHaveBeenCalled()
expect(caseSensitive3).not.toHaveBeenCalled()
expect(otherCharsWithQuotes).toHaveBeenCalledOnce()
expect(quotedWithSpaces).toHaveBeenCalledOnce()
expect(unquotedWithSpaces).not.toHaveBeenCalled()
})

it('check unlisten case sensitivity + special chars as Postgresql', async () => {
const pg = new PGlite()

const allLower1 = vi.fn()
{
const unsub1 = await pg.listen('postgresdefaultlower', allLower1)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)
await unsub1()
}

const autoLowerTest1 = vi.fn()
{
const unsub2 = await pg.listen('PostgresDefaultLower', autoLowerTest1)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)
await unsub2()
}

const autoLowerTest2 = vi.fn()
{
const unsub3 = await pg.listen('PostgresDefaultLower', autoLowerTest2)
await pg.query(`NOTIFY postgresdefaultlower, 'payload1'`)
await unsub3()
}

const autoLowerTest3 = vi.fn()
{
const unsub4 = await pg.listen('postgresdefaultlower', autoLowerTest3)
await pg.query(`NOTIFY PostgresDefaultLower, 'payload1'`)
await unsub4()
}

const caseSensitive1 = vi.fn()
{
await pg.listen('"CaSESEnsiTIvE"', caseSensitive1)
await pg.query(`NOTIFY "CaSESEnsiTIvE", 'payload1'`)
await pg.unlisten('"CaSESEnsiTIvE"')
await pg.query(`NOTIFY "CaSESEnsiTIvE", 'payload1'`)
}

const quotedWithSpaces = vi.fn()
{
await pg.listen('"Quoted Channel With Spaces"', quotedWithSpaces)
await pg.query(`NOTIFY "Quoted Channel With Spaces", 'payload1'`)
await pg.unlisten('"Quoted Channel With Spaces"')
}

const otherCharsWithQuotes = vi.fn()
{
await pg.listen('"test&me"', otherCharsWithQuotes)
await pg.query(`NOTIFY "test&me", 'payload'`)
await pg.unlisten('"test&me"')
}

expect(allLower1).toHaveBeenCalledOnce()
expect(autoLowerTest1).toHaveBeenCalledOnce()
expect(autoLowerTest2).toHaveBeenCalledOnce()
expect(autoLowerTest3).toHaveBeenCalledOnce()
expect(caseSensitive1).toHaveBeenCalledOnce()
expect(otherCharsWithQuotes).toHaveBeenCalledOnce()
})
})

0 comments on commit 6bdd74e

Please sign in to comment.