Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for multiplexed session for read only transactions #2136

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ system-test/*key.json
.DS_Store
package-lock.json
__pycache__
.vscode
33 changes: 33 additions & 0 deletions samples/multiplexed-session.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';

async function main(
instanceId = 'my-instance',
databaseId = 'my-database',
projectId = 'my-project-id'
) {
const {Spanner} = require('../build/src');
const spanner = new Spanner({
projectId: projectId,
});
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const queries = [{sql: 'SELECT 1'}, {sql: 'SELECT 2'}, {sql: 'SELECT 3'}];
async function runQueriesConcurrently() {
// await database.run('SELECT 1');
const promises = queries.map(async query => {
const [rows] = await database.run(query);
console.log(`Query: ${query.sql} returned ${rows.length} rows.`);
rows.forEach(row => console.log(row));
});

await Promise.all(promises);
}
runQueriesConcurrently();
}

process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});

main(...process.argv.slice(2));
70 changes: 45 additions & 25 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@
import {finished, Duplex, Readable, Transform} from 'stream';
import {PreciseDate} from '@google-cloud/precise-date';
import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.';
import {
MultiplexedSessionInterface,
MultiplexedSessionOptions,
} from './multiplexed-session';

import { GetSession, GetSessionInterface } from './get-session';

Check failure on line 103 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·GetSession,·GetSessionInterface·` with `GetSession,·GetSessionInterface`

export interface MultiplexedSessionConstructor {
new (database: Database): MultiplexedSessionInterface;
}
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import IPolicy = google.iam.v1.IPolicy;
Expand Down Expand Up @@ -151,6 +161,10 @@
EnumKey<typeof google.spanner.admin.database.v1.DatabaseDialect>
>;

export interface MultiplexedSessionConstructor {
new (database: Database): MultiplexedSessionInterface;
}

export interface SetIamPolicyRequest {
policy: Policy | null;
updateMask?: FieldMask | null;
Expand Down Expand Up @@ -338,7 +352,7 @@
class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: GetSessionInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand All @@ -352,7 +366,8 @@
instance: Instance,
name: string,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor,

Check failure on line 370 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSessionOptions·|·MultiplexedSessionConstructor,·` with `⏎······|·MultiplexedSessionOptions⏎······|·MultiplexedSessionConstructor`

Check failure on line 370 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
) {
const methods = {
/**
Expand Down Expand Up @@ -449,15 +464,6 @@
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
}
Expand All @@ -473,11 +479,21 @@
[CLOUD_RESOURCE_HEADER]: this.formattedName_,
};
this.request = instance.request;
this._observabilityOptions = instance._observabilityOptions;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);

this.sessionFactory_ = new GetSession(this, name, poolOptions, multiplexedSessionOptions);

Check failure on line 489 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `this,·name,·poolOptions,·multiplexedSessionOptions` with `⏎······this,⏎······name,⏎······poolOptions,⏎······multiplexedSessionOptions⏎····`
this.pool_ = this.sessionFactory_.getPool();
this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
Expand Down Expand Up @@ -1013,6 +1029,7 @@
}
);
}

/**
* @typedef {array} CreateTableResponse
* @property {Table} 0 The new {@link Table}.
Expand Down Expand Up @@ -1129,16 +1146,18 @@
* @returns {Transaction}
*/
private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) {
transaction.once('end', () => {
try {
this.pool_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
});
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS!=='true') {

Check failure on line 1149 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS!==` with `·(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS·!==·`
transaction.once('end', () => {
try {
this.pool_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
});
}
}
/**
* @typedef {array} DatabaseDeleteResponse
Expand Down Expand Up @@ -3047,7 +3066,8 @@
...this._traceConfig,
};
return startTrace('Database.runStream', traceConfig, span => {
this.pool_.getSession((err, session) => {
this.sessionFactory_.getSession((err, session) => {
console.log("session: ", session?.formattedName_);

Check failure on line 3070 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `"session:·"` with `'session:·'`

Check warning on line 3070 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
if (err) {
setSpanError(span, err);
proxyStream.destroy(err);
Expand Down
73 changes: 73 additions & 0 deletions src/get-session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Database, Session, Transaction } from ".";

Check failure on line 1 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·Database,·Session,·Transaction·}·from·"."` with `Database,·Session,·Transaction}·from·'.'`

Check warning on line 1 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
import { MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions } from "./multiplexed-session";

Check failure on line 2 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSession,·MultiplexedSessionInterface,·MultiplexedSessionOptions·}·from·"./multiplexed-session"` with `⏎··MultiplexedSession,⏎··MultiplexedSessionInterface,⏎··MultiplexedSessionOptions,⏎}·from·'./multiplexed-session'`

Check warning on line 2 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Strings must use singlequote
import { SessionPool, SessionPoolInterface, SessionPoolOptions } from "./session-pool";

Check failure on line 3 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·SessionPool,·SessionPoolInterface,·SessionPoolOptions·}·from·"./session-pool"` with `⏎··SessionPool,⏎··SessionPoolInterface,⏎··SessionPoolOptions,⏎}·from·'./session-pool'`
import { MultiplexedSessionConstructor, SessionPoolConstructor } from "./database";

Check failure on line 4 in src/get-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·MultiplexedSessionConstructor,·SessionPoolConstructor·}·from·"./database"` with `⏎··MultiplexedSessionConstructor,⏎··SessionPoolConstructor,⏎}·from·'./database'`
import { ServiceObjectConfig } from "@google-cloud/common";
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

export interface GetSessionInterface {
getSession(callback: GetSessionCallback): void;
getPool(): SessionPoolInterface;
getMultiplexedSession(): MultiplexedSessionInterface | undefined;
}

export class GetSession extends common.GrpcServiceObject implements GetSessionInterface{
multiplexedSession_?: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor
) {
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.multiplexedSession_ =
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(database)
: new MultiplexedSession(database, multiplexedSessionOptions);
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.multiplexedSession_.createSession();
}

getSession(callback: GetSessionCallback): void{
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS==='true') {
this.multiplexedSession_?.getSession((err, session) => {
err ? callback(err, null) : callback(null, session);
})
} else {
this.pool_?.getSession((err, session) => {
err ? callback(err, null) : callback(null, session);
})
}
}

getPool(): SessionPoolInterface {
return this.pool_;
}

getMultiplexedSession(): MultiplexedSessionInterface | undefined {
return this.multiplexedSession_;
}
}
12 changes: 9 additions & 3 deletions src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const common = require('./common-grpc/service-object');
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import snakeCase = require('lodash.snakecase');
import {Database, SessionPoolConstructor} from './database';
import {Database, MultiplexedSessionConstructor, SessionPoolConstructor} from './database';
import {Spanner, RequestConfig} from '.';
import {
RequestCallback,
Expand Down Expand Up @@ -52,6 +52,7 @@ import {google as databaseAdmin} from '../protos/protos';
import {google as spannerClient} from '../protos/protos';
import {CreateInstanceRequest} from './index';
import {ObservabilityOptions} from './instrument';
import { MultiplexedSessionOptions } from './multiplexed-session';

export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup;
export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase;
Expand Down Expand Up @@ -960,7 +961,8 @@ class Instance extends common.GrpcServiceObject {
database(
name: string,
poolOptions?: SessionPoolOptions | SessionPoolConstructor,
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions
queryOptions?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor,
): Database {
if (!name) {
throw new GoogleError('A name is required to access a Database object.');
Expand All @@ -975,9 +977,13 @@ class Instance extends common.GrpcServiceObject {
optionsKey =
optionsKey + '/' + JSON.stringify(Object.entries(queryOptions!).sort());
}
if (multiplexedSessionOptions && Object.keys(multiplexedSessionOptions).length > 0) {
optionsKey =
optionsKey + '/' + JSON.stringify(Object.entries(multiplexedSessionOptions!).sort());
}
const key = name.split('/').pop() + optionsKey;
if (!this.databases_.has(key!)) {
const db = new Database(this, name, poolOptions, queryOptions);
const db = new Database(this, name, poolOptions, queryOptions, multiplexedSessionOptions);
db._observabilityOptions = this._observabilityOptions;
this.databases_.set(key!, db);
}
Expand Down
Loading
Loading