diff --git a/src/bun.js/bindings/ErrorCode.ts b/src/bun.js/bindings/ErrorCode.ts index 521c62939c5996..1cd6e4f926a24f 100644 --- a/src/bun.js/bindings/ErrorCode.ts +++ b/src/bun.js/bindings/ErrorCode.ts @@ -148,6 +148,7 @@ export default [ ["ERR_POSTGRES_CONNECTION_TIMEOUT", Error, "PostgresError"], ["ERR_POSTGRES_LIFETIME_TIMEOUT", Error, "PostgresError"], ["ERR_POSTGRES_INVALID_TRANSACTION_STATE", Error, "PostgresError"], + ["ERR_POSTGRES_QUERY_CANCELLED", Error, "PostgresError"], // S3 ["ERR_S3_MISSING_CREDENTIALS", Error], diff --git a/src/js/bun/sql.ts b/src/js/bun/sql.ts index 1cb05c8d57127e..5dfb3bb13bae40 100644 --- a/src/js/bun/sql.ts +++ b/src/js/bun/sql.ts @@ -245,13 +245,35 @@ class ConnectionWithState { state: "pending" | "connected" | "closed" = "pending"; storedError: Error | null = null; queries: Set<(err: Error) => void> = new Set(); + onFinish: ((err: Error | null) => void) | null = null; + canBeConnected: boolean = false; + connectionInfo: any; #onConnected(err, _) { + const connectionInfo = this.connectionInfo; + if (connectionInfo?.onconnect) { + connectionInfo.onconnect(err); + } this.storedError = err; + this.canBeConnected = !err; this.state = err ? "closed" : "connected"; + const onFinish = this.onFinish; + if (onFinish) { + // pool is closed, lets finish the connection + if (err) { + onFinish(err); + } else { + this.connection.close(); + } + return; + } this.pool.release(this); } #onClose(err) { + const connectionInfo = this.connectionInfo; + if (connectionInfo?.onclose) { + connectionInfo.onclose(err); + } this.state = "closed"; this.connection = null; this.storedError = err; @@ -260,26 +282,73 @@ class ConnectionWithState { this.pool.readyConnections.delete(this); const queries = new Set(this.queries); this.queries.clear(); + // notify all queries that the connection is closed for (const onClose of queries) { onClose(err); } - - // we need to reconnect - // lets use a retry strategy - // TODO: implement retry strategy, maxLifetime, idleTimeout, connectionTimeout + const onFinish = this.onFinish; + if (onFinish) { + onFinish(err); + return; + } } constructor(connectionInfo, pool: ConnectionPool) { //TODO: maxLifetime, idleTimeout, connectionTimeout this.connection = createConnection(connectionInfo, this.#onConnected.bind(this), this.#onClose.bind(this)); this.state = "pending"; this.pool = pool; + this.connectionInfo = connectionInfo; + } + onClose(onClose: (err: Error) => void) { + this.queries.add(onClose); } - bindQuery(query: Query, onClose: (err: Error) => void) { this.queries.add(onClose); // @ts-ignore query.finally(onQueryFinish.bind(this, onClose)); } + #doRetry() { + if (this.pool.closed) { + return; + } + // retry connection + this.connection = createConnection( + this.connectionInfo, + this.#onConnected.bind(this, this.connectionInfo), + this.#onClose.bind(this, this.connectionInfo), + ); + } + retry() { + // if pool is closed, we can't retry + if (this.pool.closed) { + return false; + } + // we need to reconnect + // lets use a retry strategy + // TODO: implement retry strategy, maxLifetime, idleTimeout, connectionTimeout + + // we can only retry if one day we are able to connect + if (this.canBeConnected) { + this.#doRetry(); + } else { + // analyse type of error to see if we can retry + switch (this.storedError?.code) { + case "ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD": + case "ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD": + case "ERR_POSTGRES_TLS_NOT_AVAILABLE": + case "ERR_POSTGRES_TLS_UPGRADE_FAILED": + case "ERR_POSTGRES_INVALID_SERVER_SIGNATURE": + case "ERR_POSTGRES_INVALID_SERVER_KEY": + case "ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2": + // we can't retry this are authentication errors + return false; + default: + // we can retry + this.#doRetry(); + return true; + } + } + } } class ConnectionPool { connectionInfo: any; @@ -287,6 +356,8 @@ class ConnectionPool { connections: ConnectionWithState[]; readyConnections: Set; waitingQueue: Array<(err: Error | null, result: any) => void> = []; + poolStarted: boolean = false; + closed: boolean = false; constructor(connectionInfo) { this.connectionInfo = connectionInfo; @@ -301,28 +372,149 @@ class ConnectionPool { } this.connections = new Array(max); - for (let i = 0; i < max; i++) { - this.connections[i] = new ConnectionWithState(this.connectionInfo, this); - } this.readyConnections = new Set(); } release(connection: ConnectionWithState) { if (this.waitingQueue.length > 0) { + // we have some pending connections, lets connect them with the released connection const pending = this.waitingQueue.shift(); - pending?.(null, connection); + + pending?.(connection.storedError, connection); } else { + if (connection.state !== "connected") { + // connection is not ready, lets not add it to the ready connections + return; + } + // connection is ready, lets add it to the ready connections this.readyConnections.add(connection); } } + isConnected() { + if (this.readyConnections.size > 0) { + return true; + } + if (this.poolStarted) { + for (let i = 0; i < this.connections.length; i++) { + const connection = this.connections[i]; + if (connection.state === "connected") { + return true; + } + } + } + return false; + } + flush() { + if (this.closed) { + return; + } + if (this.poolStarted) { + this.poolStarted = false; + for (let i = 0; i < this.connections.length; i++) { + const connection = this.connections[i]; + if (connection.state === "connected") { + connection.connection.flush(); + } + } + } + } + close() { + if (this.closed) { + return Promise.reject(connectionClosedError()); + } + this.closed = true; + let pending; + while ((pending = this.waitingQueue.shift())) { + pending(connectionClosedError(), null); + } + const promises: Array> = []; + if (this.poolStarted) { + this.poolStarted = false; + for (let i = 0; i < this.connections.length; i++) { + const connection = this.connections[i]; + switch (connection.state) { + case "pending": + { + const { promise, resolve } = Promise.withResolvers(); + connection.onFinish = resolve; + promises.push(promise); + } + break; + case "connected": + { + const { promise, resolve } = Promise.withResolvers(); + connection.onFinish = resolve; + promises.push(promise); + connection.connection.close(); + } + break; + } + // clean connection reference + // @ts-ignore + this.connections[i] = null; + } + } + this.readyConnections.clear(); + this.waitingQueue.length = 0; + return Promise.all(promises); + } connect(onConnected: (err: Error | null, result: any) => void) { + if (this.closed) { + return onConnected(connectionClosedError(), null); + } if (this.readyConnections.size === 0) { - // wait for connection to be released + // no connection ready lets make some + let retry_in_progress = false; + let all_closed = true; + let storedError: Error | null = null; + + if (this.poolStarted) { + // we already started the pool + // lets check if some connection is available to retry + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + // we need a new connection and we have some connections that can retry + if (connection.state === "closed") { + if (connection.retry()) { + // lets wait for connection to be released + if (!retry_in_progress) { + // avoid adding to the queue twice, we wanna to retry every available pool connection + retry_in_progress = true; + this.waitingQueue.push(onConnected); + } + } else { + // we have some error, lets grab it and fail if unable to start a connection + storedError = connection.storedError; + } + } else { + // we have some pending or open connections + all_closed = false; + } + } + + if (!all_closed && !retry_in_progress) { + // is possible to connect because we have some working connections, or we are just without network for some reason + // wait for connection to be released or fail + this.waitingQueue.push(onConnected); + } else { + // impossible to connect or retry + onConnected(storedError, null); + } + return; + } + // we never started the pool, lets start it this.waitingQueue.push(onConnected); + this.poolStarted = true; + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + this.connections[i] = new ConnectionWithState(this.connectionInfo, this); + } return; } - // unshift + + // we have some connection ready const first = this.readyConnections.values().next().value; this.readyConnections.delete(first); onConnected(null, first); @@ -628,106 +820,206 @@ function loadOptions(o) { } function SQL(o) { - var connection, - connected = false, - connecting = false, - closed = false, - onConnect: any[] = [], - storedErrorForClosedConnection, - connectionInfo = loadOptions(o); + var connectionInfo = loadOptions(o); + var pool = new ConnectionPool(connectionInfo); + + function doCreateQuery(strings, values) { + const sqlString = normalizeStrings(strings, values); + let columns; + if (hasSQLArrayParameter) { + hasSQLArrayParameter = false; + const v = values[0]; + columns = v.columns; + values = v.value; + } - const pool = new ConnectionPool(connectionInfo); + return createQuery(sqlString, values, new SQLResultArray(), columns); + } - function connectedHandler(query, handle, err) { + function onQueryDisconnected(err) { + // connection closed mid query this will not be called if the query finishes first + const query = this; if (err) { return query.reject(err); } - - if (!connected) { - return query.reject(storedErrorForClosedConnection || new Error("Not connected")); - } - + // query is cancelled when waiting for a connection from the pool if (query.cancelled) { - return query.reject(new Error("Query cancelled")); + return query.reject($ERR_POSTGRES_QUERY_CANCELLED("Query cancelled")); } - - handle.run(connection, query); - - // if the above throws, we don't want it to be in the array. - // This array exists mostly to keep the in-flight queries alive. - connection.queries.push(query); } - function pendingConnectionHandler(query, handle) { - onConnect.push(err => connectedHandler(query, handle, err)); - if (!connecting) { - connecting = true; - connection = createConnection(connectionInfo, onConnected, onClose); + function onQueryConnected(handle, err, pooledConnection) { + const query = this; + if (err) { + // fail to aquire a connection from the pool + return query.reject(err); + } + // query is cancelled when waiting for a connection from the pool + if (query.cancelled) { + pool.release(pooledConnection); // release the connection back to the pool + return query.reject($ERR_POSTGRES_QUERY_CANCELLED("Query cancelled")); } - } - function closedConnectionHandler(query, handle) { - query.reject(storedErrorForClosedConnection || connectionClosedError()); + // bind close event to the query (will unbind and auto release the connection when the query is finished) + pooledConnection.bindQuery(query, onQueryDisconnected.bind(query)); + handle.run(pooledConnection.connection, query); } - - function onConnected(err, result) { - connected = !err; - for (const handler of onConnect) { - handler(err); + function queryFromPoolHandler(query, handle, err) { + if (err) { + // fail to create query + return query.reject(err); } - onConnect = []; - - if (connected && connectionInfo?.onconnect) { - connectionInfo.onconnect(err); + // query is cancelled + if (query.cancelled) { + return query.reject($ERR_POSTGRES_QUERY_CANCELLED("Query cancelled")); } + + pool.connect(onQueryConnected.bind(query, handle)); + } + function queryFromPool(strings, values) { + return new Query(doCreateQuery(strings, values), queryFromPoolHandler); } - function onClose(err, queries) { - closed = true; - storedErrorForClosedConnection = err; - if (sql === lazyDefaultSQL) { - resetDefaultSQL(initialDefaultSQL); + function onTransactionQueryDisconnected(query) { + const transactionQueries = this; + transactionQueries.delete(query); + } + function queryFromTransactionHandler(transactionQueries, query, handle, err) { + const pooledConnection = this; + if (err) { + return query.reject(err); } - - onConnected(err, undefined); - if (queries) { - const queriesCopy = queries.slice(); - queries.length = 0; - for (const handler of queriesCopy) { - handler.reject(err); - } + // query is cancelled + if (query.cancelled) { + return query.reject($ERR_POSTGRES_QUERY_CANCELLED("Query cancelled")); } - - if (connectionInfo?.onclose) { - connectionInfo.onclose(err); + // keep the query alive until we finish the transaction or the query + transactionQueries.add(query); + query.finally(onTransactionQueryDisconnected.bind(transactionQueries, query)); + handle.run(pooledConnection.connection, query); + } + function queryFromTransaction(strings, values, pooledConnection, transactionQueries) { + return new Query( + doCreateQuery(strings, values), + queryFromTransactionHandler.bind(pooledConnection, transactionQueries), + ); + } + function onTransactionDisconnected(err) { + const reject = this.reject; + this.closed = true; + if (err) { + return reject(err); } } - - function doCreateQuery(strings, values) { - const sqlString = normalizeStrings(strings, values); - let columns; - if (hasSQLArrayParameter) { - hasSQLArrayParameter = false; - const v = values[0]; - columns = v.columns; - values = v.value; + async function onTransactionConnected(options, resolve, reject, err, pooledConnection) { + const callback = this as unknown as TransactionCallback; + if (err) { + return reject(err); } + const state = { + closed: false, + reject, + }; + const onClose = onTransactionDisconnected.bind(state); + pooledConnection.onClose(onClose); + let savepoints = 0; + let transactionQueries = new Set(); - return createQuery(sqlString, values, new SQLResultArray(), columns); - } + function transaction_sql(strings, ...values) { + if (state.closed) { + return Promise.reject(connectionClosedError()); + } + if ($isJSArray(strings) && strings[0] && typeof strings[0] === "object") { + return new SQLArrayParameter(strings, values); + } - function connectedSQL(strings, values) { - return new Query(doCreateQuery(strings, values), connectedHandler); - } + return queryFromTransaction(strings, values, pooledConnection, transactionQueries); + } + transaction_sql.connect = () => { + if (state.closed) { + return Promise.reject(connectionClosedError()); + } + return Promise.resolve(transaction_sql); + }; + // begin is not allowed on a transaction we need to use savepoint() instead + transaction_sql.begin = function () { + throw $ERR_POSTGRES_INVALID_TRANSACTION_STATE("cannot call begin inside a transaction use savepoint() instead"); + }; + + transaction_sql.flush = function () { + if (state.closed) { + throw connectionClosedError(); + } + return pooledConnection.flush(); + }; + transaction_sql.close = async function () { + // we dont actually close the connection here, we just set the state to closed and rollback the transaction + if (state.closed) { + return Promise.reject(connectionClosedError()); + } + await transaction_sql("ROLLBACK"); + state.closed = true; + }; + transaction_sql[Symbol.asyncDispose] = () => transaction_sql.close(); + transaction_sql.then = transaction_sql.connect; + transaction_sql.options = sql.options; - function closedSQL(strings, values) { - return new Query(undefined, closedConnectionHandler); - } + transaction_sql.savepoint = async (fn: TransactionCallback, name?: string) => { + let savepoint_callback = fn; - function pendingSQL(strings, values) { - return new Query(doCreateQuery(strings, values), pendingConnectionHandler); + if (state.closed) { + throw connectionClosedError(); + } + if ($isCallable(name)) { + savepoint_callback = name as unknown as TransactionCallback; + name = ""; + } + if (!$isCallable(savepoint_callback)) { + throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function"); + } + // matchs the format of the savepoint name in postgres package + const save_point_name = `s${savepoints++}${name ? `_${name}` : ""}`; + await transaction_sql(`SAVEPOINT ${save_point_name}`); + + try { + const result = await savepoint_callback(transaction_sql); + await transaction_sql(`RELEASE SAVEPOINT ${save_point_name}`); + return result; + } catch (err) { + if (!state.closed) { + await transaction_sql(`ROLLBACK TO SAVEPOINT ${save_point_name}`); + } + throw err; + } + }; + let transaction_started = false; + try { + if (options) { + //@ts-ignore + await transaction_sql(`BEGIN ${options}`); + } else { + //@ts-ignore + await transaction_sql("BEGIN"); + } + transaction_started = true; + const transaction_result = await callback(transaction_sql); + await transaction_sql("COMMIT"); + return resolve(transaction_result); + } catch (err) { + try { + if (!state.closed && transaction_started) { + await transaction_sql("ROLLBACK"); + } + } catch (err) { + return reject(err); + } + return reject(err); + } finally { + state.closed = true; + pooledConnection.queries.delete(onClose); + pool.release(pooledConnection); + } } - function sql(strings, ...values) { /** * const users = [ @@ -746,15 +1038,7 @@ function SQL(o) { return new SQLArrayParameter(strings, values); } - if (closed) { - return closedSQL(strings, values); - } - - if (connected) { - return connectedSQL(strings, values); - } - - return pendingSQL(strings, values); + return queryFromPool(strings, values); } sql.begin = async (options_or_fn: string | TransactionCallback, fn?: TransactionCallback) => { @@ -779,157 +1063,61 @@ function SQL(o) { */ - // this is a big TODO we need to make sure that each created query actually uses the same connection or fails - let current_connection; - let savepoints = 0; - try { - if (closed) { - throw connectionClosedError(); - } - let callback = fn; - let options: string | undefined = options_or_fn as unknown as string; - if ($isCallable(options_or_fn)) { - callback = options_or_fn as unknown as TransactionCallback; - options = undefined; - } else if (typeof options_or_fn !== "string") { - throw $ERR_INVALID_ARG_VALUE("options", options_or_fn, "must be a string"); - } - if (!$isCallable(callback)) { - throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function"); - } - - if (options) { - //@ts-ignore - await sql(`BEGIN ${options}`); - } else { - //@ts-ignore - await sql("BEGIN"); - } - // keep track of the connection that is being used - current_connection = connection; - - // we need a function able to check for the current connection - const sql_with_savepoint = function (strings, ...values) { - return sql(strings, ...values); - }; - // allow flush, close, options, then, and asyncDispose to be called on the sql_with_savepoint - sql_with_savepoint.flush = sql.flush; - sql_with_savepoint.close = sql.close; - sql_with_savepoint.options = sql.options; - sql_with_savepoint.then = sql.then; - // begin is not allowed on a transaction we need to use savepoint() instead - sql_with_savepoint.begin = function () { - throw $ERR_POSTGRES_INVALID_TRANSACTION_STATE("cannot call begin on a transaction use savepoint() instead"); - }; - sql_with_savepoint[Symbol.asyncDispose] = sql[Symbol.asyncDispose]; - - // this version accepts savepoints with is basically nested transactions - sql_with_savepoint.savepoint = async (fn: TransactionCallback, name?: string) => { - let savepoint_callback = fn; - - if (closed || current_connection !== connection) { - throw connectionClosedError(); - } - if ($isCallable(name)) { - savepoint_callback = name as unknown as TransactionCallback; - name = ""; - } - if (!$isCallable(savepoint_callback)) { - throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function"); - } - // matchs the format of the savepoint name in postgres package - const save_point_name = `s${savepoints++}${name ? `_${name}` : ""}`; - - try { - await sql_with_savepoint`SAVEPOINT ${save_point_name}`; - const result = await savepoint_callback(sql_with_savepoint); - if (!closed && current_connection === connection) { - await sql_with_savepoint(`RELEASE SAVEPOINT ${save_point_name}`); - } else { - throw connectionClosedError(); - } - return result; - } catch (err) { - if (!closed && current_connection === connection) { - await sql_with_savepoint(`ROLLBACK TO SAVEPOINT ${save_point_name}`); - } - throw err; - } - }; - - const transaction_result = await callback(sql_with_savepoint); - if (!closed && current_connection === connection) { - await sql("COMMIT"); - } else { - throw connectionClosedError(); - } - return transaction_result; - } catch (err) { - if (current_connection && !closed && current_connection === connection) { - await sql("ROLLBACK"); - } - throw err; + if (pool.closed) { + throw connectionClosedError(); + } + let callback = fn; + let options: string | undefined = options_or_fn as unknown as string; + if ($isCallable(options_or_fn)) { + callback = options_or_fn as unknown as TransactionCallback; + options = undefined; + } else if (typeof options_or_fn !== "string") { + throw $ERR_INVALID_ARG_VALUE("options", options_or_fn, "must be a string"); } + if (!$isCallable(callback)) { + throw $ERR_INVALID_ARG_VALUE("fn", callback, "must be a function"); + } + const { promise, resolve, reject } = Promise.withResolvers(); + pool.connect(onTransactionConnected.bind(callback, options, resolve, reject)); + return promise; }; sql.connect = () => { - if (closed) { + if (pool.closed) { return Promise.reject(connectionClosedError()); } - if (connected) { + if (pool.isConnected()) { return Promise.resolve(sql); } - var { resolve, reject, promise } = Promise.withResolvers(); - onConnect.push(err => (err ? reject(err) : resolve(sql))); - if (!connecting) { - connecting = true; - connection = createConnection(connectionInfo, onConnected, onClose); - } + let { resolve, reject, promise } = Promise.withResolvers(); + const onConnected = (err, connection) => { + if (err) { + return reject(err); + } + // we are just measuring the connection here lets release it + pool.release(connection); + resolve(sql); + }; + + pool.connect(onConnected); return promise; }; sql.close = () => { - if (closed) { - return Promise.resolve(); - } - - var { resolve, promise } = Promise.withResolvers(); - onConnect.push(resolve); - connection.close(); - return promise; + return pool.close(); }; sql[Symbol.asyncDispose] = () => sql.close(); - sql.flush = () => { - if (closed || !connected) { - return; - } - - connection.flush(); - }; + sql.flush = () => pool.flush(); sql.options = connectionInfo; sql.then = () => { - if (closed) { - return Promise.reject(connectionClosedError()); - } - - if (connected) { - return Promise.resolve(sql); - } - - const { resolve, reject, promise } = Promise.withResolvers(); - onConnect.push(err => (err ? reject(err) : resolve(sql))); - if (!connecting) { - connecting = true; - connection = createConnection(connectionInfo, onConnected, onClose); - } - - return promise; + // should this wait queries to finish or just return if is connected? + return sql.connect(); }; return sql;