diff --git a/package.json b/package.json index 0cb26beb6..d1c65ddbd 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "merge-stream": "^2.0.0", "p-queue": "^6.0.2", "protobufjs": "^6.10.1", + "pumpify": "2.0.1", "split-array-stream": "^2.0.0", "stack-trace": "0.0.10", "stream-events": "^1.0.4", diff --git a/src/database.ts b/src/database.ts index b936e5d98..34ae1fe2a 100644 --- a/src/database.ts +++ b/src/database.ts @@ -31,6 +31,7 @@ import * as through from 'through2'; import {CallOptions, grpc, Operation as GaxOperation} from 'google-gax'; import {Backup} from './backup'; import {BatchTransaction, TransactionIdentifier} from './batch-transaction'; +import * as pumpify from 'pumpify'; import { google as databaseAdmin, google, @@ -371,8 +372,7 @@ class Database extends common.GrpcServiceObject { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; this.request = instance.request; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.requestStream = instance.requestStream as any; + this.requestStream = instance.requestStream; this.pool_.on('error', this.emit.bind(this, 'error')); this.pool_.open(); this.queryOptions_ = Object.assign( @@ -1513,13 +1513,27 @@ class Database extends common.GrpcServiceObject { delete gaxOpts.pageToken; } - return this.requestStream({ - client: 'SpannerClient', - method: 'listSessionsStream', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); + return new pumpify.obj([ + this.requestStream({ + client: 'SpannerClient', + method: 'listSessionsStream', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }), + new Transform({ + objectMode: true, + transform: ( + chunk: databaseAdmin.spanner.v1.ISession, + enc: string, + cb: Function + ) => { + const session = this.session(chunk.name!); + session.metadata = chunk; + cb(null, session); + }, + }), + ]); } getSnapshot(options?: TimestampBounds): Promise<[Snapshot]>; diff --git a/src/index.ts b/src/index.ts index f4bfb7d7e..9b255fb1d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -48,6 +48,8 @@ import {PartitionedDml, Snapshot, Transaction} from './transaction'; import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; +import * as pumpify from 'pumpify'; +import {Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -619,13 +621,27 @@ class Spanner extends GrpcService { delete gaxOpts.pageToken; } - return this.requestStream({ - client: 'InstanceAdminClient', - method: 'listInstancesStream', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); + return new pumpify.obj([ + this.requestStream({ + client: 'InstanceAdminClient', + method: 'listInstancesStream', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }), + new Transform({ + objectMode: true, + transform: ( + chunk: instanceAdmin.spanner.admin.instance.v1.IInstance, + enc: string, + cb: Function + ) => { + const instance = this.instance(chunk.name!); + instance.metadata = chunk; + cb(null, instance); + }, + }), + ]); } getInstanceConfigs( diff --git a/src/instance.ts b/src/instance.ts index dbdfe2208..07cad6d66 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -18,6 +18,7 @@ import arrify = require('arrify'); import {ServiceObjectConfig, GetConfig} from '@google-cloud/common'; // eslint-disable-next-line @typescript-eslint/no-var-requires const common = require('./common-grpc/service-object'); +import * as pumpify from 'pumpify'; import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import snakeCase = require('lodash.snakecase'); @@ -33,7 +34,7 @@ import { PagedOptionsWithFilter, CLOUD_RESOURCE_HEADER, } from './common'; -import {Duplex} from 'stream'; +import {Duplex, Transform} from 'stream'; import {SessionPoolOptions, SessionPool} from './session-pool'; import {grpc, Operation as GaxOperation, CallOptions} from 'google-gax'; import {Backup} from './backup'; @@ -433,13 +434,23 @@ class Instance extends common.GrpcServiceObject { delete gaxOpts.pageToken; } - return this.requestStream({ - client: 'DatabaseAdminClient', - method: 'listBackupsStream', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); + return new pumpify.obj([ + this.requestStream({ + client: 'DatabaseAdminClient', + method: 'listBackupsStream', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }), + new Transform({ + objectMode: true, + transform: (chunk: IBackup, enc: string, cb: Function) => { + const backup = this.backup(chunk.name!); + backup.metadata = chunk; + cb(null, backup); + }, + }), + ]); } getBackupOperations( @@ -1326,13 +1337,23 @@ class Instance extends common.GrpcServiceObject { delete gaxOpts.pageToken; } - return this.requestStream({ - client: 'DatabaseAdminClient', - method: 'listDatabasesStream', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); + return new pumpify.obj([ + this.requestStream({ + client: 'DatabaseAdminClient', + method: 'listDatabasesStream', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }), + new Transform({ + objectMode: true, + transform: (chunk: IDatabase, enc: string, cb: Function) => { + const database = this.database(chunk.name!, {min: 0}); + database.metadata = chunk; + cb(null, database); + }, + }), + ]); } getMetadata( diff --git a/system-test/spanner.ts b/system-test/spanner.ts index 052c009d2..28ebbdf48 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -23,7 +23,7 @@ import * as crypto from 'crypto'; import * as extend from 'extend'; import * as is from 'is'; import * as uuid from 'uuid'; -import {Backup, Database, Spanner, Instance} from '../src'; +import {Backup, Database, Instance, Session, Spanner} from '../src'; import {Key} from '../src/table'; import { ReadRequest, @@ -114,7 +114,7 @@ describe('Spanner', () => { after(async () => { if (generateInstanceForTest) { // Deleting all backups before an instance can be deleted. - await Promise.all( + await Promise.all( RESOURCES_TO_CLEAN.filter( resource => resource instanceof Backup ).map(backup => backup.delete(GAX_OPTIONS)) @@ -124,7 +124,7 @@ describe('Spanner', () => { * All databasess will automatically be deleted with instance. * @see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.DeleteInstance} */ - await Promise.all( + await Promise.all( RESOURCES_TO_CLEAN.filter( resource => resource instanceof Instance ).map(instance => instance.delete(GAX_OPTIONS)) @@ -1269,6 +1269,23 @@ describe('Spanner', () => { ); }); + it('should list backups steaming', done => { + const backups: Backup[] = []; + instance + .getBackupsStream() + .on('error', assert.ifError) + .on('data', backup => { + backups.push(backup); + }) + .on('end', () => { + assert.ok(backups.length > 0); + assert.ok( + backups.find(b => b.formattedName_ === backup1.formattedName_) + ); + done(); + }); + }); + it('should list backups with pagination', async () => { const [page1, , resp1] = await instance.getBackups({ pageSize: 1, @@ -1480,6 +1497,26 @@ describe('Spanner', () => { await Promise.all(sessions.map(session => session.delete())); }); + + it('should list sessions', async () => { + const [sessions] = await DATABASE.getSessions(); + assert.ok(sessions.length > 0); + assert.ok(sessions.find(s => s.id === session.id)); + }); + + it('should list sessions streaming', done => { + const sessions: Session[] = []; + DATABASE.getSessionsStream() + .on('error', assert.ifError) + .on('data', sessionObj => { + sessions.push(sessionObj); + }) + .on('end', () => { + assert.ok(sessions.length > 0); + assert.ok(sessions.find(s => s.id === session.id)); + done(); + }); + }); }); describe('Tables', () => { diff --git a/test/database.ts b/test/database.ts index 580db7148..54a66d70a 100644 --- a/test/database.ts +++ b/test/database.ts @@ -23,7 +23,7 @@ import * as extend from 'extend'; import {ApiError, util} from '@google-cloud/common'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; -import {Transform, Duplex} from 'stream'; +import {Transform} from 'stream'; import * as through from 'through2'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; @@ -32,6 +32,7 @@ import {Instance} from '../src'; import {MockError} from './mockserver/mockspanner'; import {IOperation} from '../src/instance'; import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import duplexify = require('duplexify'); import {google} from '../protos/protos'; import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType; @@ -2186,7 +2187,7 @@ describe('Database', () => { const OPTIONS = { gaxOptions: {autoPaginate: false}, } as db.GetSessionsOptions; - const returnValue = {} as Duplex; + const returnValue = through.obj(); it('should make and return the correct gax API call', () => { const expectedReqOpts = extend({}, OPTIONS, { @@ -2207,7 +2208,7 @@ describe('Database', () => { }; const returnedValue = database.getSessionsStream(OPTIONS); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => { @@ -2234,7 +2235,7 @@ describe('Database', () => { }; const returnedValue = database.getSessionsStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('pageSize and pageToken in options should take precedence over gaxOptions', () => { @@ -2268,7 +2269,7 @@ describe('Database', () => { }; const returnedValue = database.getSessionsStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should not require options', () => { @@ -2283,7 +2284,29 @@ describe('Database', () => { }; const returnedValue = database.getSessionsStream(); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); + }); + + it('should create and return Session objects', done => { + const stream = through.obj(); + database.requestStream = () => { + return stream; + }; + const protoSession = {name: 'session'}; + setImmediate(() => { + stream.push(protoSession); + stream.push(null); + }); + + database + .getSessionsStream() + .on('error', assert.ifError) + .on('data', session => { + assert.ok(session instanceof FakeSession); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + assert.strictEqual((session as any).metadata, protoSession); + }) + .on('end', done); }); }); diff --git a/test/index.ts b/test/index.ts index 0a3432a42..2d388f602 100644 --- a/test/index.ts +++ b/test/index.ts @@ -36,6 +36,7 @@ import {CLOUD_RESOURCE_HEADER} from '../src/common'; // Verify that CLOUD_RESOURCE_HEADER is set to a correct value. assert.strictEqual(CLOUD_RESOURCE_HEADER, 'google-cloud-resource-prefix'); +import duplexify = require('duplexify'); // eslint-disable-next-line @typescript-eslint/no-var-requires const apiConfig = require('../src/spanner_grpc_config.json'); @@ -879,7 +880,7 @@ describe('Spanner', () => { filter: 'b', }; const ORIGINAL_OPTIONS = extend({}, OPTIONS); - const returnValue = {}; + const returnValue = through.obj(); it('should make and return the correct gax API call', () => { const expectedReqOpts = extend({}, OPTIONS, { @@ -900,7 +901,7 @@ describe('Spanner', () => { }; const returnedValue = spanner.getInstancesStream(OPTIONS); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => { @@ -928,7 +929,7 @@ describe('Spanner', () => { }; const returnedValue = spanner.getInstancesStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('pageSize and pageToken in options should take precedence over gaxOptions', () => { @@ -963,7 +964,7 @@ describe('Spanner', () => { }; const returnedValue = spanner.getInstancesStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should not require options', () => { @@ -978,7 +979,28 @@ describe('Spanner', () => { }; const returnedValue = spanner.getInstancesStream(); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); + }); + + it('should create and return Instance objects', done => { + const stream = through.obj(); + spanner.requestStream = () => { + return stream; + }; + const protoInstance = {name: 'instance'}; + setImmediate(() => { + stream.push(protoInstance); + stream.push(null); + }); + spanner + .getInstancesStream() + .on('error', assert.ifError) + .on('data', instance => { + assert.ok(instance instanceof FakeInstance); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + assert.strictEqual((instance as any).metadata, protoInstance); + }) + .on('end', done); }); }); diff --git a/test/instance.ts b/test/instance.ts index da80468a3..4f0bd5706 100644 --- a/test/instance.ts +++ b/test/instance.ts @@ -25,7 +25,7 @@ import * as proxyquire from 'proxyquire'; import * as pfy from '@google-cloud/promisify'; import * as sinon from 'sinon'; import snakeCase = require('lodash.snakecase'); -import {Duplex} from 'stream'; +import * as through from 'through2'; import * as inst from '../src/instance'; import {Spanner, Database, RequestConfig} from '../src'; @@ -34,6 +34,7 @@ import {SessionPoolOptions} from '../src/session-pool'; import {Backup} from '../src/backup'; import {PreciseDate} from '@google-cloud/precise-date'; import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import duplexify = require('duplexify'); let promisified = false; const fakePfy = extend({}, pfy, { @@ -1088,7 +1089,7 @@ describe('Instance', () => { const OPTIONS = { gaxOptions: {autoPaginate: false}, } as inst.GetDatabasesOptions; - const returnValue = {} as Duplex; + const returnValue = through.obj(); it('should make and return the correct gax API call', () => { const expectedReqOpts = extend({}, OPTIONS, { @@ -1110,7 +1111,7 @@ describe('Instance', () => { }; const returnedValue = instance.getDatabasesStream(OPTIONS); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => { @@ -1137,7 +1138,7 @@ describe('Instance', () => { }; const returnedValue = instance.getDatabasesStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('pageSize and pageToken in options should take precedence over gaxOptions', () => { @@ -1173,7 +1174,7 @@ describe('Instance', () => { }; const returnedValue = instance.getDatabasesStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should not require options', () => { @@ -1188,7 +1189,34 @@ describe('Instance', () => { }; const returnedValue = instance.getDatabasesStream(); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); + }); + + it('should create and return Database objects', done => { + const stream = through.obj(); + instance.requestStream = () => { + return stream; + }; + const protoDatabase = {name: 'database'}; + setImmediate(() => { + stream.push(protoDatabase); + stream.push(null); + }); + + const spy = sandbox.spy(instance, 'database'); + instance + .getDatabasesStream() + .on('error', assert.ifError) + .on('data', database => { + assert.ok(database instanceof FakeDatabase); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + assert.strictEqual((database as any).metadata, protoDatabase); + assert.strictEqual( + (spy.getCall(0).args[1]! as SessionPoolOptions).min, + 0 + ); + }) + .on('end', done); }); }); @@ -1523,7 +1551,7 @@ describe('Instance', () => { const OPTIONS = { gaxOptions: {autoPaginate: false}, } as inst.GetDatabasesOptions; - const returnValue = {} as Duplex; + const returnValue = through.obj(); it('should make and return the correct gax API call', () => { const expectedReqOpts = extend({}, OPTIONS, { @@ -1545,7 +1573,7 @@ describe('Instance', () => { }; const returnedValue = instance.getBackupsStream(OPTIONS); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => { @@ -1572,7 +1600,7 @@ describe('Instance', () => { }; const returnedValue = instance.getBackupsStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('pageSize and pageToken in options should take precedence over gaxOptions', () => { @@ -1608,7 +1636,7 @@ describe('Instance', () => { }; const returnedValue = instance.getBackupsStream(options); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); }); it('should not require options', () => { @@ -1623,7 +1651,27 @@ describe('Instance', () => { }; const returnedValue = instance.getBackupsStream(); - assert.strictEqual(returnedValue, returnValue); + assert.ok(returnedValue instanceof duplexify); + }); + + it('should create and return Backup objects', done => { + const stream = through.obj(); + instance.requestStream = () => { + return stream; + }; + const protoBackup = {name: 'backup'}; + setImmediate(() => { + stream.push(protoBackup); + stream.push(null); + }); + + instance + .getBackupsStream() + .on('error', assert.ifError) + .on('data', backup => { + assert.ok(backup instanceof FakeBackup); + }) + .on('end', done); }); });