Skip to content

Commit

Permalink
Merge branch 'next' of https://github.com/novuhq/novu into next
Browse files Browse the repository at this point in the history
  • Loading branch information
scopsy committed Dec 14, 2023
2 parents 6da6299 + 9f2bbb0 commit bc97936
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 34 deletions.
14 changes: 8 additions & 6 deletions apps/ws/src/bootstrap.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import './config';
import 'newrelic';
import helmet from 'helmet';
import { NestFactory } from '@nestjs/core';
import * as Sentry from '@sentry/node';
import { RedisIoAdapter } from './shared/framework/redis.adapter';
import { BullMqService, getErrorInterceptor, Logger } from '@novu/application-generic';

import { AppModule } from './app.module';
import { CONTEXT_PATH } from './config';
import helmet from 'helmet';
import { BullMqService } from '@novu/application-generic';
import { InMemoryIoAdapter } from './shared/framework/in-memory-io.adapter';

import { version } from '../package.json';
import { getErrorInterceptor, Logger } from '@novu/application-generic';
import { prepareAppInfra, startAppInfra } from './socket/services';

if (process.env.SENTRY_DSN) {
Expand All @@ -23,7 +23,9 @@ if (process.env.SENTRY_DSN) {
export async function bootstrap() {
BullMqService.haveProInstalled();
const app = await NestFactory.create(AppModule, { bufferLogs: true });
const redisIoAdapter = new RedisIoAdapter(app);

const inMemoryAdapter = new InMemoryIoAdapter(app);
await inMemoryAdapter.connectToInMemoryCluster();

app.useLogger(app.get(Logger));
app.flushLogs();
Expand All @@ -43,7 +45,7 @@ export async function bootstrap() {
methods: ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
});

app.useWebSocketAdapter(redisIoAdapter);
app.useWebSocketAdapter(inMemoryAdapter);

app.enableShutdownHooks();

Expand Down
35 changes: 35 additions & 0 deletions apps/ws/src/shared/framework/in-memory-io.adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { getRedisPrefix } from '@novu/shared';
import { WebSocketsInMemoryProviderService } from '@novu/application-generic';

export class InMemoryIoAdapter extends IoAdapter {
private webSocketsInMemoryProviderService: WebSocketsInMemoryProviderService;
private adapterConstructor: ReturnType<typeof createAdapter>;

async connectToInMemoryCluster(): Promise<void> {
// TODO: Pending to inject in the provider instantiation
const keyPrefix = getRedisPrefix() ? `socket.io#${getRedisPrefix()}` : 'socket.io';

this.webSocketsInMemoryProviderService = new WebSocketsInMemoryProviderService();

await this.webSocketsInMemoryProviderService.initialize();

const pubClient = this.webSocketsInMemoryProviderService.getClient();
const subClient = pubClient?.duplicate();

/*
* TODO: Might not be needed to connect as we are checking it is initialized already.
* await Promise.all([pubClient?.connect(), subClient?.connect()]);
*/
this.adapterConstructor = createAdapter(pubClient, subClient);
}

createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);

return server;
}
}
26 changes: 0 additions & 26 deletions apps/ws/src/shared/framework/redis.adapter.ts

This file was deleted.

16 changes: 14 additions & 2 deletions apps/ws/src/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import {
MessageRepository,
MemberRepository,
} from '@novu/dal';
import { AnalyticsService, DalServiceHealthIndicator, QueuesModule } from '@novu/application-generic';
import {
AnalyticsService,
DalServiceHealthIndicator,
WebSocketsInMemoryProviderService,
QueuesModule,
} from '@novu/application-generic';

import { SubscriberOnlineService } from './subscriber-online';
import { JobTopicNameEnum } from '@novu/shared';
Expand Down Expand Up @@ -47,7 +52,14 @@ const analyticsService = {
},
};

const PROVIDERS = [analyticsService, dalService, DalServiceHealthIndicator, SubscriberOnlineService, ...DAL_MODELS];
const PROVIDERS = [
analyticsService,
dalService,
DalServiceHealthIndicator,
SubscriberOnlineService,
WebSocketsInMemoryProviderService,
...DAL_MODELS,
];

@Module({
imports: [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './cache-in-memory-provider.service';
export * from './in-memory-provider.service';
export * from './web-sockets-in-memory-provider.service';
export * from './workflow-in-memory-provider.service';
export * from './types';
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { Logger } from '@nestjs/common';

import { InMemoryProviderService } from './in-memory-provider.service';
import {
InMemoryProviderEnum,
InMemoryProviderClient,
ScanStream,
} from './types';

import { GetIsInMemoryClusterModeEnabled } from '../../usecases';

const LOG_CONTEXT = 'WebSocketsInMemoryProviderService';

export class WebSocketsInMemoryProviderService {
public inMemoryProviderService: InMemoryProviderService;
public isCluster: boolean;
private getIsInMemoryClusterModeEnabled: GetIsInMemoryClusterModeEnabled;

constructor() {
this.getIsInMemoryClusterModeEnabled =
new GetIsInMemoryClusterModeEnabled();

const provider = this.selectProvider();
this.isCluster = this.isClusterMode();

this.inMemoryProviderService = new InMemoryProviderService(
provider,
this.isCluster
);
}

/**
* Rules for the provider selection:
* - For our self hosted users we assume all of them have a single node Redis
* instance.
* - For Novu we will use Elasticache. We fallback to a Redis Cluster configuration
* if Elasticache not configured properly. That's happening in the provider
* mapping in the /in-memory-provider/providers/index.ts
*/
private selectProvider(): InMemoryProviderEnum {
if (process.env.IS_DOCKER_HOSTED) {
return InMemoryProviderEnum.REDIS;
}

return InMemoryProviderEnum.ELASTICACHE;
}

private descriptiveLogMessage(message) {
return `[Provider: ${this.selectProvider()}] ${message}`;
}

private isClusterMode(): boolean {
const isClusterModeEnabled = this.getIsInMemoryClusterModeEnabled.execute();

Logger.log(
this.descriptiveLogMessage(
`Cluster mode ${
isClusterModeEnabled ? 'IS' : 'IS NOT'
} enabled for ${LOG_CONTEXT}`
),
LOG_CONTEXT
);

return isClusterModeEnabled;
}

public async initialize(): Promise<void> {
await this.inMemoryProviderService.delayUntilReadiness();
}

public getClient(): InMemoryProviderClient {
return this.inMemoryProviderService.inMemoryProviderClient;
}

public getClientStatus(): string {
return this.getClient().status;
}

public getTtl(): number {
return this.inMemoryProviderService.inMemoryProviderConfig.ttl;
}

public inMemoryScan(pattern: string): ScanStream {
return this.inMemoryProviderService.inMemoryScan(pattern);
}

public isReady(): boolean {
return this.inMemoryProviderService.isClientReady();
}

public providerInUseIsInClusterMode(): boolean {
const providerConfigured =
this.inMemoryProviderService.getProvider.configured;

return this.isCluster || providerConfigured !== InMemoryProviderEnum.REDIS;
}

public async shutdown(): Promise<void> {
await this.inMemoryProviderService.shutdown();
}
}

0 comments on commit bc97936

Please sign in to comment.