Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into release-please--branches--main--components--…
Browse files Browse the repository at this point in the history
…spanner
surbhigarg92 authored Jan 3, 2025
2 parents 1f2c8d5 + b467380 commit 0bb05ad
Showing 9 changed files with 518 additions and 50 deletions.
2 changes: 2 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ import {Instance, MutationGroup, Spanner} from '../src';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import {MockError} from '../test/mockserver/mockspanner';
import {FakeSessionFactory} from '../test/database';
const {generateWithAllSpansHaveDBName} = require('./helper');

const fakePfy = extend({}, pfy, {
@@ -234,6 +235,7 @@ describe('Database', () => {
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session-pool': {SessionPool: FakeSessionPool},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./session': {Session: FakeSession},
'./table': {Table: FakeTable},
'./transaction-runner': {
21 changes: 9 additions & 12 deletions src/database.ts
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ import {
} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
google as databaseAdmin,
google,
@@ -111,7 +112,6 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse
@@ -339,6 +339,7 @@ class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: SessionFactoryInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
commonHeaders_: {[k: string]: string};
request: DatabaseRequest;
@@ -450,15 +451,6 @@ class Database extends common.GrpcServiceObject {
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
@@ -480,8 +472,13 @@ class Database extends common.GrpcServiceObject {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.sessionFactory_ = new SessionFactory(this, name, poolOptions);
this.pool_ = this.sessionFactory_.getPool();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
8 changes: 6 additions & 2 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {GetSessionCallback} from './session-pool';
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
@@ -38,7 +38,7 @@ export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error';
* @constructs MultiplexedSessionInterface
* @param {Database} database The database to create a multiplexed session for.
*/
export interface MultiplexedSessionInterface {
export interface MultiplexedSessionInterface extends EventEmitter {
/**
* When called creates a multiplexed session.
*
@@ -71,6 +71,7 @@ export class MultiplexedSession
database: Database;
// frequency to create new mux session
refreshRate: number;
isMultiplexedEnabled: boolean;
_multiplexedSession: Session | null;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
@@ -81,6 +82,9 @@ export class MultiplexedSession
this.refreshRate = 7;
this._multiplexedSession = null;
this._observabilityOptions = database._observabilityOptions;
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
? (this.isMultiplexedEnabled = true)
: (this.isMultiplexedEnabled = false);
}

/**
161 changes: 161 additions & 0 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Database, Session, Transaction} from '.';
import {
MultiplexedSession,
MultiplexedSessionInterface,
} from './multiplexed-session';
import {
SessionPool,
SessionPoolInterface,
SessionPoolOptions,
} from './session-pool';
import {SessionPoolConstructor} from './database';
import {ServiceObjectConfig} from '@google-cloud/common';
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing session-factory logic.
*
* @interface SessionFactoryInterface
*/
export interface SessionFactoryInterface {
/**
* When called returns a session.
*
* @name SessionFactoryInterface#getSession
* @param {GetSessionCallback} callback The callback function.
*/
getSession(callback: GetSessionCallback): void;

/**
* When called returns the pool object.
*
* @name SessionFactoryInterface#getPool
*/
getPool(): SessionPoolInterface;

/**
* To be called when releasing a session.
*
* @name SessionFactoryInterface#release
* @param {Session} session The session to be released.
*/
release(session: Session): void;
}

/**
* Creates a SessionFactory object to manage the creation of
* session-pool and multiplexed session.
*
* @class
*
* @param {Database} database Database object.
* @param {String} name Name of the database.
* @param {SessionPoolOptions|SessionPoolInterface} options Session pool
* configuration options or custom pool inteface.
*/
export class SessionFactory
extends common.GrpcServiceObject
implements SessionFactoryInterface
{
multiplexedSession_: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions
) {
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.pool_.on('error', this.emit.bind(database, 'error'));
this.pool_.open();
this.multiplexedSession_ = new MultiplexedSession(database);
// Multiplexed sessions should only be created if its enabled.
if ((this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled) {
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
this.multiplexedSession_.createSession();
}
}

/**
* Retrieves a session, either a regular session or a multiplexed session, based on the environment variable configuration.
*
* If the environment variable `GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS` is set to `true`, the method will attempt to
* retrieve a multiplexed session. Otherwise, it will retrieve a session from the regular pool.
*
* @param {GetSessionCallback} callback The callback function.
*/

getSession(callback: GetSessionCallback): void {
const sessionHandler = (this.multiplexedSession_ as MultiplexedSession)
.isMultiplexedEnabled
? this.multiplexedSession_
: this.pool_;

sessionHandler!.getSession((err, session) => callback(err, session));
}

/**
* Returns the regular session pool object.
*
* @returns {SessionPoolInterface} The session pool used by current instance.
*/

getPool(): SessionPoolInterface {
return this.pool_;
}

/**
* Releases a session back to the session pool.
*
* This method returns a session to the pool after it is no longer needed.
* It is a no-op for multiplexed sessions.
*
* @param {Session} session - The session to be released. This should be an instance of `Session` that was
* previously acquired from the session pool.
*
* @throws {Error} If the session is invalid or cannot be released.
*/
release(session: Session): void {
if (
!(this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled
) {
this.pool_.release(session);
}
}
}
16 changes: 1 addition & 15 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ import {
setSpanErrorAndException,
startTrace,
} from './instrument';

import {GetSessionCallback} from './session-factory';
import {
isDatabaseNotFoundError,
isInstanceNotFoundError,
@@ -59,20 +59,6 @@ export interface GetWriteSessionCallback {
): void;
}

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing custom session pooling logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
Loading

0 comments on commit 0bb05ad

Please sign in to comment.