Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unique connection counting #9074

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
"hash-sum": "^2.0.0",
"helmet": "^6.0.0",
"http-errors": "^2.0.0",
"hyperloglog-lite": "^1.0.2",
"ip-address": "^10.0.1",
"joi": "^17.13.3",
"js-sha256": "^0.11.0",
Expand Down
2 changes: 2 additions & 0 deletions src/lib/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import { OnboardingStore } from '../features/onboarding/onboarding-store';
import { createOnboardingReadModel } from '../features/onboarding/createOnboardingReadModel';
import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store';
import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model';
import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store';

export const createStores = (
config: IUnleashConfig,
Expand Down Expand Up @@ -185,6 +186,7 @@ export const createStores = (
),
userUnsubscribeStore: new UserUnsubscribeStore(db),
userSubscriptionsReadModel: new UserSubscriptionsReadModel(db),
uniqueConnectionStore: new UniqueConnectionStore(db),
};
};

Expand Down
7 changes: 7 additions & 0 deletions src/lib/features/scheduler/schedule-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export const scheduleServices = async (
frontendApiService,
clientMetricsServiceV2,
integrationEventsService,
uniqueConnectionService,
} = services;

schedulerService.schedule(
Expand Down Expand Up @@ -179,4 +180,10 @@ export const scheduleServices = async (
minutesToMilliseconds(15),
'cleanUpIntegrationEvents',
);

schedulerService.schedule(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync in-memory HyperLogLogs with DB

uniqueConnectionService.sync.bind(uniqueConnectionService),
minutesToMilliseconds(10),
'uniqueConnectionService',
);
};
23 changes: 23 additions & 0 deletions src/lib/features/unique-connection/fake-unique-connection-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';

export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
private uniqueConnectionsRecord: Record<string, TimedUniqueConnections> =
{};

async insert(uniqueConnections: UniqueConnections): Promise<void> {
this.uniqueConnectionsRecord[uniqueConnections.id] = {
...uniqueConnections,
updatedAt: new Date(),
};
}

async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}
}
77 changes: 77 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { IUnleashConfig } from '../../types/option';
import type { IFlagResolver, IUnleashStores } from '../../types';
import type { Logger } from '../../logger';
import type { IUniqueConnectionStore } from './unique-connection-store-type';
import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';

export class UniqueConnectionService {
private logger: Logger;

private uniqueConnectionStore: IUniqueConnectionStore;

private flagResolver: IFlagResolver;

private eventBus: EventEmitter;

private activeHour: number;

private hll = HyperLogLog(12);

constructor(
{
uniqueConnectionStore,
}: Pick<IUnleashStores, 'uniqueConnectionStore'>,
config: IUnleashConfig,
) {
this.uniqueConnectionStore = uniqueConnectionStore;
this.logger = config.getLogger('services/unique-connection-service.ts');
this.flagResolver = config.flagResolver;
this.eventBus = config.eventBus;
this.activeHour = new Date().getHours();
}

listen() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this service is interested in new connection ids received

this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}

async count(connectionId: string) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this data structure tracks unique connections

}

async sync(): Promise<void> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic will get detailed coverage. We track current and previous HyperLogLog in 2 DB rows. If we found the hour has changed we start a new current bucket and copy current to previous. We have to accommodate for multiple pods where any pod can be the first one to start a current bucket.

if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
const currentHour = new Date().getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');
if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({ n: 12, buckets: currentBucket.hll });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
this.hll.merge({ n: 12, buckets: previousBucket });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
}
this.activeHour = currentHour;

this.hll = HyperLogLog(12);
} else {
if (currentBucket) {
this.hll.merge({ n: 12, buckets: currentBucket });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
});
}
}
}
14 changes: 14 additions & 0 deletions src/lib/features/unique-connection/unique-connection-store-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export type UniqueConnections = {
hll: Buffer;
id: 'current' | 'previous';
};

export type TimedUniqueConnections = UniqueConnections & {
updatedAt: Date;
};

// id, hll, updated_at
export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
}
30 changes: 30 additions & 0 deletions src/lib/features/unique-connection/unique-connection-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { Db } from '../../db/db';
import type { IUniqueConnectionStore } from '../../types';
import type { UniqueConnections } from './unique-connection-store-type';

