From 82e17df43d5fa3ed0cfd5d5e402475da2d3b44d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0pa=C4=8Dek?= Date: Wed, 1 Mar 2023 12:44:14 +0100 Subject: [PATCH] Move hrana-client out of sqld repo --- .gitignore | 6 + LICENSE | 20 +++ README.md | 50 ++++++ examples/jwt_auth.mjs | 6 + examples/readme_example.js | 58 ++++++ jest.config.js | 6 + package-cjs.json | 3 + package.json | 57 ++++++ src/__tests__/index.ts | 173 ++++++++++++++++++ src/convert.ts | 102 +++++++++++ src/id_alloc.ts | 53 ++++++ src/index.ts | 359 +++++++++++++++++++++++++++++++++++++ src/proto.ts | 129 +++++++++++++ test_server.py | 133 ++++++++++++++ tsconfig.base.json | 16 ++ tsconfig.build-cjs.json | 9 + tsconfig.build-esm.json | 9 + tsconfig.json | 7 + 18 files changed, 1196 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 examples/jwt_auth.mjs create mode 100644 examples/readme_example.js create mode 100644 jest.config.js create mode 100644 package-cjs.json create mode 100644 package.json create mode 100644 src/__tests__/index.ts create mode 100644 src/convert.ts create mode 100644 src/id_alloc.ts create mode 100644 src/index.ts create mode 100644 src/proto.ts create mode 100644 test_server.py create mode 100644 tsconfig.base.json create mode 100644 tsconfig.build-cjs.json create mode 100644 tsconfig.build-esm.json create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a13312 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +node_modules +/lib-esm +/lib-cjs +package-lock.json +*.tsbuildinfo +Session.vim diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fd1a31e --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +MIT License + +Copyright 2023 the sqld authors + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7004968 --- /dev/null +++ b/README.md @@ -0,0 +1,50 @@ +# Hrana client for TypeScript + +This package implements a Hrana client for TypeScript. Hrana is a protocol based on WebSockets that can be used to connect to sqld. It is more efficient than the postgres wire protocol (especially for edge deployments) and it supports interactive stateful SQL connections (called "streams") which are not supported by the HTTP API. + +> This package is intended mostly for internal use. Consider using the [`@libsql/client`][libsql-client] package, which will automatically use Hrana if you connect to a `ws://` or `wss://` URL. + +[libsql-client]: https://www.npmjs.com/package/@libsql/client + +## Usage + +```typescript +import * as hrana from "@libsql/hrana-client"; + +// Open a `hrana.Client`, which works like a connection pool in standard SQL +// databases, but it uses just a single network connection internally +const url = process.env.URL ?? "ws://localhost:2023"; // Address of the sqld server +const jwt = process.env.JWT; // JWT token for authentication +const client = hrana.open(url, jwt); + +// Open a `hrana.Stream`, which is an interactive SQL stream. This corresponds +// to a "connection" from other SQL databases +const stream = client.openStream(); + +// Fetch all rows returned by a SQL statement +const books = await stream.query("SELECT title, year FROM book WHERE author = 'Jane Austen'"); +// The rows are returned in an Array... +for (const book of books) { + // every returned row works as an array (`book[1]`) and as an object (`book.year`) + console.log(`${book.title} from ${book.year}`); +} + +// Fetch a single row +const book = await stream.queryRow("SELECT title, MIN(year) FROM book"); +if (book !== undefined) { + console.log(`The oldest book is ${book.title} from year ${book[1]}`); +} + +// Fetch a single value, using a bound parameter +const year = await stream.queryValue(["SELECT MAX(year) FROM book WHERE author = ?", ["Jane Austen"]]); +if (year !== undefined) { + console.log(`Last book from Jane Austen was published in ${year}`); +} + +// Execute a statement that does not return any rows +const res = await stream.execute(["DELETE FROM book WHERE author = ?", ["J. K. Rowling"]]) +console.log(`${res.rowsAffected} books have been cancelled`); + +// When you are done, remember to close the client +client.close(); +``` diff --git a/examples/jwt_auth.mjs b/examples/jwt_auth.mjs new file mode 100644 index 0000000..e6273a9 --- /dev/null +++ b/examples/jwt_auth.mjs @@ -0,0 +1,6 @@ +import * as hrana from "@libsql/hrana-client"; + +const client = hrana.open(process.env.URL ?? "ws://localhost:2023", process.env.JWT); +const stream = client.openStream(); +console.log(await stream.queryValue("SELECT 1")); +client.close(); diff --git a/examples/readme_example.js b/examples/readme_example.js new file mode 100644 index 0000000..1ffe1d7 --- /dev/null +++ b/examples/readme_example.js @@ -0,0 +1,58 @@ +import * as hrana from "@libsql/hrana-client"; + +// Open a `hrana.Client`, which works like a connection pool in standard SQL +// databases, but it uses just a single network connection internally +const url = process.env.URL ?? "ws://localhost:2023"; // Address of the sqld server +const jwt = process.env.JWT; // JWT token for authentication +const client = hrana.open(url, jwt); + +// Open a `hrana.Stream`, which is an interactive SQL stream. This corresponds +// to a "connection" from other SQL databases +const stream = client.openStream(); + +await stream.execute(`CREATE TABLE book ( + id INTEGER PRIMARY KEY NOT NULL, + author TEXT NOT NULL, + title TEXT NOT NULL, + year INTEGER NOT NULL +)`); +await stream.execute(`INSERT INTO book (author, title, year) VALUES + ('Jane Austen', 'Sense and Sensibility', 1811), + ('Jane Austen', 'Pride and Prejudice', 1813), + ('Jane Austen', 'Mansfield Park', 1814), + ('Jane Austen', 'Emma', 1815), + ('Jane Austen', 'Persuasion', 1818), + ('Jane Austen', 'Lady Susan', 1871), + ('Daniel Defoe', 'Robinson Crusoe', 1719), + ('Daniel Defoe', 'A Journal of the Plague Year', 1722), + ('J. K. Rowling', 'Harry Potter and the Philosopher''s Stone', 1997), + ('J. K. Rowling', 'The Casual Vacancy', 2012), + ('J. K. Rowling', 'The Ickabog', 2020) +`); + +// Fetch all rows returned by a SQL statement +const books = await stream.query("SELECT title, year FROM book WHERE author = 'Jane Austen'"); +// The rows are returned in an Array... +for (const book of books) { + // every returned row works as an array (`book[1]`) and as an object (`book.year`) + console.log(`${book.title} from ${book.year}`); +} + +// Fetch a single row +const book = await stream.queryRow("SELECT title, MIN(year) FROM book"); +if (book !== undefined) { + console.log(`The oldest book is ${book.title} from year ${book[1]}`); +} + +// Fetch a single value, using a bound parameter +const year = await stream.queryValue(["SELECT MAX(year) FROM book WHERE author = ?", ["Jane Austen"]]); +if (year !== undefined) { + console.log(`Last book from Jane Austen was published in ${year}`); +} + +// Execute a statement that does not return any rows +const res = await stream.execute(["DELETE FROM book WHERE author = ?", ["J. K. Rowling"]]) +console.log(`${res.rowsAffected} books have been cancelled`); + +// When you are done, remember to close the client +client.close(); diff --git a/jest.config.js b/jest.config.js new file mode 100644 index 0000000..006b4a9 --- /dev/null +++ b/jest.config.js @@ -0,0 +1,6 @@ +export default { + preset: "ts-jest/presets/default-esm", + moduleNameMapper: { + '^(\\.{1,2}/.*)\\.js$': '$1', + }, +} diff --git a/package-cjs.json b/package-cjs.json new file mode 100644 index 0000000..1cd945a --- /dev/null +++ b/package-cjs.json @@ -0,0 +1,3 @@ +{ + "type": "commonjs" +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..6114fb0 --- /dev/null +++ b/package.json @@ -0,0 +1,57 @@ +{ + "name": "@libsql/hrana-client", + "version": "0.1.0", + "keywords": [ + "hrana", + "libsql", + "sqld", + "database" + ], + "description": "Hrana client for connecting to sqld over a WebSocket", + "repository": { + "type": "git", + "url": "github:libsql/hrana-client-ts" + }, + "homepage": "https://github.com/libsql/hrana-client-ts", + "authors": [ + "Jan Špaček " + ], + "license": "MIT", + + "type": "module", + "main": "lib-cjs/index.js", + "types": "lib-esm/index.d.ts", + "exports": { + ".": { + "types": "./lib-esm/index.d.ts", + "import": "./lib-esm/index.js", + "require": "./lib-cjs/index.js" + } + }, + + "scripts": { + "prebuild": "rm -rf ./lib-cjs ./lib-esm", + "build": "npm run build:cjs && npm run build:esm", + "build:cjs": "tsc -p tsconfig.build-cjs.json", + "build:esm": "tsc -p tsconfig.build-esm.json", + "postbuild": "cp package-cjs.json ./lib-cjs/package.json", + + "test": "jest" + }, + + "files": [ + "lib-cjs/**", + "lib-esm/**" + ], + + "dependencies": { + "isomorphic-ws": "^5.0.0" + }, + "devDependencies": { + "@types/jest": "^29.4.0", + "@types/ws": "^8.5.4", + "jest": "^29.4.0", + "ts-jest": "^29.0.5", + "typescript": "^4.9.4" + } +} diff --git a/src/__tests__/index.ts b/src/__tests__/index.ts new file mode 100644 index 0000000..f17bedd --- /dev/null +++ b/src/__tests__/index.ts @@ -0,0 +1,173 @@ +import * as hrana from ".."; + +function withClient(f: (c: hrana.Client) => Promise): () => Promise { + return async () => { + const c = hrana.open("ws://localhost:2023"); + try { + await f(c); + } finally { + c.close(); + } + }; +} + +test("Stream.queryValue()", withClient(async (c) => { + const s = c.openStream(); + expect(await s.queryValue("SELECT 1")).toStrictEqual(1); + expect(await s.queryValue("SELECT 'elephant'")).toStrictEqual("elephant"); + expect(await s.queryValue("SELECT 42.5")).toStrictEqual(42.5); + expect(await s.queryValue("SELECT NULL")).toStrictEqual(null); +})); + +test("Stream.queryRow()", withClient(async (c) => { + const s = c.openStream(); + + const row = await s.queryRow( + "SELECT 1 AS one, 'elephant' AS two, 42.5 AS three, NULL as four"); + expect(row[0]).toStrictEqual(1); + expect(row[1]).toStrictEqual("elephant"); + expect(row[2]).toStrictEqual(42.5); + expect(row[3]).toStrictEqual(null); + + expect(row[0]).toStrictEqual(row.one); + expect(row[1]).toStrictEqual(row.two); + expect(row[2]).toStrictEqual(row.three); + expect(row[3]).toStrictEqual(row.four); +})); + +test("Stream.query()", withClient(async (c) => { + const s = c.openStream(); + + await s.execute("BEGIN"); + await s.execute("DROP TABLE IF EXISTS t"); + await s.execute("CREATE TABLE t (one, two, three, four)"); + await s.execute( + `INSERT INTO t VALUES + (1, 'elephant', 42.5, NULL), + (2, 'hippopotamus', '123', 0.0)` + ); + + const rows = await s.query("SELECT * FROM t ORDER BY one"); + expect(rows.length).toStrictEqual(2); + expect(rows.rowsAffected).toStrictEqual(0); + + const row0 = rows[0]; + expect(row0[0]).toStrictEqual(1); + expect(row0[1]).toStrictEqual("elephant"); + expect(row0["three"]).toStrictEqual(42.5); + expect(row0["four"]).toStrictEqual(null); + + const row1 = rows[1]; + expect(row1["one"]).toStrictEqual(2); + expect(row1["two"]).toStrictEqual("hippopotamus"); + expect(row1[2]).toStrictEqual("123"); + expect(row1[3]).toStrictEqual(0.0); +})); + +test("Stream.execute()", withClient(async (c) => { + const s = c.openStream(); + + let res = await s.execute("BEGIN"); + expect(res.rowsAffected).toStrictEqual(0); + + res = await s.execute("DROP TABLE IF EXISTS t"); + expect(res.rowsAffected).toStrictEqual(0); + + res = await s.execute("CREATE TABLE t (num, word)"); + expect(res.rowsAffected).toStrictEqual(0); + + res = await s.execute("INSERT INTO t VALUES (1, 'one'), (2, 'two'), (3, 'three')"); + expect(res.rowsAffected).toStrictEqual(3); + + const rows = await s.query("SELECT * FROM t ORDER BY num"); + expect(rows.length).toStrictEqual(3); + expect(rows.rowsAffected).toStrictEqual(0); + + res = await s.execute("DELETE FROM t WHERE num >= 2"); + expect(res.rowsAffected).toStrictEqual(2); + + res = await s.execute("UPDATE t SET num = 4, word = 'four'"); + expect(res.rowsAffected).toStrictEqual(1); + + res = await s.execute("DROP TABLE t"); + expect(res.rowsAffected).toStrictEqual(0); + + await s.execute("COMMIT"); +})); + +test("Stream.executeRaw()", withClient(async (c) => { + const s = c.openStream(); + + let res = await s.executeRaw({ + "sql": "SELECT 1 as one, ? as two, NULL as three", + "args": [{"type": "text", "value": "1+1"}], + "want_rows": true, + }); + + expect(res.cols).toStrictEqual([ + {"name": "one"}, + {"name": "two"}, + {"name": "three"}, + ]); + expect(res.rows).toStrictEqual([ + [ + {"type": "integer", "value": "1"}, + {"type": "text", "value": "1+1"}, + {"type": "null"}, + ], + ]); +})); + +test("concurrent streams are separate", withClient(async (c) => { + const s1 = c.openStream(); + await s1.execute("DROP TABLE IF EXISTS t"); + await s1.execute("CREATE TABLE t (number)"); + await s1.execute("INSERT INTO t VALUES (1)"); + + const s2 = c.openStream(); + + await s1.execute("BEGIN"); + + await s2.execute("BEGIN"); + await s2.execute("INSERT INTO t VALUES (10)"); + + expect(await s1.queryValue("SELECT SUM(number) FROM t")).toStrictEqual(1); + expect(await s2.queryValue("SELECT SUM(number) FROM t")).toStrictEqual(11); +})); + +test("concurrent operations are correctly ordered", withClient(async (c) => { + const s = c.openStream(); + await s.execute("DROP TABLE IF EXISTS t"); + await s.execute("CREATE TABLE t (stream, value)"); + + async function stream(streamId: number): Promise { + const s = c.openStream(); + + let value = "s" + streamId; + await s.execute(["INSERT INTO t VALUES (?, ?)", [streamId, value]]); + + const promises: Array> = []; + const expectedValues = []; + for (let i = 0; i < 10; ++i) { + const promise = s.queryValue([ + "UPDATE t SET value = value || ? WHERE stream = ? RETURNING value", + ["_" + i, streamId], + ]); + value = value + "_" + i; + promises.push(promise); + expectedValues.push(value); + } + + for (let i = 0; i < promises.length; ++i) { + expect(await promises[i]).toStrictEqual(expectedValues[i]); + } + + s.close(); + } + + const promises = []; + for (let i = 0; i < 10; ++i) { + promises.push(stream(i)); + } + await Promise.all(promises); +})); diff --git a/src/convert.ts b/src/convert.ts new file mode 100644 index 0000000..7a50f49 --- /dev/null +++ b/src/convert.ts @@ -0,0 +1,102 @@ +import type * as proto from "./proto.js"; + +/** A statement that you can send to the database. Either a plain SQL string, or an SQL string together with + * values for the `?` parameters. + */ +export type Stmt = + | string + | [string, Array]; + +export function stmtToProto(stmtLike: Stmt, wantRows: boolean): proto.Stmt { + let sql; + let args: Array = []; + if (typeof stmtLike === "string") { + sql = stmtLike; + } else { + sql = stmtLike[0]; + args = stmtLike[1].map(valueToProto); + } + + return {"sql": sql, "args": args, "want_rows": wantRows}; +} + +/** JavaScript values that you can get from the database. */ +export type Value = + | null + | string + | number + | ArrayBuffer; + +export function valueToProto(value: Value): proto.Value { + if (value === null) { + return {"type": "null"}; + } else if (typeof value === "number") { + return {"type": "float", "value": +value}; + } else if (value instanceof ArrayBuffer) { + throw new Error("ArrayBuffer is not yet supported"); + } else { + return {"type": "text", "value": ""+value}; + } +} + +export function valueFromProto(value: proto.Value): Value { + if (value["type"] === "null") { + return null; + } else if (value["type"] === "integer") { + return parseInt(value["value"], 10); + } else if (value["type"] === "float") { + return value["value"]; + } else if (value["type"] === "text") { + return value["value"]; + } else if (value["type"] === "blob") { + throw new Error("blob is not yet supported"); + } else { + throw new Error("Unexpected value type"); + } +} + +export function stmtResultFromProto(result: proto.StmtResult): StmtResult { + return {rowsAffected: result["affected_row_count"]}; +} + +export function rowArrayFromProto(result: proto.StmtResult): RowArray { + const array = new RowArray(result["affected_row_count"]); + for (const row of result["rows"]) { + array.push(rowFromProto(result, row)); + } + return array; +} + +export function rowFromProto(result: proto.StmtResult, row: Array): Row { + const array = row.map((value) => valueFromProto(value)); + + for (let i = 0; i < result["cols"].length; ++i) { + const colName = result["cols"][i]["name"]; + if (colName && !Object.hasOwn(array, colName)) { + Object.defineProperty(array, colName, { + value: array[i], + enumerable: true, + }); + } + } + + return array; +} + +export interface StmtResult { + rowsAffected: number; +} + +export class RowArray extends Array implements StmtResult { + constructor(public rowsAffected: number) { + super(); + Object.setPrototypeOf(this, RowArray.prototype); + } +} + +export type Row = any; + +export function errorFromProto(error: proto.Error): Error { + return new Error(`Server returned error ${JSON.stringify(error["message"])}`); +} + diff --git a/src/id_alloc.ts b/src/id_alloc.ts new file mode 100644 index 0000000..592e3e9 --- /dev/null +++ b/src/id_alloc.ts @@ -0,0 +1,53 @@ +// An allocator of non-negative integer ids. +// +// This clever data structure has these "ideal" properties: +// - It consumes memory proportional to the number of used ids (which is optimal). +// - All operations are O(1) time. +// - The allocated ids are small (with a slight modification, we could always provide the smallest possible +// id). +export default class IdAlloc { + // Set of all allocated ids + #usedIds: Set; + // Set of all free ids lower than `#usedIds.size` + #freeIds: Set; + + constructor() { + this.#usedIds = new Set(); + this.#freeIds = new Set(); + } + + // Returns an id that was free, and marks it as used. + alloc(): number { + // this "loop" is just a way to pick an arbitrary element from the `#freeIds` set + for (const freeId of this.#freeIds) { + this.#freeIds.delete(freeId); + this.#usedIds.add(freeId); + + // maintain the invariant of `#freeIds` + if (!this.#usedIds.has(this.#usedIds.size - 1)) { + this.#freeIds.add(this.#usedIds.size - 1); + } + return freeId; + } + + // the `#freeIds` set is empty, so there are no free ids lower than `#usedIds.size` + // this means that `#usedIds` is a set that contains all numbers from 0 to `#usedIds.size - 1`, + // so `#usedIds.size` is free + const freeId = this.#usedIds.size; + this.#usedIds.add(freeId); + return freeId; + } + + free(id: number) { + if (!this.#usedIds.delete(id)) { + throw new Error("Internal error: freeing an id that is not allocated"); + } + + // maintain the invariant of `#freeIds` + this.#freeIds.delete(this.#usedIds.size); + if (id < this.#usedIds.size) { + this.#freeIds.add(id); + } + } +} + diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..d8c3335 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,359 @@ +import WebSocket from "isomorphic-ws"; + +import type { Stmt, Value, StmtResult, RowArray, Row } from "./convert.js"; +import { + stmtToProto, rowArrayFromProto, rowFromProto, + stmtResultFromProto, valueFromProto, errorFromProto, +} from "./convert.js"; +import IdAlloc from "./id_alloc.js"; +import type * as proto from "./proto.js"; + +export type { Stmt, Value, StmtResult, RowArray, Row } from "./convert"; +export type { proto }; + +/** Open a Hrana client connected to the given `url`. */ +export function open(url: string, jwt?: string): Client { + const socket = new WebSocket(url, ["hrana1"]); + return new Client(socket, jwt ?? null); +} + +/** A client that talks to a SQL server using the Hrana protocol over a WebSocket. */ +export class Client { + #socket: WebSocket; + // List of messages that we queue until the socket transitions from the CONNECTING to the OPEN state. + #msgsWaitingToOpen: proto.ClientMsg[]; + // Stores the error that caused us to close the client (and the socket). If we are not closed, this is + // `undefined`. + #closed: Error | undefined; + + // Have we received a response to our "hello" from the server? + #recvdHello: boolean; + // A map from request id to the responses that we expect to receive from the server. + #responseMap: Map; + // An allocator of request ids. + #requestIdAlloc: IdAlloc; + // An allocator of stream ids. + #streamIdAlloc: IdAlloc; + + /** @private */ + constructor(socket: WebSocket, jwt: string | null) { + this.#socket = socket; + this.#socket.binaryType = "arraybuffer"; + this.#msgsWaitingToOpen = []; + this.#closed = undefined; + + this.#recvdHello = false; + this.#responseMap = new Map(); + this.#requestIdAlloc = new IdAlloc(); + this.#streamIdAlloc = new IdAlloc(); + + this.#socket.onopen = () => this.#onSocketOpen(); + this.#socket.onclose = (event) => this.#onSocketClose(event); + this.#socket.onerror = (event) => this.#onSocketError(event); + this.#socket.onmessage = (event) => this.#onSocketMessage(event); + + this.#send({"type": "hello", "jwt": jwt}); + } + + // Send (or enqueue to send) a message to the server. + #send(msg: proto.ClientMsg): void { + if (this.#closed !== undefined) { + throw new Error("Internal error: trying to send a message on a closed client"); + } + + if (this.#socket.readyState >= WebSocket.OPEN) { + this.#sendToSocket(msg); + } else { + this.#msgsWaitingToOpen.push(msg); + } + } + + // The socket transitioned from CONNECTING to OPEN + #onSocketOpen(): void { + for (const msg of this.#msgsWaitingToOpen) { + this.#sendToSocket(msg); + } + this.#msgsWaitingToOpen.length = 0; + } + + #sendToSocket(msg: proto.ClientMsg): void { + this.#socket.send(JSON.stringify(msg)); + } + + // Send a request to the server and invoke a callback when we get the response. + #sendRequest(request: proto.Request, callbacks: ResponseCallbacks) { + const requestId = this.#requestIdAlloc.alloc(); + this.#responseMap.set(requestId, {...callbacks, type: request.type}); + this.#send({"type": "request", "request_id": requestId, request}); + } + + // The socket encountered an error. + #onSocketError(event: Event | WebSocket.ErrorEvent): void { + const eventMessage = (event as {message?: string}).message; + const message = eventMessage ?? "Connection was closed due to an error"; + this.#setClosed(new Error(message)); + } + + // The socket was closed. + #onSocketClose(event: WebSocket.CloseEvent): void { + this.#setClosed(new Error(`WebSocket was closed with code ${event.code}: ${event.reason}`)); + } + + // Close the client with the given error. + #setClosed(error: Error): void { + if (this.#closed !== undefined) { + return; + } + this.#closed = error; + + for (const [requestId, responseState] of this.#responseMap.entries()) { + responseState.errorCallback(error); + this.#requestIdAlloc.free(requestId); + } + this.#responseMap.clear(); + + this.#socket.close(); + } + + // We received a message from the socket. + #onSocketMessage(event: WebSocket.MessageEvent): void { + if (typeof event.data !== "string") { + this.#socket.close(3003, "Only string messages are accepted"); + this.#setClosed(new Error("Received non-string message from server")) + return; + } + + try { + this.#handleMsg(event.data); + } catch (e) { + this.#socket.close(3007, "Could not handle message"); + this.#setClosed(e as Error); + } + } + + // Handle a message from the server. + #handleMsg(msgText: string): void { + const msg = JSON.parse(msgText) as proto.ServerMsg; + + if (msg["type"] === "hello_ok" || msg["type"] === "hello_error") { + if (this.#recvdHello) { + throw new Error("Received a duplicated hello response"); + } + this.#recvdHello = true; + + if (msg["type"] === "hello_error") { + throw errorFromProto(msg["error"]); + } + return; + } else if (!this.#recvdHello) { + throw new Error("Received a non-hello message before a hello response"); + } + + if (msg["type"] === "response_ok") { + const requestId = msg["request_id"]; + const responseState = this.#responseMap.get(requestId); + this.#responseMap.delete(requestId); + + if (responseState === undefined) { + throw new Error("Received unexpected OK response"); + } else if (responseState.type !== msg["response"]["type"]) { + throw new Error("Received unexpected type of response"); + } + + try { + responseState.responseCallback(msg["response"]); + } catch (e) { + responseState.errorCallback(e as Error); + throw e; + } + } else if (msg["type"] === "response_error") { + const requestId = msg["request_id"]; + const responseState = this.#responseMap.get(requestId); + this.#responseMap.delete(requestId); + + if (responseState === undefined) { + throw new Error("Received unexpected error response"); + } + responseState.errorCallback(errorFromProto(msg["error"])); + } else { + throw new Error("Received unexpected message type"); + } + } + + /** Open a {@link Stream}, a stream for executing SQL statements. */ + openStream(): Stream { + if (this.#closed !== undefined) { + throw new Error("Client is closed", {cause: this.#closed}); + } + + const streamId = this.#streamIdAlloc.alloc(); + const streamState = { + streamId, + closed: undefined, + }; + + const responseCallback = () => undefined; + const errorCallback = (e: Error) => this._closeStream(streamState, e); + + const request: proto.OpenStreamReq = { + "type": "open_stream", + "stream_id": streamId, + }; + this.#sendRequest(request, {responseCallback, errorCallback}); + + return new Stream(this, streamState); + } + + // Make sure that the stream is closed. + /** @private */ + _closeStream(streamState: StreamState, error: Error): void { + if (streamState.closed !== undefined || this.#closed !== undefined) { + return; + } + streamState.closed = error; + + const callback = () => { + this.#streamIdAlloc.free(streamState.streamId); + }; + const request: proto.CloseStreamReq = { + "type": "close_stream", + "stream_id": streamState.streamId, + }; + this.#sendRequest(request, {responseCallback: callback, errorCallback: callback}); + } + + // Execute a statement on a stream and invoke callbacks in `stmtState` when we get the results (or an + // error). + /** @private */ + _execute(streamState: StreamState, stmtState: StmtState): void { + const responseCallback = (response: proto.Response) => { + stmtState.resultCallback((response as proto.ExecuteResp)["result"]); + }; + const errorCallback = (error: Error) => { + stmtState.errorCallback(error); + } + + if (streamState.closed !== undefined) { + errorCallback(new Error("Stream was closed", {cause: streamState.closed})); + return; + } else if (this.#closed !== undefined) { + errorCallback(new Error("Client was closed", {cause: this.#closed})); + return; + } + + const request: proto.ExecuteReq = { + "type": "execute", + "stream_id": streamState.streamId, + "stmt": stmtState.stmt, + }; + this.#sendRequest(request, {responseCallback, errorCallback}); + } + + /** Close the client and the WebSocket. */ + close() { + this.#setClosed(new Error("Client was manually closed")); + } +} + +interface ResponseCallbacks { + responseCallback: (_: proto.Response) => void; + errorCallback: (_: Error) => void; +} + +interface ResponseState extends ResponseCallbacks { + type: string; +} + +interface StmtState { + stmt: proto.Stmt; + resultCallback: (_: proto.StmtResult) => void; + errorCallback: (_: Error) => void; +} + +interface StreamState { + streamId: number; + closed: Error | undefined; +} + +/** A stream for executing SQL statements (a "database connection"). */ +export class Stream { + #client: Client; + #state: StreamState; + + /** @private */ + constructor(client: Client, state: StreamState) { + this.#client = client; + this.#state = state; + } + + /** Execute a raw Hrana statement. */ + executeRaw(stmt: proto.Stmt): Promise { + return new Promise((resultCallback, errorCallback) => { + this.#client._execute(this.#state, {stmt, resultCallback, errorCallback}); + }); + } + + /** Execute a statement that returns rows. */ + query(stmt: Stmt): Promise { + return new Promise((rowsCallback, errorCallback) => { + this.#client._execute(this.#state, { + stmt: stmtToProto(stmt, true), + resultCallback(result) { + rowsCallback(rowArrayFromProto(result)) + }, + errorCallback, + }); + }); + } + + /** Execute a statement that returns at most a single row. */ + queryRow(stmt: Stmt): Promise { + return new Promise((rowCallback, errorCallback) => { + this.#client._execute(this.#state, { + stmt: stmtToProto(stmt, true), + resultCallback(result) { + if (result.rows.length >= 1) { + rowCallback(rowFromProto(result, result.rows[0])); + } else { + rowCallback(undefined); + } + }, + errorCallback, + }); + }); + } + + /** Execute a statement that returns at most a single value. */ + queryValue(stmt: Stmt): Promise { + return new Promise((valueCallback, errorCallback) => { + this.#client._execute(this.#state, { + stmt: stmtToProto(stmt, true), + resultCallback(result) { + if (result.rows.length >= 1 && result.rows[0].length >= 1) { + valueCallback(valueFromProto(result.rows[0][0])); + } else { + valueCallback(undefined); + } + }, + errorCallback, + }); + }); + } + + /** Execute a statement that does not return rows. */ + execute(stmt: Stmt): Promise { + return new Promise((doneCallback, errorCallback) => { + this.#client._execute(this.#state, { + stmt: stmtToProto(stmt, false), + resultCallback(result) { doneCallback(stmtResultFromProto(result)); }, + errorCallback, + }); + }); + } + + /** Close the stream. */ + close(): void { + this.#client._closeStream(this.#state, new Error("Stream was manually closed")); + } +} + diff --git a/src/proto.ts b/src/proto.ts new file mode 100644 index 0000000..f7cecc5 --- /dev/null +++ b/src/proto.ts @@ -0,0 +1,129 @@ +// TypeScript types for the messages in the Hrana protocol +// +// The structure of this file follows the specification in HRANA_SPEC.md + +export type int32 = number + +// ## Messages + +export type ClientMsg = + | HelloMsg + | RequestMsg + +export type ServerMsg = + | HelloOkMsg + | HelloErrorMsg + | ResponseOkMsg + | ResponseErrorMsg + +// ### Hello + +export type HelloMsg = { + "type": "hello", + "jwt": string | null, +} + +export type HelloOkMsg = { + "type": "hello_ok", +} + +export type HelloErrorMsg = { + "type": "hello_error", + "error": Error, +} + +// ### Request/response + +export type RequestMsg = { + "type": "request", + "request_id": int32, + "request": Request, +} + +export type ResponseOkMsg = { + "type": "response_ok", + "request_id": int32, + "response": Response, +} + +export type ResponseErrorMsg = { + "type": "response_error", + "request_id": int32, + "error": Error, +} + +// ### Errors + +export type Error = { + "message": string, +} + +// ## Requests + +export type Request = + | OpenStreamReq + | CloseStreamReq + | ExecuteReq + +export type Response = + | OpenStreamResp + | CloseStreamResp + | ExecuteResp + +// ### Open stream + +export type OpenStreamReq = { + "type": "open_stream", + "stream_id": int32, +} + +export type OpenStreamResp = { + "type": "open_stream", +} + +// ### Close stream + +export type CloseStreamReq = { + "type": "close_stream", + "stream_id": int32, +} + +export type CloseStreamResp = { + "type": "close_stream", +} + +// ### Execute a statement + +export type ExecuteReq = { + "type": "execute", + "stream_id": int32, + "stmt": Stmt, +} + +export type ExecuteResp = { + "type": "execute", + "result": StmtResult, +} + +export type Stmt = { + "sql": string, + "args": Array, + "want_rows": boolean, +} + +export type StmtResult = { + "cols": Array, + "rows": Array>, + "affected_row_count": number, +} + +export type Col = { + "name": string | null, +} + +export type Value = + | { "type": "null" } + | { "type": "integer", "value": string } + | { "type": "float", "value": number } + | { "type": "text", "value": string } + | { "type": "blob", "base64": string } diff --git a/test_server.py b/test_server.py new file mode 100644 index 0000000..f63785c --- /dev/null +++ b/test_server.py @@ -0,0 +1,133 @@ +import asyncio +import base64 +import collections +import json +import os +import sqlite3 +import sys +import tempfile + +import websockets + +async def main(): + server = await websockets.serve(handle_socket, "localhost", 2023, subprotocols=["hrana1"]) + for sock in server.sockets: + print(f"Listening on {sock.getsockname()!r}") + await server.wait_closed() + +async def handle_socket(websocket): + async def recv_msg(): + try: + msg_str = await websocket.recv() + except websockets.exceptions.ConnectionClosed: + return None + assert isinstance(msg_str, str) + msg = json.loads(msg_str) + return msg + + async def send_msg(msg): + msg_str = json.dumps(msg) + await websocket.send(msg_str) + + db_fd, db_file = tempfile.mkstemp(suffix=".db", prefix="hrana_client_test_") + os.close(db_fd) + print(f"Accepted connection from {websocket.remote_address}, using db {db_file!r}") + + Stream = collections.namedtuple("Stream", ["conn"]) + streams = {} + + async def handle_request(req): + if req["type"] == "open_stream": + conn = await asyncio.to_thread(lambda: sqlite3.connect(db_file, + check_same_thread=False, isolation_level=None)) + streams[int(req["stream_id"])] = Stream(conn) + return {"type": "open_stream"} + elif req["type"] == "close_stream": + stream = streams.pop(int(req["stream_id"]), None) + if stream is not None: + await asyncio.to_thread(lambda: stream.conn.close()) + return {"type": "close_stream"} + elif req["type"] == "execute": + stream = streams[int(req["stream_id"])] + result = await asyncio.to_thread(lambda: execute_stmt(stream.conn, req["stmt"])) + return {"type": "execute", "result": result} + else: + raise RuntimeError(f"Unknown req: {req!r}") + + def execute_stmt(conn, stmt): + params = [value_to_sqlite(arg) for arg in stmt["args"]] + cursor = conn.execute(stmt["sql"], params) + cols = [{"name": name} for name, *_ in cursor.description or []] + + rows = [] + for row in cursor: + if stmt["want_rows"]: + rows.append([value_from_sqlite(val) for val in row]) + + if cursor.rowcount >= 0: + affected_row_count = cursor.rowcount + else: + affected_row_count = 0 + return {"cols": cols, "rows": rows, "affected_row_count": affected_row_count} + + def value_to_sqlite(value): + if value["type"] == "null": + return None + elif value["type"] == "integer": + return int(value["value"]) + elif value["type"] == "float": + return float(value["value"]) + elif value["type"] == "text": + return str(value["value"]) + elif value["type"] == "blob": + return base64.b64decode(value["base64"]) + else: + raise RuntimeError(f"Unknown value: {value!r}") + + def value_from_sqlite(value): + if value is None: + return {"type": "null"} + elif isinstance(value, int): + return {"type": "integer", "value": str(value)} + elif isinstance(value, float): + return {"type": "float", "value": value} + elif isinstance(value, str): + return {"type": "text", "value": value} + elif isinstance(value, bytes): + return {"type": "blob", "value": base64.b64encode(value)} + else: + raise RuntimeError(f"Unknown SQLite value: {value!r}") + + async def handle_msg(msg): + if msg["type"] == "request": + response = await handle_request(msg["request"]) + await send_msg({ + "type": "response_ok", + "request_id": msg["request_id"], + "response": response, + }) + else: + raise RuntimeError(f"Unknown msg: {msg!r}") + + + hello_msg = await recv_msg() + assert hello_msg.get("type") == "hello" + await send_msg({"type": "hello_ok"}) + + try: + while True: + msg = await recv_msg() + if msg is None: + break + await handle_msg(msg) + except websockets.exceptions.ConnectionClosedError: + pass + finally: + for stream in streams.values(): + stream.conn.close() + os.unlink(db_file) + +try: + asyncio.run(main()) +except KeyboardInterrupt: + print() diff --git a/tsconfig.base.json b/tsconfig.base.json new file mode 100644 index 0000000..4c87976 --- /dev/null +++ b/tsconfig.base.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "moduleResolution": "node", + "lib": ["esnext"], + "target": "esnext", + + "esModuleInterop": true, + "isolatedModules": true, + + "rootDir": "src/", + + "strict": true + }, + "include": ["src/"], + "exclude": ["**/__tests__"] +} diff --git a/tsconfig.build-cjs.json b/tsconfig.build-cjs.json new file mode 100644 index 0000000..857027a --- /dev/null +++ b/tsconfig.build-cjs.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.base.json", + "compilerOptions": { + "module": "commonjs", + "declaration": false, + "outDir": "./lib-cjs/" + } +} + diff --git a/tsconfig.build-esm.json b/tsconfig.build-esm.json new file mode 100644 index 0000000..9a01705 --- /dev/null +++ b/tsconfig.build-esm.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.base.json", + "compilerOptions": { + "module": "esnext", + "declaration": true, + "outDir": "./lib-esm/" + } +} + diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..bc06427 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "./tsconfig.base.json", + "compilerOptions": { + "noEmit": true, + "incremental": true + } +}