From 176e8ad70d89bb0658a64afb8bde5b0fbcfd9427 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Thu, 9 Jan 2025 16:37:32 +0100 Subject: [PATCH 1/5] feat: unique connection counting --- package.json | 1 + src/lib/db/index.ts | 2 + .../features/scheduler/schedule-services.ts | 7 ++ .../fake-unique-connection-store.ts | 23 ++++++ .../unique-connection-service.ts | 77 +++++++++++++++++++ .../unique-connection-store-type.ts | 14 ++++ .../unique-connection-store.ts | 30 ++++++++ src/lib/metric-events.ts | 3 + src/lib/middleware/response-time-metrics.ts | 7 +- src/lib/services/index.ts | 6 ++ src/lib/types/services.ts | 2 + src/lib/types/stores.ts | 3 + ...20250109150818-unique-connections-table.js | 15 ++++ src/server-dev.ts | 1 + src/test/fixtures/store.ts | 2 + yarn.lock | 17 ++++ 16 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 src/lib/features/unique-connection/fake-unique-connection-store.ts create mode 100644 src/lib/features/unique-connection/unique-connection-service.ts create mode 100644 src/lib/features/unique-connection/unique-connection-store-type.ts create mode 100644 src/lib/features/unique-connection/unique-connection-store.ts create mode 100644 src/migrations/20250109150818-unique-connections-table.js diff --git a/package.json b/package.json index ad39e457d898..bbe056855242 100644 --- a/package.json +++ b/package.json @@ -134,6 +134,7 @@ "hash-sum": "^2.0.0", "helmet": "^6.0.0", "http-errors": "^2.0.0", + "hyperloglog-lite": "^1.0.2", "ip-address": "^10.0.1", "joi": "^17.13.3", "js-sha256": "^0.11.0", diff --git a/src/lib/db/index.ts b/src/lib/db/index.ts index 4657547a42df..6d2d41e4e0b4 100644 --- a/src/lib/db/index.ts +++ b/src/lib/db/index.ts @@ -56,6 +56,7 @@ import { OnboardingStore } from '../features/onboarding/onboarding-store'; import { createOnboardingReadModel } from '../features/onboarding/createOnboardingReadModel'; import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store'; import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model'; +import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store'; export const createStores = ( config: IUnleashConfig, @@ -185,6 +186,7 @@ export const createStores = ( ), userUnsubscribeStore: new UserUnsubscribeStore(db), userSubscriptionsReadModel: new UserSubscriptionsReadModel(db), + uniqueConnectionStore: new UniqueConnectionStore(db), }; }; diff --git a/src/lib/features/scheduler/schedule-services.ts b/src/lib/features/scheduler/schedule-services.ts index ad5a93bbe7ac..99d373bc29e2 100644 --- a/src/lib/features/scheduler/schedule-services.ts +++ b/src/lib/features/scheduler/schedule-services.ts @@ -32,6 +32,7 @@ export const scheduleServices = async ( frontendApiService, clientMetricsServiceV2, integrationEventsService, + uniqueConnectionService, } = services; schedulerService.schedule( @@ -179,4 +180,10 @@ export const scheduleServices = async ( minutesToMilliseconds(15), 'cleanUpIntegrationEvents', ); + + schedulerService.schedule( + uniqueConnectionService.sync.bind(uniqueConnectionService), + minutesToMilliseconds(10), + 'uniqueConnectionService', + ); }; diff --git a/src/lib/features/unique-connection/fake-unique-connection-store.ts b/src/lib/features/unique-connection/fake-unique-connection-store.ts new file mode 100644 index 000000000000..babe3311dfd8 --- /dev/null +++ b/src/lib/features/unique-connection/fake-unique-connection-store.ts @@ -0,0 +1,23 @@ +import type { IUniqueConnectionStore } from '../../types'; +import type { + TimedUniqueConnections, + UniqueConnections, +} from './unique-connection-store-type'; + +export class FakeUniqueConnectionStore implements IUniqueConnectionStore { + private uniqueConnectionsRecord: Record = + {}; + + async insert(uniqueConnections: UniqueConnections): Promise { + this.uniqueConnectionsRecord[uniqueConnections.id] = { + ...uniqueConnections, + updatedAt: new Date(), + }; + } + + async get( + id: 'current' | 'previous', + ): Promise<(UniqueConnections & { updatedAt: Date }) | null> { + return this.uniqueConnectionsRecord[id] || null; + } +} diff --git a/src/lib/features/unique-connection/unique-connection-service.ts b/src/lib/features/unique-connection/unique-connection-service.ts new file mode 100644 index 000000000000..38636ec7d0e5 --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-service.ts @@ -0,0 +1,77 @@ +import type { IUnleashConfig } from '../../types/option'; +import type { IFlagResolver, IUnleashStores } from '../../types'; +import type { Logger } from '../../logger'; +import type { IUniqueConnectionStore } from './unique-connection-store-type'; +import HyperLogLog from 'hyperloglog-lite'; +import type EventEmitter from 'events'; +import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; + +export class UniqueConnectionService { + private logger: Logger; + + private uniqueConnectionStore: IUniqueConnectionStore; + + private flagResolver: IFlagResolver; + + private eventBus: EventEmitter; + + private activeHour: number; + + private hll = HyperLogLog(12); + + constructor( + { + uniqueConnectionStore, + }: Pick, + config: IUnleashConfig, + ) { + this.uniqueConnectionStore = uniqueConnectionStore; + this.logger = config.getLogger('services/unique-connection-service.ts'); + this.flagResolver = config.flagResolver; + this.eventBus = config.eventBus; + this.activeHour = new Date().getHours(); + } + + listen() { + this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this)); + } + + async count(connectionId: string) { + if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; + this.hll.add(HyperLogLog.hash(connectionId)); + } + + async sync(): Promise { + if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; + const currentHour = new Date().getHours(); + const currentBucket = await this.uniqueConnectionStore.get('current'); + if (this.activeHour !== currentHour && currentBucket) { + if (currentBucket.updatedAt.getHours() < currentHour) { + this.hll.merge({ n: 12, buckets: currentBucket.hll }); + await this.uniqueConnectionStore.insert({ + hll: this.hll.output().buckets, + id: 'previous', + }); + } else { + const previousBucket = + await this.uniqueConnectionStore.get('previous'); + this.hll.merge({ n: 12, buckets: previousBucket }); + await this.uniqueConnectionStore.insert({ + hll: this.hll.output().buckets, + id: 'previous', + }); + } + this.activeHour = currentHour; + + this.hll = HyperLogLog(12); + } else { + if (currentBucket) { + this.hll.merge({ n: 12, buckets: currentBucket }); + } + await this.uniqueConnectionStore.insert({ + hll: this.hll.output().buckets, + id: 'current', + }); + } + } +} diff --git a/src/lib/features/unique-connection/unique-connection-store-type.ts b/src/lib/features/unique-connection/unique-connection-store-type.ts new file mode 100644 index 000000000000..0fb6f0d5c459 --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-store-type.ts @@ -0,0 +1,14 @@ +export type UniqueConnections = { + hll: Buffer; + id: 'current' | 'previous'; +}; + +export type TimedUniqueConnections = UniqueConnections & { + updatedAt: Date; +}; + +// id, hll, updated_at +export interface IUniqueConnectionStore { + insert(uniqueConnections: UniqueConnections): Promise; + get(id: 'current' | 'previous'): Promise; +} diff --git a/src/lib/features/unique-connection/unique-connection-store.ts b/src/lib/features/unique-connection/unique-connection-store.ts new file mode 100644 index 000000000000..dfd502584344 --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-store.ts @@ -0,0 +1,30 @@ +import type { Db } from '../../db/db'; +import type { IUniqueConnectionStore } from '../../types'; +import type { UniqueConnections } from './unique-connection-store-type'; + +export class UniqueConnectionStore implements IUniqueConnectionStore { + private db: Db; + + constructor(db: Db) { + this.db = db; + } + + async insert(uniqueConnections: UniqueConnections): Promise { + await this.db('unique_connections') + .insert({ id: uniqueConnections.id, hll: uniqueConnections.hll }) + .onConflict('id') + .merge(); + } + + async get( + id: 'current' | 'previous', + ): Promise<(UniqueConnections & { updatedAt: Date }) | null> { + const row = await this.db('unique_connections') + .select('id', 'hll', 'updated_at') + .where('id', id) + .first(); + return row + ? { id: row.id, hll: row.hll, updatedAt: row.updated_at } + : null; + } +} diff --git a/src/lib/metric-events.ts b/src/lib/metric-events.ts index 6591ea4c672f..29557f9b84de 100644 --- a/src/lib/metric-events.ts +++ b/src/lib/metric-events.ts @@ -1,6 +1,7 @@ import type EventEmitter from 'events'; const REQUEST_TIME = 'request_time'; +const SDK_CONNECTION_ID_RECEIVED = 'sdk_connection_id_received'; const DB_TIME = 'db_time'; const FUNCTION_TIME = 'function_time'; const SCHEDULER_JOB_TIME = 'scheduler_job_time'; @@ -21,6 +22,7 @@ const CLIENT_DELTA_MEMORY = 'client_delta_memory'; type MetricEvent = | typeof REQUEST_TIME + | typeof SDK_CONNECTION_ID_RECEIVED | typeof DB_TIME | typeof FUNCTION_TIME | typeof SCHEDULER_JOB_TIME @@ -71,6 +73,7 @@ const onMetricEvent = ( export { REQUEST_TIME, + SDK_CONNECTION_ID_RECEIVED, DB_TIME, SCHEDULER_JOB_TIME, FUNCTION_TIME, diff --git a/src/lib/middleware/response-time-metrics.ts b/src/lib/middleware/response-time-metrics.ts index 8ab50328ee03..5b19a5a3f1f8 100644 --- a/src/lib/middleware/response-time-metrics.ts +++ b/src/lib/middleware/response-time-metrics.ts @@ -1,6 +1,6 @@ import * as responseTime from 'response-time'; import type EventEmitter from 'events'; -import { REQUEST_TIME } from '../metric-events'; +import { REQUEST_TIME, SDK_CONNECTION_ID_RECEIVED } from '../metric-events'; import type { IFlagResolver } from '../types/experimental'; import type { InstanceStatsService } from '../services'; import type { RequestHandler } from 'express'; @@ -66,6 +66,11 @@ export function responseTimeMetrics( req.query.appName; } + const connectionId = req.headers['x-unleash-connection-id']; + if (connectionId && flagResolver.isEnabled('uniqueSdkTracking')) { + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, connectionId); + } + const timingInfo = { path: pathname, method: req.method, diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index 3cdc3fe1ba02..cb9a067febf0 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -157,6 +157,7 @@ import { createContextService, createFakeContextService, } from '../features/context/createContextService'; +import { UniqueConnectionService } from '../features/unique-connection/unique-connection-service'; export const createServices = ( stores: IUnleashStores, @@ -403,6 +404,9 @@ export const createServices = ( const featureLifecycleService = transactionalFeatureLifecycleService; featureLifecycleService.listen(); + const uniqueConnectionService = new UniqueConnectionService(stores, config); + uniqueConnectionService.listen(); + const onboardingService = db ? createOnboardingService(config)(db) : createFakeOnboardingService(config).onboardingService; @@ -484,6 +488,7 @@ export const createServices = ( personalDashboardService, projectStatusService, transactionalUserSubscriptionsService, + uniqueConnectionService, }; }; @@ -537,4 +542,5 @@ export { PersonalDashboardService, ProjectStatusService, UserSubscriptionsService, + UniqueConnectionService, }; diff --git a/src/lib/types/services.ts b/src/lib/types/services.ts index 5fa6d107c7fc..e37efac1a580 100644 --- a/src/lib/types/services.ts +++ b/src/lib/types/services.ts @@ -59,6 +59,7 @@ import type { OnboardingService } from '../features/onboarding/onboarding-servic import type { PersonalDashboardService } from '../features/personal-dashboard/personal-dashboard-service'; import type { ProjectStatusService } from '../features/project-status/project-status-service'; import type { UserSubscriptionsService } from '../features/user-subscriptions/user-subscriptions-service'; +import type { UniqueConnectionService } from '../features/unique-connection/unique-connection-service'; export interface IUnleashServices { transactionalAccessService: WithTransactional; @@ -131,4 +132,5 @@ export interface IUnleashServices { personalDashboardService: PersonalDashboardService; projectStatusService: ProjectStatusService; transactionalUserSubscriptionsService: WithTransactional; + uniqueConnectionService: UniqueConnectionService; } diff --git a/src/lib/types/stores.ts b/src/lib/types/stores.ts index 9998009fe692..710b842d48e8 100644 --- a/src/lib/types/stores.ts +++ b/src/lib/types/stores.ts @@ -53,6 +53,7 @@ import { IOnboardingReadModel } from '../features/onboarding/onboarding-read-mod import { IOnboardingStore } from '../features/onboarding/onboarding-store-type'; import type { IUserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store-type'; import type { IUserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model-type'; +import { IUniqueConnectionStore } from '../features/unique-connection/unique-connection-store-type'; export interface IUnleashStores { accessStore: IAccessStore; @@ -110,6 +111,7 @@ export interface IUnleashStores { onboardingStore: IOnboardingStore; userUnsubscribeStore: IUserUnsubscribeStore; userSubscriptionsReadModel: IUserSubscriptionsReadModel; + uniqueConnectionStore: IUniqueConnectionStore; } export { @@ -165,4 +167,5 @@ export { type IProjectReadModel, IOnboardingStore, type IUserSubscriptionsReadModel, + IUniqueConnectionStore, }; diff --git a/src/migrations/20250109150818-unique-connections-table.js b/src/migrations/20250109150818-unique-connections-table.js new file mode 100644 index 000000000000..e3632a49dabb --- /dev/null +++ b/src/migrations/20250109150818-unique-connections-table.js @@ -0,0 +1,15 @@ +exports.up = function(db, cb) { + db.runSql(` + CREATE TABLE IF NOT EXISTS unique_connections + ( + id VARCHAR(255) NOT NULL, + updated_at TIMESTAMP DEFAULT now(), + hll BYTEA NOT NULL, + PRIMARY KEY (id) + ); +`, cb) +}; + +exports.down = function(db, cb) { + db.runSql(`DROP TABLE unique_connections;`, cb); +}; diff --git a/src/server-dev.ts b/src/server-dev.ts index 22aea88451de..1e08a0a007f4 100644 --- a/src/server-dev.ts +++ b/src/server-dev.ts @@ -55,6 +55,7 @@ process.nextTick(async () => { flagOverviewRedesign: false, granularAdminPermissions: true, deltaApi: true, + uniqueSdkTracking: true, }, }, authentication: { diff --git a/src/test/fixtures/store.ts b/src/test/fixtures/store.ts index be96d9a380b7..21038c30f12a 100644 --- a/src/test/fixtures/store.ts +++ b/src/test/fixtures/store.ts @@ -56,6 +56,7 @@ import { FakeOnboardingStore } from '../../lib/features/onboarding/fake-onboardi import { createFakeOnboardingReadModel } from '../../lib/features/onboarding/createOnboardingReadModel'; import { FakeUserUnsubscribeStore } from '../../lib/features/user-subscriptions/fake-user-unsubscribe-store'; import { FakeUserSubscriptionsReadModel } from '../../lib/features/user-subscriptions/fake-user-subscriptions-read-model'; +import { FakeUniqueConnectionStore } from '../../lib/features/unique-connection/fake-unique-connection-store'; const db = { select: () => ({ @@ -121,6 +122,7 @@ const createStores: () => IUnleashStores = () => { onboardingStore: new FakeOnboardingStore(), userUnsubscribeStore: new FakeUserUnsubscribeStore(), userSubscriptionsReadModel: new FakeUserSubscriptionsReadModel(), + uniqueConnectionStore: new FakeUniqueConnectionStore(), }; }; diff --git a/yarn.lock b/yarn.lock index 014f65b439c9..d8dbff984bbc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4919,6 +4919,15 @@ __metadata: languageName: node linkType: hard +"hyperloglog-lite@npm:^1.0.2": + version: 1.0.2 + resolution: "hyperloglog-lite@npm:1.0.2" + dependencies: + murmurhash32-node: "npm:^1.0.1" + checksum: 10c0/3077b9dba1bac384b842a70d1b17da58449d3e633936ef7bd03a3386613e59c413f5f886d9383d14c3fe31eac524abe28a99025d43c446e57aa4175b17675450 + languageName: node + linkType: hard + "iconv-lite@npm:0.4.24": version: 0.4.24 resolution: "iconv-lite@npm:0.4.24" @@ -6831,6 +6840,13 @@ __metadata: languageName: node linkType: hard +"murmurhash32-node@npm:^1.0.1": + version: 1.0.1 + resolution: "murmurhash32-node@npm:1.0.1" + checksum: 10c0/06a36a2f0d0c6855ce131c2a5c225c3096f53bf36898eb2683b2200f782577cde07f07485792d6e85798ea74f4dd95e836058fbab07c49cfcbc0a79b168ab654 + languageName: node + linkType: hard + "murmurhash3js@npm:^3.0.1": version: 3.0.1 resolution: "murmurhash3js@npm:3.0.1" @@ -9311,6 +9327,7 @@ __metadata: helmet: "npm:^6.0.0" http-errors: "npm:^2.0.0" husky: "npm:^9.0.11" + hyperloglog-lite: "npm:^1.0.2" ip-address: "npm:^10.0.1" jest: "npm:29.7.0" jest-junit: "npm:^16.0.0" From 7f897b109bb713ad3174be1c02381f7d1c76d34b Mon Sep 17 00:00:00 2001 From: kwasniew Date: Thu, 9 Jan 2025 16:52:28 +0100 Subject: [PATCH 2/5] feat: unique connection counting --- src/lib/middleware/response-time-metrics.test.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lib/middleware/response-time-metrics.test.ts b/src/lib/middleware/response-time-metrics.test.ts index 6e8dd541d951..db4456e50bb1 100644 --- a/src/lib/middleware/response-time-metrics.test.ts +++ b/src/lib/middleware/response-time-metrics.test.ts @@ -66,6 +66,7 @@ describe('responseTimeMetrics new behavior', () => { }, method: 'GET', path: 'should-not-be-used', + headers: {}, }; // @ts-expect-error req and res doesn't have all properties @@ -98,6 +99,7 @@ describe('responseTimeMetrics new behavior', () => { }; const reqWithoutRoute = { method: 'GET', + headers: {}, }; // @ts-expect-error req and res doesn't have all properties @@ -132,6 +134,7 @@ describe('responseTimeMetrics new behavior', () => { }; const reqWithoutRoute = { method: 'GET', + headers: {}, }; // @ts-expect-error req and res doesn't have all properties @@ -166,6 +169,7 @@ describe('responseTimeMetrics new behavior', () => { const reqWithoutRoute = { method: 'GET', path, + headers: {}, }; // @ts-expect-error req and res doesn't have all properties @@ -210,6 +214,7 @@ describe('responseTimeMetrics new behavior', () => { const reqWithoutRoute = { method: 'GET', path, + headers: {}, }; // @ts-expect-error req and res doesn't have all properties From 87c1e096ae05e71a198ae0a55f85a5de880fd6da Mon Sep 17 00:00:00 2001 From: kwasniew Date: Fri, 10 Jan 2025 10:40:00 +0100 Subject: [PATCH 3/5] feat: improve logic for syncing counts --- .../fake-unique-connection-store.ts | 4 + .../unique-connection-service.test.ts | 142 ++++++++++++++++++ .../unique-connection-service.ts | 42 ++++-- .../unique-connection-store-type.ts | 1 + .../unique-connection-store.e2e.test.ts | 58 +++++++ .../unique-connection-store.ts | 4 + 6 files changed, 242 insertions(+), 9 deletions(-) create mode 100644 src/lib/features/unique-connection/unique-connection-service.test.ts create mode 100644 src/lib/features/unique-connection/unique-connection-store.e2e.test.ts diff --git a/src/lib/features/unique-connection/fake-unique-connection-store.ts b/src/lib/features/unique-connection/fake-unique-connection-store.ts index babe3311dfd8..cad87bc22710 100644 --- a/src/lib/features/unique-connection/fake-unique-connection-store.ts +++ b/src/lib/features/unique-connection/fake-unique-connection-store.ts @@ -20,4 +20,8 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore { ): Promise<(UniqueConnections & { updatedAt: Date }) | null> { return this.uniqueConnectionsRecord[id] || null; } + + async deleteAll(): Promise { + this.uniqueConnectionsRecord = {}; + } } diff --git a/src/lib/features/unique-connection/unique-connection-service.test.ts b/src/lib/features/unique-connection/unique-connection-service.test.ts new file mode 100644 index 000000000000..603fc9b76da8 --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-service.test.ts @@ -0,0 +1,142 @@ +import { UniqueConnectionService } from './unique-connection-service'; +import { FakeUniqueConnectionStore } from './fake-unique-connection-store'; +import getLogger from '../../../test/fixtures/no-logger'; +import type { IFlagResolver } from '../../types'; +import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; +import { addHours } from 'date-fns'; +import { EventEmitter } from 'events'; + +const alwaysOnFlagResolver = { + isEnabled() { + return true; + }, +} as unknown as IFlagResolver; + +test('sync first current bucket', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 0, current: 2 }); +}); + +test('sync first previous bucket', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 3, current: 0 }); +}); + +test('sync to existing current bucket from the same service', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 0, current: 3 }); +}); + +test('sync to existing current bucket from another service', async () => { + const eventBus = new EventEmitter(); + const config = { + flagResolver: alwaysOnFlagResolver, + getLogger, + eventBus: eventBus, + }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService1 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + const uniqueConnectionService2 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + + uniqueConnectionService1.count('connection1'); + uniqueConnectionService1.count('connection2'); + await uniqueConnectionService1.sync(); + + uniqueConnectionService2.count('connection1'); + uniqueConnectionService2.count('connection3'); + await uniqueConnectionService2.sync(); + + const stats1 = await uniqueConnectionService1.getStats(); + expect(stats1).toEqual({ previous: 0, current: 3 }); + const stats2 = await uniqueConnectionService2.getStats(); + expect(stats2).toEqual({ previous: 0, current: 3 }); +}); + +test('sync to existing previous bucket from another service', async () => { + const eventBus = new EventEmitter(); + const config = { + flagResolver: alwaysOnFlagResolver, + getLogger, + eventBus: eventBus, + }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService1 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + const uniqueConnectionService2 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + + uniqueConnectionService1.count('connection1'); + uniqueConnectionService1.count('connection2'); + await uniqueConnectionService1.sync(() => addHours(new Date(), 1)); + + uniqueConnectionService2.count('connection1'); + uniqueConnectionService2.count('connection3'); + await uniqueConnectionService2.sync(() => addHours(new Date(), 1)); + + const stats1 = await uniqueConnectionService1.getStats(); + expect(stats1).toEqual({ previous: 3, current: 0 }); + const stats2 = await uniqueConnectionService2.getStats(); + expect(stats1).toEqual({ previous: 3, current: 0 }); +}); diff --git a/src/lib/features/unique-connection/unique-connection-service.ts b/src/lib/features/unique-connection/unique-connection-service.ts index 38636ec7d0e5..cd1822e2c54f 100644 --- a/src/lib/features/unique-connection/unique-connection-service.ts +++ b/src/lib/features/unique-connection/unique-connection-service.ts @@ -6,6 +6,8 @@ import HyperLogLog from 'hyperloglog-lite'; import type EventEmitter from 'events'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; +const n = 12; + export class UniqueConnectionService { private logger: Logger; @@ -17,13 +19,13 @@ export class UniqueConnectionService { private activeHour: number; - private hll = HyperLogLog(12); + private hll = HyperLogLog(n); constructor( { uniqueConnectionStore, }: Pick, - config: IUnleashConfig, + config: Pick, ) { this.uniqueConnectionStore = uniqueConnectionStore; this.logger = config.getLogger('services/unique-connection-service.ts'); @@ -36,18 +38,34 @@ export class UniqueConnectionService { this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this)); } - async count(connectionId: string) { + count(connectionId: string) { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; this.hll.add(HyperLogLog.hash(connectionId)); } - async sync(): Promise { + async getStats() { + const [previous, current] = await Promise.all([ + this.uniqueConnectionStore.get('previous'), + this.uniqueConnectionStore.get('current'), + ]); + const previousHll = HyperLogLog(n); + if (previous) { + previousHll.merge({ n, buckets: previous.hll }); + } + const currentHll = HyperLogLog(n); + if (current) { + currentHll.merge({ n, buckets: current.hll }); + } + return { previous: previousHll.count(), current: currentHll.count() }; + } + + async sync(clock = () => new Date()): Promise { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; - const currentHour = new Date().getHours(); + const currentHour = clock().getHours(); const currentBucket = await this.uniqueConnectionStore.get('current'); if (this.activeHour !== currentHour && currentBucket) { if (currentBucket.updatedAt.getHours() < currentHour) { - this.hll.merge({ n: 12, buckets: currentBucket.hll }); + this.hll.merge({ n, buckets: currentBucket.hll }); await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, id: 'previous', @@ -55,7 +73,9 @@ export class UniqueConnectionService { } else { const previousBucket = await this.uniqueConnectionStore.get('previous'); - this.hll.merge({ n: 12, buckets: previousBucket }); + if (previousBucket) { + this.hll.merge({ n, buckets: previousBucket.hll }); + } await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, id: 'previous', @@ -63,10 +83,14 @@ export class UniqueConnectionService { } this.activeHour = currentHour; - this.hll = HyperLogLog(12); + this.hll = HyperLogLog(n); + await this.uniqueConnectionStore.insert({ + hll: this.hll.output().buckets, + id: 'current', + }); } else { if (currentBucket) { - this.hll.merge({ n: 12, buckets: currentBucket }); + this.hll.merge({ n, buckets: currentBucket.hll }); } await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, diff --git a/src/lib/features/unique-connection/unique-connection-store-type.ts b/src/lib/features/unique-connection/unique-connection-store-type.ts index 0fb6f0d5c459..79ee2d55e273 100644 --- a/src/lib/features/unique-connection/unique-connection-store-type.ts +++ b/src/lib/features/unique-connection/unique-connection-store-type.ts @@ -11,4 +11,5 @@ export type TimedUniqueConnections = UniqueConnections & { export interface IUniqueConnectionStore { insert(uniqueConnections: UniqueConnections): Promise; get(id: 'current' | 'previous'): Promise; + deleteAll(): Promise; } diff --git a/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts b/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts new file mode 100644 index 000000000000..29fca9e6914e --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts @@ -0,0 +1,58 @@ +import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; +import getLogger from '../../../test/fixtures/no-logger'; +import type { + IUniqueConnectionStore, + IUnleashStores, +} from '../../../lib/types'; +import HyperLogLog from 'hyperloglog-lite'; + +let stores: IUnleashStores; +let db: ITestDb; +let uniqueConnectionStore: IUniqueConnectionStore; + +beforeAll(async () => { + db = await dbInit('unique_connections_store', getLogger); + stores = db.stores; + uniqueConnectionStore = stores.uniqueConnectionStore; +}); + +afterAll(async () => { + await db.destroy(); +}); + +beforeEach(async () => { + await uniqueConnectionStore.deleteAll(); +}); + +test('should store empty HyperLogLog buffer', async () => { + const hll = HyperLogLog(12); + await uniqueConnectionStore.insert({ + id: 'current', + hll: hll.output().buckets, + }); + + const fetchedHll = await uniqueConnectionStore.get('current'); + hll.merge({ n: 12, buckets: fetchedHll!.hll }); + expect(hll.count()).toBe(0); +}); + +test('should store non empty HyperLogLog buffer', async () => { + const hll = HyperLogLog(12); + hll.add(HyperLogLog.hash('connection-1')); + hll.add(HyperLogLog.hash('connection-2')); + await uniqueConnectionStore.insert({ + id: 'current', + hll: hll.output().buckets, + }); + + const fetchedHll = await uniqueConnectionStore.get('current'); + const emptyHll = HyperLogLog(12); + emptyHll.merge({ n: 12, buckets: fetchedHll!.hll }); + expect(hll.count()).toBe(2); +}); + +test('should indicate when no entry', async () => { + const fetchedHll = await uniqueConnectionStore.get('current'); + + expect(fetchedHll).toBeNull(); +}); diff --git a/src/lib/features/unique-connection/unique-connection-store.ts b/src/lib/features/unique-connection/unique-connection-store.ts index dfd502584344..aa3eab7c5c2d 100644 --- a/src/lib/features/unique-connection/unique-connection-store.ts +++ b/src/lib/features/unique-connection/unique-connection-store.ts @@ -27,4 +27,8 @@ export class UniqueConnectionStore implements IUniqueConnectionStore { ? { id: row.id, hll: row.hll, updatedAt: row.updated_at } : null; } + + async deleteAll(): Promise { + await this.db('unique_connections').delete(); + } } From 486f009ca5dde1ebbc6136c92fe0dac723c424de Mon Sep 17 00:00:00 2001 From: kwasniew Date: Fri, 10 Jan 2025 10:48:51 +0100 Subject: [PATCH 4/5] feat: improve logic for syncing counts --- .../unique-connection-service.test.ts | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/lib/features/unique-connection/unique-connection-service.test.ts b/src/lib/features/unique-connection/unique-connection-service.test.ts index 603fc9b76da8..f609edeb20d6 100644 --- a/src/lib/features/unique-connection/unique-connection-service.test.ts +++ b/src/lib/features/unique-connection/unique-connection-service.test.ts @@ -4,7 +4,7 @@ import getLogger from '../../../test/fixtures/no-logger'; import type { IFlagResolver } from '../../types'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; import { addHours } from 'date-fns'; -import { EventEmitter } from 'events'; +import EventEmitter from 'events'; const alwaysOnFlagResolver = { isEnabled() { @@ -67,13 +67,13 @@ test('sync to existing current bucket from the same service', async () => { ); uniqueConnectionService.listen(); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + uniqueConnectionService.count('connection1'); + uniqueConnectionService.count('connection2'); await uniqueConnectionService.sync(); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + uniqueConnectionService.count('connection1'); + uniqueConnectionService.count('connection3'); const stats = await uniqueConnectionService.getStats(); expect(stats).toEqual({ previous: 0, current: 3 }); @@ -140,3 +140,30 @@ test('sync to existing previous bucket from another service', async () => { const stats2 = await uniqueConnectionService2.getStats(); expect(stats1).toEqual({ previous: 3, current: 0 }); }); + +test('populate previous and current', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + + uniqueConnectionService.count('connection1'); + uniqueConnectionService.count('connection2'); + await uniqueConnectionService.sync(); + await uniqueConnectionService.sync(); + + uniqueConnectionService.count('connection3'); + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + + uniqueConnectionService.count('connection3'); + uniqueConnectionService.count('connection4'); + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 3, current: 2 }); +}); From 6c7a8c9be7486c80fe576baa0fad8864e79f02c4 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Fri, 10 Jan 2025 13:26:49 +0100 Subject: [PATCH 5/5] chore: sync with main --- .../20250109150818-unique-connections-table.js | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 src/migrations/20250109150818-unique-connections-table.js diff --git a/src/migrations/20250109150818-unique-connections-table.js b/src/migrations/20250109150818-unique-connections-table.js deleted file mode 100644 index e3632a49dabb..000000000000 --- a/src/migrations/20250109150818-unique-connections-table.js +++ /dev/null @@ -1,15 +0,0 @@ -exports.up = function(db, cb) { - db.runSql(` - CREATE TABLE IF NOT EXISTS unique_connections - ( - id VARCHAR(255) NOT NULL, - updated_at TIMESTAMP DEFAULT now(), - hll BYTEA NOT NULL, - PRIMARY KEY (id) - ); -`, cb) -}; - -exports.down = function(db, cb) { - db.runSql(`DROP TABLE unique_connections;`, cb); -};