Skip to content

Commit

Permalink
remove global variables from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vinayak25 committed Nov 11, 2024
1 parent fcfce05 commit 23aa32e
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 199 deletions.
13 changes: 0 additions & 13 deletions integrations/sample-app/app/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,6 @@ import { HttpKernel } from './http/kernel';
import { ApplicationContainer } from './boot/container';
import { ApplicationExceptionFilter } from './errors/filter';
import { IntentHttpServer } from '@intentjs/core';
import heapdump from 'heapdump';

// Create a heapdump on demand
const createHeapdump = () => {
const filename = `heapdump-${Date.now()}.heapsnapshot`;
heapdump.writeSnapshot(filename, (err, filename) => {
if (err) {
console.error('Error creating heapdump:', err);
} else {
console.log(`Heapdump written to ${filename}`);
}
});
};

IntentHttpServer.init()
.useContainer(ApplicationContainer)
Expand Down
1 change: 0 additions & 1 deletion integrations/sample-app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"bcrypt": "^5.1.1",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.1",
"heapdump": "^0.3.15",
"jsonwebtoken": "^9.0.2",
"knex": "^3.1.0",
"node-cache": "^5.1.2",
Expand Down
20 changes: 0 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/core/lib/config/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export class ConfigService<G = undefined> {
private static config: ConfigMap;

constructor(@Inject(CONFIG_FACTORY) private config: ConfigMap) {
console.log(this.config);
ConfigService.cachedConfig = new Map<ConfigPaths<G>, any>();
ConfigService.config = this.config;
}
Expand Down
6 changes: 2 additions & 4 deletions packages/core/lib/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ export class Queue {
static async dispatch(message: Message): Promise<void> {
const job = QueueMetadata.getJob(message.job);
const payload = PayloadBuilder.build(message, job?.options ?? {});
const { config, client } = QueueService.getConnection(
payload['connection'],
);
const { config, client } = QueueService.makeDriver(payload['connection']);

if (config.listenerType === 'subscribe') {
return (client as SubscribeQueueDriver).publish(
Expand All @@ -27,7 +25,7 @@ export class Queue {
export async function Dispatch(message: Message): Promise<void> {
const job = QueueMetadata.getJob(message.job);
const payload = PayloadBuilder.build(message, job?.options || {});
const { config, client } = QueueService.getConnection(payload['connection']);
const { config, client } = QueueService.makeDriver(payload['connection']);

if (config.listenerType === 'subscribe') {
return await (client as SubscribeQueueDriver).publish(
Expand Down
102 changes: 54 additions & 48 deletions packages/core/lib/queue/service.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import { Injectable, Type } from '@nestjs/common';
import { ConfigService } from '../config/service';
import { logTime } from '../utils/helpers';
import { InternalLogger } from '../utils/logger';
import { Str } from '../utils/string';
import {
DbQueueDriverOptions,
QueueDriverOptions,
QueueOptions,
RedisQueueDriverOptions,
SqsQueueDriverOptions,
SyncQueueDriverOptions,
} from './interfaces';
import {
DatabaseQueueDriver,
RedisQueueDriver,
SqsQueueDriver,
SyncQueueDriver,
} from './drivers';
import { RedisQueueDriver } from './drivers/redis';
import { QueueDriverOptions, QueueOptions } from './interfaces';
import { QueueMetadata } from './metadata';
import { QueueDrivers } from './strategy';
import { ConfigService } from '../config';
import { isEmpty } from 'lodash';
import { Str } from '../utils';

@Injectable()
export class QueueService {
Expand All @@ -22,54 +27,55 @@ export class QueueService {
db: DatabaseQueueDriver,
};

private static connections: Record<string, any> = {};
private static drivers: Map<
string,
{ config: Record<string, any>; client: QueueDrivers }
>;

constructor(private config: ConfigService) {
const options = this.config.get('queue') as QueueOptions;
if (!options) return;

for (const connName in options.connections) {
const time = Date.now();
const connection = options.connections[connName];
const driverName: string | Type<QueueDrivers> = connection.driver;
const driver: Type<QueueDrivers> = Str.isString(driverName)
? QueueService.queueDriverMap[driverName as unknown as string]
: driverName;

if (!driver) {
InternalLogger.error(
'QueueService',
`We couldn't find any driver associated with the "${driverName}".`,
);
continue;
}
QueueService.drivers = new Map<
string,
{ config: Record<string, any>; client: QueueDrivers }
>();
}

QueueService.connections[connName] = {
config: connection,
client: new driver(connection),
static makeDriver<T = QueueDrivers>(
connection: string,
options?:
| SyncQueueDriverOptions
| SqsQueueDriverOptions
| RedisQueueDriverOptions
| QueueDriverOptions
| DbQueueDriverOptions,
): { config: QueueDriverOptions; client: T } {
if (this.drivers.has(connection)) {
return this.drivers.get(connection) as {
config: QueueDriverOptions;
client: T;
};
InternalLogger.success(
'QueueService',
`Queue connection [${connName}] successfully initiailized ${logTime(
Date.now() - time,
)}`,
}

const config = ConfigService.get('queue') as QueueOptions;
options = options ?? config.connections[connection];
if (isEmpty(options)) {
throw new Error(
`Invalid options passed while trying to make a new driver [${connection}] for Queue`,
);
}
}

static getConnection<T = QueueDrivers>(
connection: string | undefined,
): { config: QueueDriverOptions; client: T } {
const options = QueueMetadata.getData();
if (!connection) connection = options.default;
return QueueService.connections[connection];
}
const driverName: string | Type<QueueDrivers> = options.driver;
const driver: Type<QueueDrivers> = Str.isString(driverName)
? QueueService.queueDriverMap[driverName as unknown as string]
: driverName;

this.drivers.set(connection, {
config: options,
client: new driver(options),
});

static getConnectionClient<T = QueueDrivers>(
connection: string | undefined,
): T {
const options = QueueMetadata.getData();
if (!connection) connection = options.default;
return QueueService.connections[connection].client;
return this.drivers.get(connection) as {
config: QueueDriverOptions;
client: T;
};
}
}
2 changes: 1 addition & 1 deletion packages/core/lib/queue/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class QueueWorker {
...args,
};

const { config } = QueueService.getConnection(this.options.connection);
const { config } = QueueService.makeDriver(this.options.connection);
this.options.listenerType = config.listenerType;

if (!this.options.queue) {
Expand Down
20 changes: 10 additions & 10 deletions packages/core/lib/queue/workers/pollQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ export class PollQueueWorker extends BaseQueueWorker {
this.logInfo('Poll Queue Worker Initialised');
this.logInfo('Listening for messages...');

const connection = QueueService.getConnectionClient<PollQueueDriver>(
const { client } = QueueService.makeDriver<PollQueueDriver>(
this.options.connection,
);

// perform scheduled task of the driver
if (connection.scheduledTask) this.performScheduledTask(connection);
if (client.scheduledTask) this.performScheduledTask(client);

const runner = new JobRunner(this.options, connection);
const runner = new JobRunner(this.options, client);
// eslint-disable-next-line no-constant-condition
while (1) {
const jobs = await this.poll(connection);
const jobs = await this.poll(client);
if (!jobs.length) {
await new Promise(resolve => setTimeout(resolve, this.options.sleep));
continue;
Expand Down Expand Up @@ -131,17 +131,17 @@ export class PollQueueWorker extends BaseQueueWorker {
}

async purge(): Promise<void> {
const connection = QueueService.getConnectionClient<PollQueueDriver>(
const { client } = QueueService.makeDriver<PollQueueDriver>(
this.options.connection,
);
await connection.purge({ queue: this.options.queue });
await client.purge({ queue: this.options.queue });
}

async count(): Promise<number> {
const connection = QueueService.getConnectionClient<PollQueueDriver>(
const { client } = QueueService.makeDriver<PollQueueDriver>(
this.options.connection,
);
return await connection.count({ queue: this.options.queue });
return await client.count({ queue: this.options.queue });
}

/**
Expand Down Expand Up @@ -176,10 +176,10 @@ export class PollQueueWorker extends BaseQueueWorker {
* @param job
*/
async removeJobFromQueue(job: DriverJob): Promise<void> {
const connection = QueueService.getConnectionClient<PollQueueDriver>(
const { client } = QueueService.makeDriver<PollQueueDriver>(
this.options.connection,
);
await connection.remove(job, this.options);
await client.remove(job, this.options);
}

/**
Expand Down
14 changes: 6 additions & 8 deletions packages/core/lib/queue/workers/subscribeQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ export class SubscribeQueueWorker {
async initListeners(): Promise<void> {
const jobs = QueueMetadata.getAllJobs();
await this.initBroker(this.options.connection, jobs);
const connClient = QueueService.getConnectionClient<SubscribeQueueDriver>(
const { client } = QueueService.makeDriver<SubscribeQueueDriver>(
this.options.connection,
);

await connClient.startListening(this.processIncomingMessage());
await client.startListening(this.processIncomingMessage());

this.attachDeamonListeners();

Expand Down Expand Up @@ -85,11 +85,10 @@ export class SubscribeQueueWorker {
): Promise<void> {
const topicNames = Object.keys(listeners);

const brokerClient =
QueueService.getConnectionClient<SubscribeQueueDriver>(broker);
const { client } = QueueService.makeDriver<SubscribeQueueDriver>(broker);

const workerOptions = Obj.pick(this.options, ['listenerId']);
await brokerClient.initListeners({
await client.initListeners({
topics: topicNames,
workerOptions: workerOptions,
});
Expand All @@ -99,9 +98,8 @@ export class SubscribeQueueWorker {
const listeners = QueueMetadata.getAllJobs();
const brokers = Object.keys(listeners);
for (const broker of brokers) {
const brokerClient =
QueueService.getConnectionClient<SubscribeQueueDriver>(broker);
await brokerClient.disconnect();
const { client } = QueueService.makeDriver<SubscribeQueueDriver>(broker);
await client.disconnect();
}

process.exit(0);
Expand Down
1 change: 0 additions & 1 deletion packages/core/lib/rest/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from './restServer';
export * from './interceptors/timeout';
export * from './interfaces';
export * from './decorators';
Expand Down
Loading

0 comments on commit 23aa32e

Please sign in to comment.