Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Oct 1, 2024
1 parent c030404 commit 1b85524
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 92 deletions.
178 changes: 123 additions & 55 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ import {
setSpanError,
setSpanErrorAndException,
} from './instrument';
import { GetSession, GetSessionInterface } from './session-getter';

Check failure on line 118 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·GetSession,·GetSessionInterface·` with `GetSession,·GetSessionInterface`

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
Expand Down Expand Up @@ -348,6 +349,7 @@ class Database extends common.GrpcServiceObject {
formattedName_: string;
pool_: SessionPoolInterface;
multiplexedSession_?: MultiplexedSessionInterface;
getSession_?: GetSessionInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -468,6 +470,7 @@ class Database extends common.GrpcServiceObject {
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this)
: new MultiplexedSession(this, multiplexedSessionOptions);
this.getSession_ = new GetSession(this);
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
}
Expand Down Expand Up @@ -3033,63 +3036,128 @@ class Database extends common.GrpcServiceObject {
const proxyStream: Transform = through.obj();
const q = {sql: query, opts: this.observabilityConfig};
return startTrace('Database.runStream', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
setSpanError(span, err);
proxyStream.destroy(err);
span.end();
return;
}

span.addEvent('Using Session', {'session.id': session?.id});

const snapshot = session!.snapshot(options, this.queryOptions_);

this._releaseOnEnd(session!, snapshot, span);

Check failure on line 3039 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `⏎`
let dataReceived = false;
let dataStream = snapshot.runStream(query);

const endListener = () => {
span.end();
snapshot.end();
};
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
setSpanError(span, err);

if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If it is a 'Session not found' error and we have not yet received
// any data, we can safely retry the query on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
span.addEvent('No session available', {
'session.id': session?.id,
});
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.removeListener('end', endListener);
dataStream.end();
snapshot.end();
// Create a new data stream and add it to the end user stream.
dataStream = this.runStream(query, options);
dataStream.pipe(proxyStream);
} else {
proxyStream.destroy(err);
snapshot.end();
}
})
.on('stats', stats => proxyStream.emit('stats', stats))
.on('response', response => proxyStream.emit('response', response))
.once('end', endListener)
.pipe(proxyStream);
this.getSession_?.getSession((err, session) => {
console.log("err: ", err);

Check failure on line 3041 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `"err:·"` with `'err:·'`
console.log("session: ", session);

Check failure on line 3042 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `"session:·"` with `'session:·'`
});

Check failure on line 3044 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `······`

Check failure on line 3044 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS) {
// this.multiplexedSession_?.getSession((err, session) => {
// console.log("mux session: ", session?.formattedName_);
// if (err) {
// setSpanError(span, err);
// proxyStream.destroy(err);
// span.end();
// return;
// }

Check failure on line 3054 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `··`

Check failure on line 3054 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// span.addEvent('Using Session', {'session.id': session?.id});

Check failure on line 3056 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Delete `··`

Check failure on line 3056 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// const snapshot = session!.snapshot(options, this.queryOptions_);

// let dataReceived = false;
// let dataStream = snapshot.runStream(query);

// const endListener = () => {
// span.end();
// snapshot.end();
// };
// dataStream
// .once('data', () => (dataReceived = true))
// .once('error', err => {
// setSpanError(span, err);

// if (
// !dataReceived &&
// isSessionNotFoundError(err as grpc.ServiceError)
// ) {
// // If it is a 'Session not found' error and we have not yet received
// // any data, we can safely retry the query on a new session.
// // Register the error on the session so the pool can discard it.
// if (session) {
// session.lastError = err as grpc.ServiceError;
// }
// span.addEvent('No session available', {
// 'session.id': session?.id,
// });
// // Remove the current data stream from the end user stream.
// dataStream.unpipe(proxyStream);
// dataStream.removeListener('end', endListener);
// dataStream.end();
// snapshot.end();
// // Create a new data stream and add it to the end user stream.
// dataStream = this.runStream(query, options);
// dataStream.pipe(proxyStream);
// } else {
// proxyStream.destroy(err);
// snapshot.end();
// }
// })
// .on('stats', stats => proxyStream.emit('stats', stats))
// .on('response', response => proxyStream.emit('response', response))
// .once('end', endListener)
// .pipe(proxyStream);
// });
// } else {
// this.pool_.getSession((err, session) => {
// if (err) {
// setSpanError(span, err);
// proxyStream.destroy(err);
// span.end();
// return;
// }

// span.addEvent('Using Session', {'session.id': session?.id});

// const snapshot = session!.snapshot(options, this.queryOptions_);

// this._releaseOnEnd(session!, snapshot, span);

// let dataReceived = false;
// let dataStream = snapshot.runStream(query);

// const endListener = () => {
// span.end();
// snapshot.end();
// };
// dataStream
// .once('data', () => (dataReceived = true))
// .once('error', err => {
// setSpanError(span, err);

// if (
// !dataReceived &&
// isSessionNotFoundError(err as grpc.ServiceError)
// ) {
// // If it is a 'Session not found' error and we have not yet received
// // any data, we can safely retry the query on a new session.
// // Register the error on the session so the pool can discard it.
// if (session) {
// session.lastError = err as grpc.ServiceError;
// }
// span.addEvent('No session available', {
// 'session.id': session?.id,
// });
// // Remove the current data stream from the end user stream.
// dataStream.unpipe(proxyStream);
// dataStream.removeListener('end', endListener);
// dataStream.end();
// snapshot.end();
// // Create a new data stream and add it to the end user stream.
// dataStream = this.runStream(query, options);
// dataStream.pipe(proxyStream);
// } else {
// proxyStream.destroy(err);
// snapshot.end();
// }
// })
// .on('stats', stats => proxyStream.emit('stats', stats))
// .on('response', response => proxyStream.emit('response', response))
// .once('end', endListener)
// .pipe(proxyStream);
// });
// }

finished(proxyStream, err => {
if (err) {
Expand Down
48 changes: 12 additions & 36 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import {EventEmitter} from 'events';
import * as is from 'is';
import PQueue from 'p-queue';
import { GoogleError } from 'google-gax';
import {google} from '../protos/protos';
import {Database} from './database';
import {Session} from './session';
import {Transaction} from './transaction';
import {NormalCallback} from './common';

interface MultiplexedSessionInventory {
multiplexedSession: Session | null;
}
Expand Down Expand Up @@ -40,15 +36,14 @@ export interface MultiplexedSessionOptions {
}

const DEFAULTS: MultiplexedSessionOptions = {
refreshRate: 10,
refreshRate: 30,
concurrency: Infinity,
databaseRole: null,
};

export class MultiplexedSession extends EventEmitter implements MultiplexedSessionInterface {
database: Database;
multiplexedSessionOptions: MultiplexedSessionOptions;
multiplexedSessionPromise: Promise<Session> | undefined;
_acquires: PQueue;
_multiplexedInventory!: MultiplexedSessionInventory;
_pingHandle!: NodeJS.Timer;
Expand All @@ -67,7 +62,6 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
this._multiplexedInventory = {
multiplexedSession: null,
};
this.multiplexedSessionPromise = undefined;
this._acquires = new PQueue({
concurrency: 1,
});
Expand Down Expand Up @@ -96,31 +90,12 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
}

async _refresh(): Promise<void> {
this._multiplexedInventory.multiplexedSession?.getMetadata((err, resp) => {
if (err) {
console.error('Error fetching metadata:', err);
return;
}

if (resp?.createTime) {
const seconds = Number(resp.createTime.seconds) ?? 0;
const nanos = resp.createTime.nanos ?? 0;
if (typeof seconds === 'number' && typeof nanos === 'number') {
const timestampMs = seconds * 1000 + nanos / 1000000;
const createdDate = new Date(timestampMs);
const currentDate = new Date(Date.now());
const differenceInMs = Math.abs(
currentDate.getTime() - createdDate.getTime()
);
const differenceInDays = differenceInMs / (1000 * 60 * 60 * 24);
if (differenceInDays >= 7) {
this.createSession();
}
} else {
console.warn('Invalid timestamp values.');
}
}
});
const metadata = await this._multiplexedInventory.multiplexedSession?.getMetadata();
const createTime = (parseInt(metadata![0].createTime.seconds) * 1000) + (metadata![0].createTime.nanos / 1000000);
const expireTime = createTime + (7*24*60*60*1000);
if(Date.now() > expireTime) {
this.createSession();
}
}

/**
Expand All @@ -132,7 +107,9 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
this._acquire().then(
session => callback(null, session, session?.txn),
callback
);
).catch(err => {
console.log("err: ", err);
})
}

async _acquire(): Promise<Session | null> {
Expand Down Expand Up @@ -210,10 +187,9 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
);

try {
const res = await Promise.race(promises);
console.log("the response is: ", res);
await Promise.race(promises);
} catch(err) {
console.log("try is catching error");
console.log("ERROR: ",err);
} finally {
removeOnceCloseListener!();
removeListener!();
Expand Down
58 changes: 58 additions & 0 deletions src/session-getter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { EventEmitter } from "events-intercept";
import { Database, Session, Transaction } from ".";
import { MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions } from "./multiplexed-session";
import { SessionPool, SessionPoolInterface, SessionPoolOptions } from "./session-pool";
import { MultiplexedSessionConstructor, SessionPoolConstructor } from "./database";

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

export interface GetSessionInterface {
getSession(callback: GetSessionCallback): void;
}

export class GetSession extends EventEmitter implements GetSessionInterface {
database: Database;
multiplexedSession_?: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor
) {
super();
this.database = database;
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this.database, null)
: new SessionPool(this.database, poolOptions);
this.multiplexedSession_ =
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this.database)
: new MultiplexedSession(this.database, multiplexedSessionOptions);
}
getSession(callback: GetSessionCallback): void{
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS==='true') {
this.multiplexedSession_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
} else {
this.pool_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
}
}
}
1 change: 0 additions & 1 deletion src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ export class Session extends common.GrpcServiceObject {
promisifyAll(Session, {
exclude: [
'delete',
'getMetadata',
'partitionedDml',
'snapshot',
'transaction',
Expand Down
14 changes: 14 additions & 0 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,20 @@ describe('Database', () => {
database.createSession(databaseRole, assert.ifError);
});

it('should send multiplexed correctly', done => {
const multiplexed = {multiplexed: true};
const options = {a: 'b'};
const originalOptions = extend(true, {}, options);

database.request = config => {
assert.deepStrictEqual(config.reqOpts.session.multiplexed, multiplexed.multiplexed);
assert.deepStrictEqual(options, originalOptions);
done();
};

database.createSession(multiplexed, assert.ifError);
});

describe('error', () => {
const ERROR = new Error('Error.');
const API_RESPONSE = {};
Expand Down
Loading

0 comments on commit 1b85524

Please sign in to comment.