From 75c5e5d47e1570a7346f97b580dfe286a2ce38a2 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Mon, 11 Nov 2024 11:40:43 +0530 Subject: [PATCH] class: add class get session --- .gitignore | 1 + src/database.ts | 196 ++++------------------ src/{session-getter.ts => get-session.ts} | 41 +++-- src/instance.ts | 12 +- 4 files changed, 73 insertions(+), 177 deletions(-) rename src/{session-getter.ts => get-session.ts} (57%) diff --git a/.gitignore b/.gitignore index d4f03a0df..14050d4e4 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ system-test/*key.json .DS_Store package-lock.json __pycache__ +.vscode \ No newline at end of file diff --git a/src/database.ts b/src/database.ts index ad560d495..5dbd2b9c6 100644 --- a/src/database.ts +++ b/src/database.ts @@ -96,10 +96,15 @@ import {finished, Duplex, Readable, Transform} from 'stream'; import {PreciseDate} from '@google-cloud/precise-date'; import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.'; import { - MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions, } from './multiplexed-session'; + +import { GetSession, GetSessionInterface } from './get-session'; + +export interface MultiplexedSessionConstructor { + new (database: Database): MultiplexedSessionInterface; +} import arrify = require('arrify'); import {ServiceError} from 'google-gax'; import IPolicy = google.iam.v1.IPolicy; @@ -116,7 +121,6 @@ import { setSpanErrorAndException, traceConfig, } from './instrument'; -import { GetSession, GetSessionInterface } from './session-getter'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -348,9 +352,7 @@ export interface RestoreOptions { class Database extends common.GrpcServiceObject { private instance: Instance; formattedName_: string; - pool_: SessionPoolInterface; - multiplexedSession_?: MultiplexedSessionInterface; - getSession_?: GetSessionInterface; + sessionFactory_: GetSessionInterface; queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions; resourceHeader_: {[k: string]: string}; request: DatabaseRequest; @@ -365,9 +367,7 @@ class Database extends common.GrpcServiceObject { name: string, poolOptions?: SessionPoolConstructor | SessionPoolOptions, queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions, - multiplexedSessionOptions?: - | MultiplexedSessionOptions - | MultiplexedSessionConstructor + multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor, ) { const methods = { /** @@ -464,21 +464,6 @@ class Database extends common.GrpcServiceObject { }, } as {} as ServiceObjectConfig); - this.pool_ = - typeof poolOptions === 'function' - ? new (poolOptions as SessionPoolConstructor)(this, null) - : new SessionPool(this, poolOptions); - this.multiplexedSession_ = - typeof multiplexedSessionOptions === 'function' - ? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this) - : new MultiplexedSession(this, multiplexedSessionOptions); - this.getSession_ = new GetSession(this); - - const sessionPoolInstance = this.pool_ as SessionPool; - if (sessionPoolInstance) { - sessionPoolInstance._observabilityOptions = - instance._observabilityOptions; - } if (typeof poolOptions === 'object') { this.databaseRole = poolOptions.databaseRole || null; } @@ -494,14 +479,21 @@ class Database extends common.GrpcServiceObject { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; this.request = instance.request; - this._observabilityOptions = instance._observabilityOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any this.requestStream = instance.requestStream as any; - this.pool_.on('error', this.emit.bind(this, 'error')); - this.pool_.open(); - this.multiplexedSession_.createSession(); - //creating multiplexed session - // this.database.createSession({multiplexed: true}); + this.pool_ = + typeof poolOptions === 'function' + ? new (poolOptions as SessionPoolConstructor)(this, null) + : new SessionPool(this, poolOptions); + + this.sessionFactory_ = new GetSession(this, name, poolOptions, multiplexedSessionOptions); + this.pool_ = this.sessionFactory_.getPool(); + this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession(); + const sessionPoolInstance = this.pool_ as SessionPool; + if (sessionPoolInstance) { + sessionPoolInstance._observabilityOptions = + instance._observabilityOptions; + } this.queryOptions_ = Object.assign( Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() @@ -1013,10 +1005,6 @@ class Database extends common.GrpcServiceObject { reqOpts.session.creatorRole = options.databaseRole || this.databaseRole || null; - if (options.multiplexed) { - reqOpts.session.multiplexed = true; - } - const headers = this.resourceHeader_; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -1158,16 +1146,18 @@ class Database extends common.GrpcServiceObject { * @returns {Transaction} */ private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) { - transaction.once('end', () => { - try { - this.pool_.release(session); - } catch (e) { - setSpanErrorAndException(span, e as Error); - this.emit('error', e); - } finally { - span.end(); - } - }); + if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS!=='true') { + transaction.once('end', () => { + try { + this.pool_.release(session); + } catch (e) { + setSpanErrorAndException(span, e as Error); + this.emit('error', e); + } finally { + span.end(); + } + }); + } } /** * @typedef {array} DatabaseDeleteResponse @@ -3076,7 +3066,8 @@ class Database extends common.GrpcServiceObject { ...this._traceConfig, }; return startTrace('Database.runStream', traceConfig, span => { - this.pool_.getSession((err, session) => { + this.sessionFactory_.getSession((err, session) => { + console.log("session: ", session?.formattedName_); if (err) { setSpanError(span, err); proxyStream.destroy(err); @@ -3133,123 +3124,6 @@ class Database extends common.GrpcServiceObject { .once('end', endListener) .pipe(proxyStream); }); - - // if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS) { - // this.multiplexedSession_?.getSession((err, session) => { - // console.log("mux session: ", session?.formattedName_); - // if (err) { - // setSpanError(span, err); - // proxyStream.destroy(err); - // span.end(); - // return; - // } - - // span.addEvent('Using Session', {'session.id': session?.id}); - - // const snapshot = session!.snapshot(options, this.queryOptions_); - - // let dataReceived = false; - // let dataStream = snapshot.runStream(query); - - // const endListener = () => { - // span.end(); - // snapshot.end(); - // }; - // dataStream - // .once('data', () => (dataReceived = true)) - // .once('error', err => { - // setSpanError(span, err); - - // if ( - // !dataReceived && - // isSessionNotFoundError(err as grpc.ServiceError) - // ) { - // // If it is a 'Session not found' error and we have not yet received - // // any data, we can safely retry the query on a new session. - // // Register the error on the session so the pool can discard it. - // if (session) { - // session.lastError = err as grpc.ServiceError; - // } - // span.addEvent('No session available', { - // 'session.id': session?.id, - // }); - // // Remove the current data stream from the end user stream. - // dataStream.unpipe(proxyStream); - // dataStream.removeListener('end', endListener); - // dataStream.end(); - // snapshot.end(); - // // Create a new data stream and add it to the end user stream. - // dataStream = this.runStream(query, options); - // dataStream.pipe(proxyStream); - // } else { - // proxyStream.destroy(err); - // snapshot.end(); - // } - // }) - // .on('stats', stats => proxyStream.emit('stats', stats)) - // .on('response', response => proxyStream.emit('response', response)) - // .once('end', endListener) - // .pipe(proxyStream); - // }); - // } else { - // this.pool_.getSession((err, session) => { - // if (err) { - // setSpanError(span, err); - // proxyStream.destroy(err); - // span.end(); - // return; - // } - - // span.addEvent('Using Session', {'session.id': session?.id}); - - // const snapshot = session!.snapshot(options, this.queryOptions_); - - // this._releaseOnEnd(session!, snapshot, span); - - // let dataReceived = false; - // let dataStream = snapshot.runStream(query); - - // const endListener = () => { - // span.end(); - // snapshot.end(); - // }; - // dataStream - // .once('data', () => (dataReceived = true)) - // .once('error', err => { - // setSpanError(span, err); - - // if ( - // !dataReceived && - // isSessionNotFoundError(err as grpc.ServiceError) - // ) { - // // If it is a 'Session not found' error and we have not yet received - // // any data, we can safely retry the query on a new session. - // // Register the error on the session so the pool can discard it. - // if (session) { - // session.lastError = err as grpc.ServiceError; - // } - // span.addEvent('No session available', { - // 'session.id': session?.id, - // }); - // // Remove the current data stream from the end user stream. - // dataStream.unpipe(proxyStream); - // dataStream.removeListener('end', endListener); - // dataStream.end(); - // snapshot.end(); - // // Create a new data stream and add it to the end user stream. - // dataStream = this.runStream(query, options); - // dataStream.pipe(proxyStream); - // } else { - // proxyStream.destroy(err); - // snapshot.end(); - // } - // }) - // .on('stats', stats => proxyStream.emit('stats', stats)) - // .on('response', response => proxyStream.emit('response', response)) - // .once('end', endListener) - // .pipe(proxyStream); - // }); - // } finished(proxyStream, err => { if (err) { diff --git a/src/session-getter.ts b/src/get-session.ts similarity index 57% rename from src/session-getter.ts rename to src/get-session.ts index 3b6ef339c..d6d3d8692 100644 --- a/src/session-getter.ts +++ b/src/get-session.ts @@ -1,8 +1,9 @@ -import { EventEmitter } from "events-intercept"; import { Database, Session, Transaction } from "."; import { MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions } from "./multiplexed-session"; import { SessionPool, SessionPoolInterface, SessionPoolOptions } from "./session-pool"; import { MultiplexedSessionConstructor, SessionPoolConstructor } from "./database"; +import { ServiceObjectConfig } from "@google-cloud/common"; +const common = require('./common-grpc/service-object'); /** * @callback GetSessionCallback @@ -20,39 +21,53 @@ export interface GetSessionCallback { export interface GetSessionInterface { getSession(callback: GetSessionCallback): void; + getPool(): SessionPoolInterface; + getMultiplexedSession(): MultiplexedSessionInterface | undefined; } -export class GetSession extends EventEmitter implements GetSessionInterface { - database: Database; +export class GetSession extends common.GrpcServiceObject implements GetSessionInterface{ multiplexedSession_?: MultiplexedSessionInterface; pool_: SessionPoolInterface; constructor( database: Database, + name: String, poolOptions?: SessionPoolConstructor | SessionPoolOptions, multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor ) { - super(); - this.database = database; + super({ + parent: database, + id: name, + } as {} as ServiceObjectConfig); this.pool_ = - typeof poolOptions === 'function' - ? new (poolOptions as SessionPoolConstructor)(this.database, null) - : new SessionPool(this.database, poolOptions); + typeof poolOptions === 'function' + ? new (poolOptions as SessionPoolConstructor)(database, null) + : new SessionPool(database, poolOptions); this.multiplexedSession_ = - typeof multiplexedSessionOptions === 'function' - ? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this.database) - : new MultiplexedSession(this.database, multiplexedSessionOptions); + typeof multiplexedSessionOptions === 'function' + ? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(database) + : new MultiplexedSession(database, multiplexedSessionOptions); + this.pool_.on('error', this.emit.bind(this, 'error')); + this.pool_.open(); + this.multiplexedSession_.createSession(); } + getSession(callback: GetSessionCallback): void{ if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS==='true') { this.multiplexedSession_?.getSession((err, session) => { - console.log("err: ", err); err ? callback(err, null) : callback(null, session); }) } else { this.pool_?.getSession((err, session) => { - console.log("err: ", err); err ? callback(err, null) : callback(null, session); }) } } + + getPool(): SessionPoolInterface { + return this.pool_; + } + + getMultiplexedSession(): MultiplexedSessionInterface | undefined { + return this.multiplexedSession_; + } } \ No newline at end of file diff --git a/src/instance.ts b/src/instance.ts index 4986e3ecd..3b579aa67 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -21,7 +21,7 @@ const common = require('./common-grpc/service-object'); import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import snakeCase = require('lodash.snakecase'); -import {Database, SessionPoolConstructor} from './database'; +import {Database, MultiplexedSessionConstructor, SessionPoolConstructor} from './database'; import {Spanner, RequestConfig} from '.'; import { RequestCallback, @@ -52,6 +52,7 @@ import {google as databaseAdmin} from '../protos/protos'; import {google as spannerClient} from '../protos/protos'; import {CreateInstanceRequest} from './index'; import {ObservabilityOptions} from './instrument'; +import { MultiplexedSessionOptions } from './multiplexed-session'; export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup; export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase; @@ -960,7 +961,8 @@ class Instance extends common.GrpcServiceObject { database( name: string, poolOptions?: SessionPoolOptions | SessionPoolConstructor, - queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions + queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions, + multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor, ): Database { if (!name) { throw new GoogleError('A name is required to access a Database object.'); @@ -975,9 +977,13 @@ class Instance extends common.GrpcServiceObject { optionsKey = optionsKey + '/' + JSON.stringify(Object.entries(queryOptions!).sort()); } + if (multiplexedSessionOptions && Object.keys(multiplexedSessionOptions).length > 0) { + optionsKey = + optionsKey + '/' + JSON.stringify(Object.entries(multiplexedSessionOptions!).sort()); + } const key = name.split('/').pop() + optionsKey; if (!this.databases_.has(key!)) { - const db = new Database(this, name, poolOptions, queryOptions); + const db = new Database(this, name, poolOptions, queryOptions, multiplexedSessionOptions); db._observabilityOptions = this._observabilityOptions; this.databases_.set(key!, db); }