Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(app-gen, worker): Add MetricsService #4928

Merged
merged 12 commits into from
Dec 2, 2023
Merged
3 changes: 3 additions & 0 deletions apps/worker/src/.env.development
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ MAX_NOVU_INTEGRATION_SMS_REQUESTS=20
# Storage Service
# STORAGE_SERVICE=

# Metrics Service
# METRICS_SERVICE=

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
Expand Down
3 changes: 3 additions & 0 deletions apps/worker/src/.env.production
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ MAX_NOVU_INTEGRATION_SMS_REQUESTS=20
# Storage Service
# STORAGE_SERVICE=

# Metrics Service
# METRICS_SERVICE=

# Redis
# REDIS_HOST=localhost
REDIS_PORT=6379
Expand Down
3 changes: 3 additions & 0 deletions apps/worker/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ NOVU_SMS_INTEGRATION_SENDER=1234567890
# Storage Service
# STORAGE_SERVICE=

# Metrics Service
# METRICS_SERVICE=

# Redis
REDIS_PORT=6379
REDIS_HOST=localhost
Expand Down
3 changes: 3 additions & 0 deletions apps/worker/src/.example.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ MAX_NOVU_INTEGRATION_SMS_REQUESTS=20
# Storage Service
# STORAGE_SERVICE=

# Metrics Service
# METRICS_SERVICE=

# Redis
REDIS_PORT=6379
REDIS_HOST=localhost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { expect } from 'chai';

import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
MetricsService,
StandardQueueService,
WebSocketsQueueService,
WorkflowQueueService,
Expand All @@ -17,21 +16,9 @@ let activeJobsMetricService: ActiveJobsMetricService;
let standardService: StandardQueueService;
let webSocketsQueueService: WebSocketsQueueService;
let workflowQueueService: WorkflowQueueService;
let metricsService: MetricsService;
let moduleRef: TestingModule;

before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
process.env.IS_IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';

moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);
});

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't being used, it's overridden below.

