diff --git a/socket-connection-rtt-monitoring.logs.txt b/socket-connection-rtt-monitoring.logs.txt new file mode 100644 index 0000000000..9231c6dee6 --- /dev/null +++ b/socket-connection-rtt-monitoring.logs.txt @@ -0,0 +1,3 @@ +{} +{"newResources":{"libuvResources":[],"activeResources":["TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap"]}} +{"beforeExitHappened":true} diff --git a/test/integration/node-specific/client_close.test.ts b/test/integration/node-specific/client_close.test.ts index d4784e6e38..330460b339 100644 --- a/test/integration/node-specific/client_close.test.ts +++ b/test/integration/node-specific/client_close.test.ts @@ -113,15 +113,15 @@ describe.only('MongoClient.close() Integration', () => { const connectionMonitoringReport = () => process.report .getReport() - .libuv.filter(r => r.type === 'tcp' && r.is_active) + .libuv.filter(r => r.type === 'tcp') .map(r => r.remoteEndpoint); - // assert socket creation + // assert socket creation const servers = client.topology?.s.servers; for (const server of servers) { const { host, port } = server[1].s.description.hostAddress; expect(connectionMonitoringReport()).to.deep.include({host, port}); - } + } await client.close(); @@ -153,7 +153,7 @@ describe.only('MongoClient.close() Integration', () => { describe('Node.js resource: Socket', () => { describe('when rtt monitoring is turned on', () => { it('no sockets remain after client.close()', async () => { - const run = async ({ MongoClient, uri, expect, sleep }) => { + const run = async ({ MongoClient, uri, log, expect, sleep }) => { const heartbeatFrequencyMS = 100; const client = new MongoClient(uri, { serverMonitoringMode: 'stream', @@ -161,48 +161,40 @@ describe.only('MongoClient.close() Integration', () => { }); await client.connect(); - const servers = Array.from(client.topology.s.servers.keys()); - - // a hashmap of - const serversHeartbeatOccurred = servers.reduce( - (acc, hostname) => ({ ...acc, [hostname]: false }), - {} - ); - const activeSocketsReport = () => - process.report.getReport().libuv.filter(r => r.type === 'tcp' && r.is_active); + process.report.getReport().libuv.filter(r => r.type === 'tcp'); const socketsAddressesBeforeHeartbeat = activeSocketsReport().map( r => r.address ); - const rttSocketReport = () => + const activeSocketsAfterHeartbeat = () => activeSocketsReport() .filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address)) .map(r => r.remoteEndpoint.host + ':' + r.remoteEndpoint.port); - client.on('serverHeartbeatSucceeded', async ev => { - // assert creation of rttPinger socket - const newSocketsAfterHeartbeat = rttSocketReport(); - expect(newSocketsAfterHeartbeat).to.deep.contain(ev.connectionId); + // set of servers whose hearbeats have occurred + const heartbeatOccurredSet = new Set(); - // assert rttPinger socket is connected to a server - expect(serversHeartbeatOccurred.keys()).to.deep.contain(ev.connectionId); - serversHeartbeatOccurred[ev.connectionId] = true; - }); + client.on('serverHeartbeatSucceeded', async ev => heartbeatOccurredSet.add(ev.connectionId)); - // ensure there is enough time for the heartbeatFrequencyMS for the event to occur + // ensure there is enough time for the events to occur await sleep(heartbeatFrequencyMS * 10); - // all servers should have had a heartbeat event - expect(serversHeartbeatOccurred.values().filter(r => r !== true)).to.be.empty; + // all servers should have had a heartbeat event and had a new socket created for rtt pinger + log(heartbeatOccurredSet); + const servers = client.topology.s.servers + for (const server of servers) { + expect(heartbeatOccurredSet).to.deep.contain(server[0]); + expect(activeSocketsAfterHeartbeat()).to.deep.contain(server[0]); + } // close the client await client.close(); - // upon close, assert rttPinger socket is cleaned up - const newSocketsAfterClose = rttSocketReport(); - expect(newSocketsAfterClose).to.have.length(0); + // upon close, assert rttPinger sockets are cleaned up + const activeSocketsAfterClose = activeSocketsAfterHeartbeat(); + expect(activeSocketsAfterClose).to.have.length(0); }; await runScriptAndGetProcessInfo('socket-connection-rtt-monitoring', config, run); @@ -235,30 +227,30 @@ describe.only('MongoClient.close() Integration', () => { }); describe('after a minPoolSize has been set on the ConnectionPool', () => { - it('no sockets remain after client.close()', async function () { + it.only('no sockets remain after client.close()', async function () { const run = async function ({ MongoClient, uri, log, expect }) { log({hi: 1}); const options = { minPoolSize: 2 }; const client = new MongoClient(uri, options); await client.connect(); const connectionMonitoringReport = () => - process.report.getReport().libuv.filter(r => r.type === 'tcp' && r.is_active).map(r => r.remoteEndpoint); + process.report.getReport().libuv.filter(r => r.type === 'tcp').map(r => r.remoteEndpoint); - // assert socket creation - // there should be three sockets for each server: client connection socket, monitor socket, pool size monitoring socket + // assert socket creation + // there should be a client connection socket for each server, one monitor socket total, and one pool size monitoring socket total const servers = client.topology?.s.servers; + for (const server of servers) { - log({ monreport: connectionMonitoringReport() }); const { host, port } = server[1].s.description.hostAddress; const relevantHostAddresses = connectionMonitoringReport().filter(r => r.host === host && r.port === port); - log({relevantHostAddresses}); expect(relevantHostAddresses).length.to.be.gte(3); } + await client.close(); }; await runScriptAndGetProcessInfo( - 'socket-minPoolSize', + 'socket-minPoolSize', config, run ); diff --git a/test/integration/node-specific/resource_tracking_script_builder.ts b/test/integration/node-specific/resource_tracking_script_builder.ts index 5d0a4ebc43..041c1de71d 100644 --- a/test/integration/node-specific/resource_tracking_script_builder.ts +++ b/test/integration/node-specific/resource_tracking_script_builder.ts @@ -8,7 +8,6 @@ import { parseSnapshot } from 'v8-heapsnapshot'; import { type BSON, type ClientEncryption, type MongoClient } from '../../mongodb'; import { type TestConfiguration } from '../../tools/runner/config'; -import { sleep } from '../../tools/utils'; export type ResourceTestFunction = HeapResourceTestFunction | ProcessResourceTestFunction; @@ -25,7 +24,6 @@ export type ProcessResourceTestFunction = (options: { expect: typeof expect; ClientEncryption?: typeof ClientEncryption; BSON?: typeof BSON; - sleep?: typeof sleep; }) => Promise; const HEAP_RESOURCE_SCRIPT_PATH = path.resolve( @@ -170,7 +168,7 @@ export async function runScriptAndGetProcessInfo( await writeFile(scriptName, scriptContent, { encoding: 'utf8' }); const logFile = name + '.logs.txt'; - const script = spawn(process.argv[0], [scriptName], { stdio: ['ignore', 'ignore', 'ignore'] }); + const script = spawn(process.execPath, [scriptName], { stdio: ['ignore', 'ignore', 'ignore'] }); const willClose = once(script, 'close'); @@ -186,14 +184,14 @@ export async function runScriptAndGetProcessInfo( // delete temporary files await unlink(scriptName); - await unlink(logFile); + // await unlink(logFile); // assertions about exit status if (exitCode) { const assertionError = new AssertionError( - messages.error.message + '\n\t' + JSON.stringify(messages.error.resources, undefined, 2) + messages.error?.message + '\n\t' + JSON.stringify(messages.error?.resources, undefined, 2) ); - assertionError.stack = messages.error.stack + new Error().stack.slice('Error'.length); + assertionError.stack = messages.error?.stack + new Error().stack.slice('Error'.length); throw assertionError; } diff --git a/test/tools/fixtures/process_resource_script.in.js b/test/tools/fixtures/process_resource_script.in.js index ebf3dd6f86..0849d99fa0 100644 --- a/test/tools/fixtures/process_resource_script.in.js +++ b/test/tools/fixtures/process_resource_script.in.js @@ -17,7 +17,7 @@ const { setTimeout } = require('timers'); let originalReport; const logFile = scriptName + '.logs.txt'; -const sleep = promisify(setTimeout); +const sleep = util.promisify(setTimeout); const run = func;