Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari committed Jan 14, 2025
1 parent e5ea345 commit aca75d2
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export default [
["ERR_POSTGRES_IDLE_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_CONNECTION_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_LIFETIME_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_INVALID_TRANSACTION_STATE", Error, "PostgresError"],

// S3
["ERR_S3_MISSING_CREDENTIALS", Error],
Expand Down
224 changes: 221 additions & 3 deletions src/js/bun/sql.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { hideFromStack } = require("internal/shared");

const enum QueryStatus {
active = 1 << 1,
cancelled = 1 << 2,
Expand All @@ -15,6 +17,11 @@ const enum SSLMode {
verify_full = 4,
}

function connectionClosedError() {
return $ERR_POSTGRES_CONNECTION_CLOSED("Connection closed");
}
hideFromStack(connectionClosedError);

class SQLResultArray extends PublicArray {
static [Symbol.toStringTag] = "SQLResults";

Expand All @@ -33,6 +40,7 @@ const _run = Symbol("run");
const _queryStatus = Symbol("status");
const _handler = Symbol("handler");
const PublicPromise = Promise;
type TransactionCallback = (sql: (strings: string, ...values: any[]) => Query) => Promise<any>;

const {
createConnection: _createConnection,
Expand Down Expand Up @@ -228,6 +236,99 @@ init(
},
);

function onQueryFinish(onClose) {
this.queries.delete(onClose);
}
class ConnectionWithState {
pool: ConnectionPool;
connection: ReturnType<typeof createConnection>;
state: "pending" | "connected" | "closed" = "pending";
storedError: Error | null = null;
queries: Set<(err: Error) => void> = new Set();

#onConnected(err, _) {
this.storedError = err;
this.state = err ? "closed" : "connected";
this.pool.release(this);
}
#onClose(err) {
this.state = "closed";
this.connection = null;
this.storedError = err;

// remove from ready connections if its there
this.pool.readyConnections.delete(this);
const queries = new Set(this.queries);
this.queries.clear();
for (const onClose of queries) {
onClose(err);
}

// we need to reconnect
// lets use a retry strategy
// TODO: implement retry strategy, maxLifetime, idleTimeout, connectionTimeout
}
constructor(connectionInfo, pool: ConnectionPool) {
//TODO: maxLifetime, idleTimeout, connectionTimeout
this.connection = createConnection(connectionInfo, this.#onConnected.bind(this), this.#onClose.bind(this));
this.state = "pending";
this.pool = pool;
}

bindQuery(query: Query, onClose: (err: Error) => void) {
this.queries.add(onClose);
// @ts-ignore
query.finally(onQueryFinish.bind(this, onClose));
}
}
class ConnectionPool {
connectionInfo: any;

connections: ConnectionWithState[];
readyConnections: Set<ConnectionWithState>;
waitingQueue: Array<(err: Error | null, result: any) => void> = [];
constructor(connectionInfo) {
this.connectionInfo = connectionInfo;

let max = connectionInfo.max;
if (max && typeof max !== "number") {
throw $ERR_INVALID_ARG_TYPE("max", "number", max);
} else {
max = 10; // same default as postgres.js
}
if (max < 1) {
throw $ERR_INVALID_ARG_VALUE("max", max, "must be greater than 0");
}

this.connections = new Array(max);
for (let i = 0; i < max; i++) {
this.connections[i] = new ConnectionWithState(this.connectionInfo, this);
}
this.readyConnections = new Set();
}

release(connection: ConnectionWithState) {
if (this.waitingQueue.length > 0) {
const pending = this.waitingQueue.shift();
pending?.(null, connection);
} else {
this.readyConnections.add(connection);
}
}

connect(onConnected: (err: Error | null, result: any) => void) {
if (this.readyConnections.size === 0) {
// wait for connection to be released
this.waitingQueue.push(onConnected);
return;
}
// unshift
const first = this.readyConnections.values().next().value;
this.readyConnections.delete(first);
onConnected(null, first);
}
}

function createConnection(
{
hostname,
Expand Down Expand Up @@ -535,6 +636,8 @@ function SQL(o) {
storedErrorForClosedConnection,
connectionInfo = loadOptions(o);

const pool = new ConnectionPool(connectionInfo);

function connectedHandler(query, handle, err) {
if (err) {
return query.reject(err);
Expand Down Expand Up @@ -564,7 +667,7 @@ function SQL(o) {
}

function closedConnectionHandler(query, handle) {
query.reject(storedErrorForClosedConnection || new Error("Connection closed"));
query.reject(storedErrorForClosedConnection || connectionClosedError());
}

function onConnected(err, result) {
Expand Down Expand Up @@ -654,9 +757,124 @@ function SQL(o) {
return pendingSQL(strings, values);
}

sql.begin = async (options_or_fn: string | TransactionCallback, fn?: TransactionCallback) => {
/*
BEGIN; -- works on POSTGRES, MySQL, and SQLite (need to change to BEGIN TRANSACTION on MSSQL)
-- Create a SAVEPOINT
SAVEPOINT my_savepoint; -- works on POSTGRES, MySQL, and SQLite (need to change to SAVE TRANSACTION on MSSQL)
-- QUERY
-- Roll back to SAVEPOINT if needed
ROLLBACK TO SAVEPOINT my_savepoint; -- works on POSTGRES, MySQL, and SQLite (need to change to ROLLBACK TRANSACTION on MSSQL)
-- Release the SAVEPOINT
RELEASE SAVEPOINT my_savepoint; -- works on POSTGRES, MySQL, and SQLite (MSSQL dont have RELEASE SAVEPOINT you just need to transaction again)
-- Commit the transaction
COMMIT; -- works on POSTGRES, MySQL, and SQLite (need to change to COMMIT TRANSACTION on MSSQL)
-- or rollback everything
ROLLBACK; -- works on POSTGRES, MySQL, and SQLite (need to change to ROLLBACK TRANSACTION on MSSQL)
*/

// this is a big TODO we need to make sure that each created query actually uses the same connection or fails
let current_connection;
let savepoints = 0;
try {
if (closed) {
throw connectionClosedError();
}
let callback = fn;
let options: string | undefined = options_or_fn as unknown as string;
if ($isCallable(options_or_fn)) {
callback = options_or_fn as unknown as TransactionCallback;
options = undefined;
} else if (typeof options_or_fn !== "string") {
throw $ERR_INVALID_ARG_VALUE("options", options_or_fn, "must be a string");
}
if (!$isCallable(callback)) {
throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function");
}

if (options) {
//@ts-ignore
await sql(`BEGIN ${options}`);
} else {
//@ts-ignore
await sql("BEGIN");
}
// keep track of the connection that is being used
current_connection = connection;

// we need a function able to check for the current connection
const sql_with_savepoint = function (strings, ...values) {
return sql(strings, ...values);
};
// allow flush, close, options, then, and asyncDispose to be called on the sql_with_savepoint
sql_with_savepoint.flush = sql.flush;
sql_with_savepoint.close = sql.close;
sql_with_savepoint.options = sql.options;
sql_with_savepoint.then = sql.then;
// begin is not allowed on a transaction we need to use savepoint() instead
sql_with_savepoint.begin = function () {
throw $ERR_POSTGRES_INVALID_TRANSACTION_STATE("cannot call begin on a transaction use savepoint() instead");
};
sql_with_savepoint[Symbol.asyncDispose] = sql[Symbol.asyncDispose];

// this version accepts savepoints with is basically nested transactions
sql_with_savepoint.savepoint = async (fn: TransactionCallback, name?: string) => {
let savepoint_callback = fn;

if (closed || current_connection !== connection) {
throw connectionClosedError();
}
if ($isCallable(name)) {
savepoint_callback = name as unknown as TransactionCallback;
name = "";
}
if (!$isCallable(savepoint_callback)) {
throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function");
}
// matchs the format of the savepoint name in postgres package
const save_point_name = `s${savepoints++}${name ? `_${name}` : ""}`;

try {
await sql_with_savepoint`SAVEPOINT ${save_point_name}`;
const result = await savepoint_callback(sql_with_savepoint);
if (!closed && current_connection === connection) {
await sql_with_savepoint(`RELEASE SAVEPOINT ${save_point_name}`);
} else {
throw connectionClosedError();
}
return result;
} catch (err) {
if (!closed && current_connection === connection) {
await sql_with_savepoint(`ROLLBACK TO SAVEPOINT ${save_point_name}`);
}
throw err;
}
};

const transaction_result = await callback(sql_with_savepoint);
if (!closed && current_connection === connection) {
await sql("COMMIT");
} else {
throw connectionClosedError();
}
return transaction_result;
} catch (err) {
if (current_connection && !closed && current_connection === connection) {
await sql("ROLLBACK");
}
throw err;
}
};

sql.connect = () => {
if (closed) {
return Promise.reject(new Error("Connection closed"));
return Promise.reject(connectionClosedError());
}

if (connected) {
Expand Down Expand Up @@ -697,7 +915,7 @@ function SQL(o) {

sql.then = () => {
if (closed) {
return Promise.reject(new Error("Connection closed"));
return Promise.reject(connectionClosedError());
}

if (connected) {
Expand Down
15 changes: 11 additions & 4 deletions src/sql/postgres.zig
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,14 @@ pub const PostgresSQLQuery = struct {

pub fn call(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const arguments = callframe.arguments_old(4).slice();
const query = arguments[0];
const values = arguments[1];
const columns = arguments[3];
var args = JSC.Node.ArgumentsSlice.init(globalThis.bunVM(), arguments);
defer args.deinit();
const query = args.nextEat() orelse {
return globalThis.throw("query must be a string", .{});
};
const values = args.nextEat() orelse {
return globalThis.throw("values must be an array", .{});
};

if (!query.isString()) {
return globalThis.throw("query must be a string", .{});
Expand All @@ -496,7 +501,9 @@ pub const PostgresSQLQuery = struct {
return globalThis.throw("values must be an array", .{});
}

const pending_value = arguments[2];
const pending_value = args.nextEat() orelse .undefined;
const columns = args.nextEat() orelse .undefined;

if (!pending_value.jsType().isArrayLike()) {
return globalThis.throwInvalidArgumentType("query", "pendingValue", "Array");
}
Expand Down

0 comments on commit aca75d2

Please sign in to comment.