Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-khare-mongoDB committed Jan 6, 2025
1 parent 125305a commit 2a43e4d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 42 deletions.
3 changes: 3 additions & 0 deletions socket-connection-rtt-monitoring.logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{}
{"newResources":{"libuvResources":[],"activeResources":["TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap"]}}
{"beforeExitHappened":true}
62 changes: 27 additions & 35 deletions test/integration/node-specific/client_close.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ describe.only('MongoClient.close() Integration', () => {
const connectionMonitoringReport = () =>
process.report
.getReport()
.libuv.filter(r => r.type === 'tcp' && r.is_active)
.libuv.filter(r => r.type === 'tcp')
.map(r => r.remoteEndpoint);

// assert socket creation
// assert socket creation
const servers = client.topology?.s.servers;
for (const server of servers) {
const { host, port } = server[1].s.description.hostAddress;
expect(connectionMonitoringReport()).to.deep.include({host, port});
}
}

await client.close();

Expand Down Expand Up @@ -153,56 +153,48 @@ describe.only('MongoClient.close() Integration', () => {
describe('Node.js resource: Socket', () => {
describe('when rtt monitoring is turned on', () => {
it('no sockets remain after client.close()', async () => {
const run = async ({ MongoClient, uri, expect, sleep }) => {
const run = async ({ MongoClient, uri, log, expect, sleep }) => {
const heartbeatFrequencyMS = 100;
const client = new MongoClient(uri, {
serverMonitoringMode: 'stream',
heartbeatFrequencyMS
});
await client.connect();

const servers = Array.from(client.topology.s.servers.keys());

// a hashmap of <server host/ports, boolean>
const serversHeartbeatOccurred = servers.reduce(
(acc, hostname) => ({ ...acc, [hostname]: false }),
{}
);

const activeSocketsReport = () =>
process.report.getReport().libuv.filter(r => r.type === 'tcp' && r.is_active);
process.report.getReport().libuv.filter(r => r.type === 'tcp');

const socketsAddressesBeforeHeartbeat = activeSocketsReport().map(
r => r.address
);

const rttSocketReport = () =>
const activeSocketsAfterHeartbeat = () =>
activeSocketsReport()
.filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address))
.map(r => r.remoteEndpoint.host + ':' + r.remoteEndpoint.port);

client.on('serverHeartbeatSucceeded', async ev => {
// assert creation of rttPinger socket
const newSocketsAfterHeartbeat = rttSocketReport();
expect(newSocketsAfterHeartbeat).to.deep.contain(ev.connectionId);
// set of servers whose hearbeats have occurred
const heartbeatOccurredSet = new Set();

// assert rttPinger socket is connected to a server
expect(serversHeartbeatOccurred.keys()).to.deep.contain(ev.connectionId);
serversHeartbeatOccurred[ev.connectionId] = true;
});
client.on('serverHeartbeatSucceeded', async ev => heartbeatOccurredSet.add(ev.connectionId));

// ensure there is enough time for the heartbeatFrequencyMS for the event to occur
// ensure there is enough time for the events to occur
await sleep(heartbeatFrequencyMS * 10);

// all servers should have had a heartbeat event
expect(serversHeartbeatOccurred.values().filter(r => r !== true)).to.be.empty;
// all servers should have had a heartbeat event and had a new socket created for rtt pinger
log(heartbeatOccurredSet);
const servers = client.topology.s.servers
for (const server of servers) {
expect(heartbeatOccurredSet).to.deep.contain(server[0]);
expect(activeSocketsAfterHeartbeat()).to.deep.contain(server[0]);
}

// close the client
await client.close();

// upon close, assert rttPinger socket is cleaned up
const newSocketsAfterClose = rttSocketReport();
expect(newSocketsAfterClose).to.have.length(0);
// upon close, assert rttPinger sockets are cleaned up
const activeSocketsAfterClose = activeSocketsAfterHeartbeat();
expect(activeSocketsAfterClose).to.have.length(0);
};

await runScriptAndGetProcessInfo('socket-connection-rtt-monitoring', config, run);
Expand Down Expand Up @@ -235,30 +227,30 @@ describe.only('MongoClient.close() Integration', () => {
});

describe('after a minPoolSize has been set on the ConnectionPool', () => {
it('no sockets remain after client.close()', async function () {
it.only('no sockets remain after client.close()', async function () {
const run = async function ({ MongoClient, uri, log, expect }) {
log({hi: 1});
const options = { minPoolSize: 2 };
const client = new MongoClient(uri, options);
await client.connect();
const connectionMonitoringReport = () =>
process.report.getReport().libuv.filter(r => r.type === 'tcp' && r.is_active).map(r => r.remoteEndpoint);
process.report.getReport().libuv.filter(r => r.type === 'tcp').map(r => r.remoteEndpoint);

// assert socket creation
// there should be three sockets for each server: client connection socket, monitor socket, pool size monitoring socket
// assert socket creation
// there should be a client connection socket for each server, one monitor socket total, and one pool size monitoring socket total
const servers = client.topology?.s.servers;

for (const server of servers) {
log({ monreport: connectionMonitoringReport() });
const { host, port } = server[1].s.description.hostAddress;
const relevantHostAddresses = connectionMonitoringReport().filter(r => r.host === host && r.port === port);
log({relevantHostAddresses});
expect(relevantHostAddresses).length.to.be.gte(3);
}

await client.close();
};

await runScriptAndGetProcessInfo(
'socket-minPoolSize',
'socket-minPoolSize',
config,
run
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { parseSnapshot } from 'v8-heapsnapshot';

import { type BSON, type ClientEncryption, type MongoClient } from '../../mongodb';
import { type TestConfiguration } from '../../tools/runner/config';
import { sleep } from '../../tools/utils';

export type ResourceTestFunction = HeapResourceTestFunction | ProcessResourceTestFunction;

Expand All @@ -25,7 +24,6 @@ export type ProcessResourceTestFunction = (options: {
expect: typeof expect;
ClientEncryption?: typeof ClientEncryption;
BSON?: typeof BSON;
sleep?: typeof sleep;
}) => Promise<void>;

const HEAP_RESOURCE_SCRIPT_PATH = path.resolve(
Expand Down Expand Up @@ -170,7 +168,7 @@ export async function runScriptAndGetProcessInfo(
await writeFile(scriptName, scriptContent, { encoding: 'utf8' });
const logFile = name + '.logs.txt';

const script = spawn(process.argv[0], [scriptName], { stdio: ['ignore', 'ignore', 'ignore'] });
const script = spawn(process.execPath, [scriptName], { stdio: ['ignore', 'ignore', 'ignore'] });

const willClose = once(script, 'close');

Expand All @@ -186,14 +184,14 @@ export async function runScriptAndGetProcessInfo(

// delete temporary files
await unlink(scriptName);
await unlink(logFile);
// await unlink(logFile);

// assertions about exit status
if (exitCode) {
const assertionError = new AssertionError(
messages.error.message + '\n\t' + JSON.stringify(messages.error.resources, undefined, 2)
messages.error?.message + '\n\t' + JSON.stringify(messages.error?.resources, undefined, 2)
);
assertionError.stack = messages.error.stack + new Error().stack.slice('Error'.length);
assertionError.stack = messages.error?.stack + new Error().stack.slice('Error'.length);
throw assertionError;
}

Expand Down
2 changes: 1 addition & 1 deletion test/tools/fixtures/process_resource_script.in.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const { setTimeout } = require('timers');

let originalReport;
const logFile = scriptName + '.logs.txt';
const sleep = promisify(setTimeout);
const sleep = util.promisify(setTimeout);

const run = func;

Expand Down

0 comments on commit 2a43e4d

Please sign in to comment.