describe('Active Jobs Metric Service', () => {
before(async () => {
process.env.IN_MEMORY_CLUSTER_MODE_ENABLED = 'false';
Expand All @@ -44,32 +31,28 @@ describe('Active Jobs Metric Service', () => {
standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);
metricsService = moduleRef.get<MetricsService>(MetricsService);

const activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
const activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
activeJobsMetricService = new ActiveJobsMetricService(
[standardService, webSocketsQueueService, workflowQueueService],
metricsService
);
});

describe('Environment variables not set', () => {
beforeEach(() => {
process.env.NOVU_MANAGED_SERVICE = 'false';
process.env.NEW_RELIC_LICENSE_KEY = '';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
activeJobsMetricService = new ActiveJobsMetricService(
[standardService, webSocketsQueueService, workflowQueueService],
metricsService
);
});

it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => {
expect(activeJobsMetricService).to.be.ok;
expect(activeJobsMetricService).to.have.all.keys('tokenList');
expect(activeJobsMetricService).to.have.all.keys('tokenList', 'metricsService');
expect(await activeJobsMetricService.activeJobsMetricQueueService).to.not.be.ok;
expect(await activeJobsMetricService.activeJobsMetricWorkerService).to.not.be.ok;
});
Expand All @@ -80,11 +63,10 @@ describe('Active Jobs Metric Service', () => {
process.env.NOVU_MANAGED_SERVICE = 'true';
process.env.NEW_RELIC_LICENSE_KEY = 'license';

activeJobsMetricService = new ActiveJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
activeJobsMetricService = new ActiveJobsMetricService(
[standardService, webSocketsQueueService, workflowQueueService],
metricsService
);
});

after(async () => {
Expand All @@ -97,6 +79,7 @@ describe('Active Jobs Metric Service', () => {
expect(activeJobsMetricService).to.have.all.keys(
'activeJobsMetricQueueService',
'activeJobsMetricWorkerService',
'metricsService',
'tokenList'
);
expect(await activeJobsMetricService.activeJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const nr = require('newrelic');

import {
ActiveJobsMetricQueueService,
ActiveJobsMetricWorkerService,
MetricsService,
QueueBaseService,
WorkerOptions,
} from '@novu/application-generic';
Expand All @@ -18,7 +17,7 @@ export class ActiveJobsMetricService {
public readonly activeJobsMetricQueueService: ActiveJobsMetricQueueService;
public readonly activeJobsMetricWorkerService: ActiveJobsMetricWorkerService;

constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[]) {
constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], private metricsService: MetricsService) {
if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
this.activeJobsMetricQueueService = new ActiveJobsMetricQueueService();
this.activeJobsMetricWorkerService = new ActiveJobsMetricWorkerService();
Expand Down Expand Up @@ -100,11 +99,9 @@ export class ActiveJobsMetricService {

Logger.verbose('Recording active, waiting, and delayed metrics');

nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/waiting`, waitCount);
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/delayed`, delayedCount);
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/active`, activeCount);

Logger.verbose(`Queue/${deploymentName}/${queueService.topic}`, { waitCount, delayedCount, activeCount });
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verbose logging is handled in the metrics service now.

this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/waiting`, waitCount);
this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/delayed`, delayedCount);
this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/active`, activeCount);
}

return resolve();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { expect } from 'chai';

import {
CompletedJobsMetricQueueService,
CompletedJobsMetricWorkerService,
MetricsService,
StandardQueueService,
WebSocketsQueueService,
WorkflowQueueService,
Expand All @@ -17,6 +16,7 @@ let completedJobsMetricService: CompletedJobsMetricService;
let standardService: StandardQueueService;
let webSocketsQueueService: WebSocketsQueueService;
let workflowQueueService: WorkflowQueueService;
let metricsService: MetricsService;
let moduleRef: TestingModule;

before(async () => {
Expand All @@ -30,6 +30,7 @@ before(async () => {
standardService = moduleRef.get<StandardQueueService>(StandardQueueService);
webSocketsQueueService = moduleRef.get<WebSocketsQueueService>(WebSocketsQueueService);
workflowQueueService = moduleRef.get<WorkflowQueueService>(WorkflowQueueService);
metricsService = moduleRef.get<MetricsService>(MetricsService);
});

describe('Completed Jobs Metric Service', () => {
Expand All @@ -38,16 +39,15 @@ describe('Completed Jobs Metric Service', () => {
process.env.NOVU_MANAGED_SERVICE = 'false';
process.env.NEW_RELIC_LICENSE_KEY = '';

completedJobsMetricService = new CompletedJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
completedJobsMetricService = new CompletedJobsMetricService(
[standardService, webSocketsQueueService, workflowQueueService],
metricsService
);
});

it('should not initialize neither the queue or the worker if the environment conditions are not met', async () => {
expect(completedJobsMetricService).to.be.ok;
expect(completedJobsMetricService).to.have.all.keys('tokenList');
expect(completedJobsMetricService).to.have.all.keys('tokenList', 'metricsService');
expect(await completedJobsMetricService.completedJobsMetricQueueService).to.not.be.ok;
expect(await completedJobsMetricService.completedJobsMetricWorkerService).to.not.be.ok;
});
Expand All @@ -58,11 +58,10 @@ describe('Completed Jobs Metric Service', () => {
process.env.NOVU_MANAGED_SERVICE = 'true';
process.env.NEW_RELIC_LICENSE_KEY = 'license';

completedJobsMetricService = new CompletedJobsMetricService([
standardService,
webSocketsQueueService,
workflowQueueService,
]);
completedJobsMetricService = new CompletedJobsMetricService(
[standardService, webSocketsQueueService, workflowQueueService],
metricsService
);
});

after(async () => {
Expand All @@ -75,6 +74,7 @@ describe('Completed Jobs Metric Service', () => {
expect(completedJobsMetricService).to.have.all.keys(
'completedJobsMetricQueueService',
'completedJobsMetricWorkerService',
'metricsService',
'tokenList'
);
expect(await completedJobsMetricService.completedJobsMetricQueueService.bullMqService.getStatus()).to.deep.equal({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
CompletedJobsMetricQueueService,
CompletedJobsMetricWorkerService,
MetricsService,
QueueBaseService,
WorkerOptions,
} from '@novu/application-generic';
Expand All @@ -19,7 +20,7 @@ export class CompletedJobsMetricService {
public readonly completedJobsMetricQueueService: CompletedJobsMetricQueueService;
public readonly completedJobsMetricWorkerService: CompletedJobsMetricWorkerService;

constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[]) {
constructor(@Inject('BULLMQ_LIST') private tokenList: QueueBaseService[], private metricsService: MetricsService) {
if (process.env.NOVU_MANAGED_SERVICE === 'true' && process.env.NEW_RELIC_LICENSE_KEY) {
this.completedJobsMetricQueueService = new CompletedJobsMetricQueueService();
this.completedJobsMetricWorkerService = new CompletedJobsMetricWorkerService();
Expand Down Expand Up @@ -100,12 +101,8 @@ export class CompletedJobsMetricService {
Logger.verbose('active length', process.env.NEW_RELIC_LICENSE_KEY.length);
Logger.verbose('Recording active, waiting, and delayed metrics');

const nr = require('newrelic');
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/completed`, completeNumber);
nr.recordMetric(`Queue/${deploymentName}/${queueService.topic}/failed`, failNumber);

Logger.verbose(`Queue/${deploymentName}/${queueService.topic}/completed`, completeNumber);
Logger.verbose(`Queue/${deploymentName}/${queueService.topic}/failed`, failNumber);
this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/completed`, completeNumber);
this.metricsService.recordMetric(`Queue/${deploymentName}/${queueService.topic}/failed`, failNumber);
}

return resolve();
Expand Down
3 changes: 2 additions & 1 deletion apps/worker/src/app/workflow/workflow.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
SubscriberJobBound,
TriggerBroadcast,
TriggerMulticast,
MetricsModule,
} from '@novu/application-generic';
import { JobRepository } from '@novu/dal';

Expand Down Expand Up @@ -129,7 +130,7 @@ const PROVIDERS: Provider[] = [
];

@Module({
imports: [SharedModule, QueuesModule],
imports: [SharedModule, QueuesModule, MetricsModule],
controllers: [],
providers: [...PROVIDERS, ...USE_CASES, ...REPOSITORIES],
})
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/types/env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ declare global {
MAX_NOVU_INTEGRATION_MAIL_REQUESTS?: string;
NOVU_EMAIL_INTEGRATION_API_KEY?: string;
STORAGE_SERVICE?: string;
METRICS_SERVICE?: string;
REDIS_HOST: string;
REDIS_PORT: number;
REDIS_PASSWORD?: string;
Expand Down
3 changes: 3 additions & 0 deletions packages/application-generic/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"reflect-metadata": "^0.1.13"
},
"dependencies": {
"@aws-sdk/client-cloudwatch": "^3.382.0",
"@aws-sdk/client-s3": "^3.382.0",
"@aws-sdk/s3-request-presigner": "^3.382.0",
"@azure/storage-blob": "^12.11.0",
Expand Down Expand Up @@ -137,12 +138,14 @@
"@istanbuljs/nyc-config-typescript": "^1.0.1",
"@types/analytics-node": "^3.1.9",
"@types/jest": "29.5.2",
"@types/newrelic": "^9",
"@types/sinon": "^9.0.0",
"codecov": "^3.5.0",
"cpx": "^1.5.0",
"dotenv": "^8.2.0",
"chai": "^4.2.0",
"jest": "^27.1.0",
"newrelic": "^9",
"npm-run-all": "^4.1.5",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding newrelic as devDependecies to keep the peer deps working as normal.

"nyc": "^15.1.0",
"prettier": "~2.8.0",
Expand Down
1 change: 1 addition & 0 deletions packages/application-generic/src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { QueuesModule } from './queues.module';
export { BaseApiQueuesModule } from './queues.module';
export { MetricsModule } from './metrics.module';
23 changes: 23 additions & 0 deletions packages/application-generic/src/modules/metrics.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Module, Provider } from '@nestjs/common';
import { MetricsService, metricsServiceList } from '../services/metrics';
import {
AwsMetricsService,
AzureMetricsService,
GCPMetricsService,
NewRelicMetricsService,
} from '../services/metrics/metrics.service';

const PROVIDERS: Provider[] = [
MetricsService,
NewRelicMetricsService,
GCPMetricsService,
AzureMetricsService,
AwsMetricsService,
metricsServiceList,
];

@Module({
providers: [...PROVIDERS],
exports: [...PROVIDERS],
})
export class MetricsModule {}
1 change: 1 addition & 0 deletions packages/application-generic/src/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export { VerifyPayloadService } from './verify-payload.service';
export { EventsDistributedLockService } from './events-distributed-lock.service';
export * from './calculate-delay';
export * from './storage';
export * from './metrics';
export * from './distributed-lock';
export {
BullMqConnectionOptions,
Expand Down
Loading