export class UniqueConnectionStore implements IUniqueConnectionStore {
private db: Db;

constructor(db: Db) {
this.db = db;
}

async insert(uniqueConnections: UniqueConnections): Promise<void> {
await this.db<UniqueConnections>('unique_connections')
.insert({ id: uniqueConnections.id, hll: uniqueConnections.hll })
.onConflict('id')
.merge();
}

async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
const row = await this.db('unique_connections')
.select('id', 'hll', 'updated_at')
.where('id', id)
.first();
return row
? { id: row.id, hll: row.hll, updatedAt: row.updated_at }
: null;
}
}
3 changes: 3 additions & 0 deletions src/lib/metric-events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type EventEmitter from 'events';

const REQUEST_TIME = 'request_time';
const SDK_CONNECTION_ID_RECEIVED = 'sdk_connection_id_received';
const DB_TIME = 'db_time';
const FUNCTION_TIME = 'function_time';
const SCHEDULER_JOB_TIME = 'scheduler_job_time';
Expand All @@ -21,6 +22,7 @@ const CLIENT_DELTA_MEMORY = 'client_delta_memory';

type MetricEvent =
| typeof REQUEST_TIME
| typeof SDK_CONNECTION_ID_RECEIVED
| typeof DB_TIME
| typeof FUNCTION_TIME
| typeof SCHEDULER_JOB_TIME
Expand Down Expand Up @@ -71,6 +73,7 @@ const onMetricEvent = <T extends MetricEvent>(

export {
REQUEST_TIME,
SDK_CONNECTION_ID_RECEIVED,
DB_TIME,
SCHEDULER_JOB_TIME,
FUNCTION_TIME,
Expand Down
5 changes: 5 additions & 0 deletions src/lib/middleware/response-time-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ describe('responseTimeMetrics new behavior', () => {
},
method: 'GET',
path: 'should-not-be-used',
headers: {},
};

// @ts-expect-error req and res doesn't have all properties
Expand Down Expand Up @@ -98,6 +99,7 @@ describe('responseTimeMetrics new behavior', () => {
};
const reqWithoutRoute = {
method: 'GET',
headers: {},
};

// @ts-expect-error req and res doesn't have all properties
Expand Down Expand Up @@ -132,6 +134,7 @@ describe('responseTimeMetrics new behavior', () => {
};
const reqWithoutRoute = {
method: 'GET',
headers: {},
};

// @ts-expect-error req and res doesn't have all properties
Expand Down Expand Up @@ -166,6 +169,7 @@ describe('responseTimeMetrics new behavior', () => {
const reqWithoutRoute = {
method: 'GET',
path,
headers: {},
};

// @ts-expect-error req and res doesn't have all properties
Expand Down Expand Up @@ -210,6 +214,7 @@ describe('responseTimeMetrics new behavior', () => {
const reqWithoutRoute = {
method: 'GET',
path,
headers: {},
};

// @ts-expect-error req and res doesn't have all properties
Expand Down
7 changes: 6 additions & 1 deletion src/lib/middleware/response-time-metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as responseTime from 'response-time';
import type EventEmitter from 'events';
import { REQUEST_TIME } from '../metric-events';
import { REQUEST_TIME, SDK_CONNECTION_ID_RECEIVED } from '../metric-events';
import type { IFlagResolver } from '../types/experimental';
import type { InstanceStatsService } from '../services';
import type { RequestHandler } from 'express';
Expand Down Expand Up @@ -66,6 +66,11 @@ export function responseTimeMetrics(
req.query.appName;
}

const connectionId = req.headers['x-unleash-connection-id'];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where it starts. on each detected connection id received we let the interested parties know

if (connectionId && flagResolver.isEnabled('uniqueSdkTracking')) {
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, connectionId);
}

const timingInfo = {
path: pathname,
method: req.method,
Expand Down
6 changes: 6 additions & 0 deletions src/lib/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ import {
createContextService,
createFakeContextService,
} from '../features/context/createContextService';
import { UniqueConnectionService } from '../features/unique-connection/unique-connection-service';

export const createServices = (
stores: IUnleashStores,
Expand Down Expand Up @@ -403,6 +404,9 @@ export const createServices = (
const featureLifecycleService = transactionalFeatureLifecycleService;
featureLifecycleService.listen();

const uniqueConnectionService = new UniqueConnectionService(stores, config);
uniqueConnectionService.listen();

const onboardingService = db
? createOnboardingService(config)(db)
: createFakeOnboardingService(config).onboardingService;
Expand Down Expand Up @@ -484,6 +488,7 @@ export const createServices = (
personalDashboardService,
projectStatusService,
transactionalUserSubscriptionsService,
uniqueConnectionService,
};
};

Expand Down Expand Up @@ -537,4 +542,5 @@ export {
PersonalDashboardService,
ProjectStatusService,
UserSubscriptionsService,
UniqueConnectionService,
};
2 changes: 2 additions & 0 deletions src/lib/types/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import type { OnboardingService } from '../features/onboarding/onboarding-servic
import type { PersonalDashboardService } from '../features/personal-dashboard/personal-dashboard-service';
import type { ProjectStatusService } from '../features/project-status/project-status-service';
import type { UserSubscriptionsService } from '../features/user-subscriptions/user-subscriptions-service';
import type { UniqueConnectionService } from '../features/unique-connection/unique-connection-service';

export interface IUnleashServices {
transactionalAccessService: WithTransactional<AccessService>;
Expand Down Expand Up @@ -131,4 +132,5 @@ export interface IUnleashServices {
personalDashboardService: PersonalDashboardService;
projectStatusService: ProjectStatusService;
transactionalUserSubscriptionsService: WithTransactional<UserSubscriptionsService>;
uniqueConnectionService: UniqueConnectionService;
}
3 changes: 3 additions & 0 deletions src/lib/types/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import { IOnboardingReadModel } from '../features/onboarding/onboarding-read-mod
import { IOnboardingStore } from '../features/onboarding/onboarding-store-type';
import type { IUserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store-type';
import type { IUserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model-type';
import { IUniqueConnectionStore } from '../features/unique-connection/unique-connection-store-type';

export interface IUnleashStores {
accessStore: IAccessStore;
Expand Down Expand Up @@ -110,6 +111,7 @@ export interface IUnleashStores {
onboardingStore: IOnboardingStore;
userUnsubscribeStore: IUserUnsubscribeStore;
userSubscriptionsReadModel: IUserSubscriptionsReadModel;
uniqueConnectionStore: IUniqueConnectionStore;
}

export {
Expand Down Expand Up @@ -165,4 +167,5 @@ export {
type IProjectReadModel,
IOnboardingStore,
type IUserSubscriptionsReadModel,
IUniqueConnectionStore,
};
15 changes: 15 additions & 0 deletions src/migrations/20250109150818-unique-connections-table.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
exports.up = function(db, cb) {
db.runSql(`
CREATE TABLE IF NOT EXISTS unique_connections
(
id VARCHAR(255) NOT NULL,
updated_at TIMESTAMP DEFAULT now(),
hll BYTEA NOT NULL,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hyperlog is a buffer that we translate to BYTEA type in postgres

PRIMARY KEY (id)
);
`, cb)
};

exports.down = function(db, cb) {
db.runSql(`DROP TABLE unique_connections;`, cb);
};
1 change: 1 addition & 0 deletions src/server-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ process.nextTick(async () => {
flagOverviewRedesign: false,
granularAdminPermissions: true,
deltaApi: true,
uniqueSdkTracking: true,
},
},
authentication: {
Expand Down
2 changes: 2 additions & 0 deletions src/test/fixtures/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import { FakeOnboardingStore } from '../../lib/features/onboarding/fake-onboardi
import { createFakeOnboardingReadModel } from '../../lib/features/onboarding/createOnboardingReadModel';
import { FakeUserUnsubscribeStore } from '../../lib/features/user-subscriptions/fake-user-unsubscribe-store';
import { FakeUserSubscriptionsReadModel } from '../../lib/features/user-subscriptions/fake-user-subscriptions-read-model';
import { FakeUniqueConnectionStore } from '../../lib/features/unique-connection/fake-unique-connection-store';

const db = {
select: () => ({
Expand Down Expand Up @@ -121,6 +122,7 @@ const createStores: () => IUnleashStores = () => {
onboardingStore: new FakeOnboardingStore(),
userUnsubscribeStore: new FakeUserUnsubscribeStore(),
userSubscriptionsReadModel: new FakeUserSubscriptionsReadModel(),
uniqueConnectionStore: new FakeUniqueConnectionStore(),
};
};

Expand Down
Loading
Loading