Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari committed Jan 18, 2025
1 parent be7c841 commit a06f824
Showing 1 changed file with 171 additions and 38 deletions.
209 changes: 171 additions & 38 deletions src/js/bun/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,12 @@ class PooledConnection {
onFinish: ((err: Error | null) => void) | null = null;
canBeConnected: boolean = false;
connectionInfo: any;
/// reserved is used to indicate that the connection is currently reserved
reserved: boolean = false;
queriesUsing: number = 0;
/// preReserved is used to indicate that the connection will be reserved in the future when queryCount drops to 0
preReserved: boolean = false;
/// queryCount is used to indicate the number of queries using the connection, if a connection is reserved or if its a transaction queryCount will be 1 independently of the number of queries
queryCount: number = 0;
#onConnected(err, _) {
const connectionInfo = this.connectionInfo;
if (connectionInfo?.onconnect) {
Expand All @@ -266,6 +270,9 @@ class PooledConnection {
this.state = err ? "closed" : "connected";
const onFinish = this.onFinish;
if (onFinish) {
this.queryCount = 0;
this.reserved = false;

// pool is closed, lets finish the connection
if (err) {
onFinish(err);
Expand All @@ -274,7 +281,7 @@ class PooledConnection {
}
return;
}
this.pool.release(this);
this.pool.release(this, true);
}
#onClose(err) {
const connectionInfo = this.connectionInfo;
Expand All @@ -289,6 +296,8 @@ class PooledConnection {
this.pool.readyConnections.delete(this);
const queries = new Set(this.queries);
this.queries.clear();
this.queryCount = 0;
this.reserved = false;
// notify all queries that the connection is closed
for (const onClose of queries) {
onClose(err);
Expand All @@ -298,7 +307,7 @@ class PooledConnection {
onFinish(err);
}

this.pool.release(this);
this.pool.release(this, true);
}
constructor(connectionInfo, pool: ConnectionPool) {
this.connection = createConnection(connectionInfo, this.#onConnected.bind(this), this.#onClose.bind(this));
Expand Down Expand Up @@ -370,6 +379,8 @@ class ConnectionPool {
connections: PooledConnection[];
readyConnections: Set<PooledConnection>;
waitingQueue: Array<(err: Error | null, result: any) => void> = [];
reservedQueue: Array<(err: Error | null, result: any) => void> = [];

poolStarted: boolean = false;
closed: boolean = false;
constructor(connectionInfo) {
Expand All @@ -378,32 +389,103 @@ class ConnectionPool {
this.readyConnections = new Set();
}

release(connection: PooledConnection) {
flushConcurrentQueries() {
if (this.waitingQueue.length === 0) {
return;
}
while (this.waitingQueue.length > 0) {
let endReached = true;

const nonReservedConnections = Array.from(this.readyConnections).filter(c => !c.preReserved);
if (nonReservedConnections.length === 0) {
return;
}
// kinda balance the load between connections
const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount);
const leastQueries = orderedConnections[0].queryCount;

for (const connection of orderedConnections) {
if (connection.queryCount > leastQueries) {
endReached = false;
break;
}

const pending = this.waitingQueue.shift();
if (pending) {
connection.queryCount++;
pending(null, connection);
}
}
const halfPoolSize = Math.ceil(this.connections.length / 2);
if (endReached || orderedConnections.length < halfPoolSize) {
// we are able to distribute the load between connections but the connection pool is less than half of the pool size
// so we can stop here and wait for the next tick to flush the waiting queue
break;
}
}
if (this.waitingQueue.length > 0) {
if (connection.storedError) {
// this connection got a error but maybe we can wait for another
// we still wanna to flush the waiting queue but lets wait for the next tick because some connections might be released
// this is better for query performance
process.nextTick(this.flushConcurrentQueries.bind(this));
}
}

release(connection: PooledConnection, connectingEvent: boolean = false) {
if (!connectingEvent) {
connection.queryCount--;
}
const was_reserved = connection.reserved;
connection.reserved = false;
connection.preReserved = false;
if (connection.state !== "connected") {
// connection is not ready
return;
}
if (was_reserved) {
if (this.waitingQueue.length > 0) {
if (connection.storedError) {
// this connection got a error but maybe we can wait for another

if (this.hasConnectionsAvailable()) {
if (this.hasConnectionsAvailable()) {
return;
}

// we have no connections available so lets fails
let pending;
while ((pending = this.waitingQueue.shift())) {
pending.onConnected(connection.storedError, connection);
}
return;
}

// we have no connections available so lets fails
let pending;
while ((pending = this.waitingQueue.shift())) {
pending(connection.storedError, connection);
const pendingReserved = this.reservedQueue.shift();
if (pendingReserved) {
connection.reserved = true;
connection.queryCount++;
// we have a connection waiting for a reserved connection lets prioritize it
pendingReserved(connection.storedError, connection);
return;
}
return;
this.flushConcurrentQueries();
} else {
// connection is ready, lets add it back to the ready connections
this.readyConnections.add(connection);
}
// we have some pending connections, lets connect them with the released connection
const pending = this.waitingQueue.shift();
pending?.(connection.storedError, connection);
} else {
if (connection.state !== "connected") {
// connection is not ready, lets not add it to the ready connections
return;
if (connection.queryCount == 0) {
// ok we can actually bind reserved queries to it
const pendingReserved = this.reservedQueue.shift();
if (pendingReserved) {
connection.reserved = true;
connection.queryCount++;
// we have a connection waiting for a reserved connection lets prioritize it
pendingReserved(connection.storedError, connection);
return;
}
}
// connection is ready, lets add it to the ready connections

this.readyConnections.add(connection);

this.flushConcurrentQueries();
}
}

Expand Down Expand Up @@ -505,13 +587,13 @@ class ConnectionPool {

/**
* @param {function} onConnected - The callback function to be called when the connection is established.
* @param {boolean} reserved - Whether the connection is reserved, if is reserved the connection will not be released until release is called, if not release will only decrement the queriesUsing counter
* @param {boolean} reserved - Whether the connection is reserved, if is reserved the connection will not be released until release is called, if not release will only decrement the queryCount counter
*/
connect(onConnected: (err: Error | null, result: any) => void, reserved: boolean = false) {
// TODO: this always reserve a connection, we should only reserve if reserved is true
if (this.closed) {
return onConnected(connectionClosedError(), null);
}

if (this.readyConnections.size === 0) {
// no connection ready lets make some
let retry_in_progress = false;
Expand All @@ -531,7 +613,12 @@ class ConnectionPool {
if (!retry_in_progress) {
// avoid adding to the queue twice, we wanna to retry every available pool connection
retry_in_progress = true;
this.waitingQueue.push(onConnected);
if (reserved) {
// we are not sure what connection will be available so we dont pre reserve
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
}
}
} else {
// we have some error, lets grab it and fail if unable to start a connection
Expand All @@ -542,31 +629,66 @@ class ConnectionPool {
all_closed = false;
}
}

if (!all_closed && !retry_in_progress) {
// is possible to connect because we have some working connections, or we are just without network for some reason
// wait for connection to be released or fail
this.waitingQueue.push(onConnected);
if (reserved) {
// we are not sure what connection will be available so we dont pre reserve
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
}
} else {
// impossible to connect or retry
onConnected(storedError, null);
}
return;
}
// we never started the pool, lets start it
this.waitingQueue.push(onConnected);
if (reserved) {
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
}
this.poolStarted = true;
const pollSize = this.connections.length;
for (let i = 0; i < pollSize; i++) {
// pool is always at least 1 connection
this.connections[0] = new PooledConnection(this.connectionInfo, this);
this.connections[0].preReserved = reserved; // lets pre reserve the first connection
for (let i = 1; i < pollSize; i++) {
this.connections[i] = new PooledConnection(this.connectionInfo, this);
}
return;
}

// we have some connection ready
const first = this.readyConnections.values().next().value;
this.readyConnections.delete(first);
onConnected(null, first);
if (reserved) {
let connectionWithLeastQueries: PooledConnection | null = null;
let leastQueries = Infinity;
for (const connection of this.readyConnections) {
if (connection.reserved || connection.preReserved) continue;
const queryCount = connection.queryCount;
if (queryCount > 0) {
if (queryCount < leastQueries) {
leastQueries = queryCount;
connectionWithLeastQueries = connection;
continue;
}
}
connection.reserved = true;
connection.queryCount++;
this.readyConnections.delete(connection);
onConnected(null, connection);
return;
}
if (connectionWithLeastQueries) {
// lets mark the connection with the least queries as preReserved if any
connectionWithLeastQueries.preReserved = true;
}
// no connection available to be reserved lets wait for a connection to be released
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
this.flushConcurrentQueries();
}
}
}

Expand Down Expand Up @@ -748,7 +870,14 @@ function loadOptions(o) {
}

if (url) {
({ hostname, port, username, password, protocol: adapter } = o = url);
({ hostname, port, username, password, adapter } = o);
// object overrides url
hostname ||= url.hostname;
port ||= url.port;
username ||= url.username;
password ||= url.password;
adapter ||= url.protocol;

if (adapter[adapter.length - 1] === ":") {
adapter = adapter.slice(0, -1);
}
Expand Down Expand Up @@ -851,14 +980,18 @@ function loadOptions(o) {
port = Number(port);

if (!Number.isSafeInteger(port) || port < 1 || port > 65535) {
throw new Error(`Invalid port: ${port}`);
throw $ERR_INVALID_ARG_VALUE("port", port, "must be a non-negative integer between 1 and 65535");
}

if (adapter && !(adapter === "postgres" || adapter === "postgresql")) {
throw new Error(`Unsupported adapter: ${adapter}. Only \"postgres\" is supported for now`);
switch (adapter) {
case "postgres":
case "postgresql":
adapter = "postgres";
break;
default:
throw new Error(`Unsupported adapter: ${adapter}. Only \"postgres\" is supported for now`);
}
//TODO: when adding MySQL, SQLite or MSSQL we need to add the adapter to match
const ret: any = { hostname, port, username, password, database, tls, query, sslMode, adapter: "postgres" };
const ret: any = { hostname, port, username, password, database, tls, query, sslMode, adapter };
if (idleTimeout != null) {
ret.idleTimeout = idleTimeout;
}
Expand Down

0 comments on commit a06f824

Please sign in to comment.