Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/limiter #18

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions packages/core/lib/explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { GenericFunction } from './interfaces';
import { JOB_NAME, JOB_OPTIONS } from './queue/constants';
import { QueueMetadata } from './queue/metadata';
import { Injectable } from './foundation';
import { REFILL_INTERVAL, TOKEN_COUNT } from './limiter/constants';
import { ulid } from 'ulid';
import { Limiter } from './limiter';

@Injectable()
export class IntentExplorer {
Expand Down Expand Up @@ -98,4 +101,23 @@ export class IntentExplorer {

CommandMeta.setCommand(command, options, methodRef.bind(instance));
}

lookupLimittedMethods(
instance: Record<string, GenericFunction>,
key: string,
) {
let methodRef = instance[key];
const hasCommandMeta = Reflect.hasMetadata(TOKEN_COUNT, instance, key);

if (!hasCommandMeta) return;

const tokensCount = Reflect.getMetadata(TOKEN_COUNT, instance, key);
const frequency = Reflect.getMetadata(REFILL_INTERVAL, instance, key);
const funcKey = ulid();
Limiter.initializeToken(key + funcKey, tokensCount, frequency);
instance[key] = function (...args) {
Limiter.useToken(key + funcKey);
return methodRef.apply(instance, args);
};
}
}
2 changes: 2 additions & 0 deletions packages/core/lib/limiter/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const TOKEN_COUNT = '__TOKEN_COUNT__';
export const REFILL_INTERVAL = '__REFILL_INTERVAL__';
15 changes: 15 additions & 0 deletions packages/core/lib/limiter/decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { REFILL_INTERVAL, TOKEN_COUNT } from './constants';

export const Limit = (tokens: number, seconds: number) => {
console.log('first(): factory evaluated');
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor,
) {
Reflect.defineMetadata(TOKEN_COUNT, tokens, target, propertyKey);
Reflect.defineMetadata(REFILL_INTERVAL, seconds, target, propertyKey);

return descriptor;
};
};
2 changes: 2 additions & 0 deletions packages/core/lib/limiter/drivers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './memory';
export * from './redis';
75 changes: 75 additions & 0 deletions packages/core/lib/limiter/drivers/memory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { LimiterDriver } from '../interfaces/limiterDriver';

export class MemoryDriver implements LimiterDriver {
private static keyCounts = {};
private static keyScores = {};
private options = { prefix: 'in-memory-keys' };

async setCounter(
key: string,
value: number,
ttlInSec?: number,
): Promise<any> {
await this.set(key, value, ttlInSec);
}

async incrementCounter(key: string): Promise<void> {
MemoryDriver.keyCounts[this.storeKey(key)] += 1;
}

async decrementCounter(key: string): Promise<boolean> {
if ([undefined, 0].includes(await this.getCount(key))) {
return false;
}
MemoryDriver.keyCounts[this.storeKey(key)] -= 1;
return true;
}

async delScoresLessThan(key: string, val: number): Promise<void> {
let ele = MemoryDriver.keyScores[this.storeKey(key)]?.[0];
if (!ele) return;
while (ele && ele < val) {
delete MemoryDriver.keyScores[this.storeKey(key)]?.[0];
ele = MemoryDriver.keyScores[this.storeKey(key)]?.[0];
}
}

async addNewScore(key: string, val: number) {
if (!MemoryDriver.keyScores[this.storeKey(key)]) {
MemoryDriver.keyScores[this.storeKey(key)] = [];
}
return MemoryDriver.keyScores[this.storeKey(key)].append(val);
}

async getScoresCount(key: string): Promise<number> {
return MemoryDriver.keyScores[this.storeKey(key)]?.length ?? 0;
}
async getCount(key: string): Promise<number> {
return MemoryDriver.keyCounts[this.storeKey(key)];
}

private async get(key: string): Promise<any> {
const value = MemoryDriver.keyCounts[this.storeKey(key)];
if (!value) return null;
return value;
}

private async set(
key: string,
value: number,
ttlInSec?: number,
): Promise<boolean> {
const redisKey = this.storeKey(key);
MemoryDriver.keyCounts[redisKey] = value;
if (ttlInSec) {
setTimeout(() => {
delete MemoryDriver.keyCounts[redisKey];
}, ttlInSec * 1000);
}
return true;
}

private storeKey(key: string): string {
return `${this.options.prefix}:::${key}`;
}
}
93 changes: 93 additions & 0 deletions packages/core/lib/limiter/drivers/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { RedisDriverOption } from '../../cache';
import { Package } from '../../utils';
import { LimiterDriver } from '../interfaces/limiterDriver';

export class RedisDriver implements LimiterDriver {
private client: any;

constructor(private options: RedisDriverOption) {
const IORedis = Package.load('ioredis');
if (options.url) {
this.client = new IORedis(options.url, { db: options.database || 0 });
} else {
this.client = new IORedis({
host: options.host,
port: options.port,
username: options.username,
password: options.password,
db: options.database,
});
}
}

async setCounter(
key: string,
value: number,
ttlInSec?: number,
): Promise<void> {
await this.set(key, value, ttlInSec);
}

async incrementCounter(key: string): Promise<void> {
await this.client.incr(this.storeKey(key));
}

async decrementCounter(key: string): Promise<boolean> {
if ([undefined, 0].includes(+(await this.get(key)))) {
return false;
}
await this.client.decr(this.storeKey(key));
return true;
}

async delScoresLessThan(key: string, val: number): Promise<any> {
let ele = await this.client.lindex(this.storeKey(key), 0);
if (!ele) return;
while (ele && ele < val) {
await this.client.lpop(this.storeKey(key), 0);
ele = await this.client.lindex(this.storeKey(key), 0);
}
}

async addNewScore(key: string, val: number) {
await this.client.ladd(this.storeKey(key), val);
}

async getScoresCount(key: string): Promise<number> {
return await this.client.llen(this.storeKey(key));
}

async getCount(key: string): Promise<number> {
return +(await this.client.get(key));
}

private async get(key: string): Promise<any> {
const value = await this.client.get(this.storeKey(key));
if (!value) return null;
try {
return JSON.parse(value);
} catch (e) {
return value;
}
}

private async set(
key: string,
value: string | number | Record<string, any>,
ttlInSec?: number,
): Promise<boolean> {
try {
const redisKey = this.storeKey(key);
ttlInSec
? await this.client.set(redisKey, JSON.stringify(value), 'EX', ttlInSec)
: await this.client.set(redisKey, JSON.stringify(value));
return true;
} catch {
return false;
}
}

private storeKey(key: string): string {
return `${this.options.prefix}:::${key}`;
}
}
4 changes: 4 additions & 0 deletions packages/core/lib/limiter/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './rateLimiter';
export * from './strategies';
export * from './drivers';
export * from './decorator';
9 changes: 9 additions & 0 deletions packages/core/lib/limiter/interfaces/limiterDriver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export interface LimiterDriver {
setCounter(key: string, value: number, ttlInSec?: number): Promise<any>;
incrementCounter(key: string): Promise<any>;
decrementCounter(key: string): Promise<any>;
delScoresLessThan(key: string, val: number): Promise<any>;
addNewScore(key: string, val: number): void;
getScoresCount(key: string): Promise<number>;
getCount(key: string): Promise<number>;
}
33 changes: 33 additions & 0 deletions packages/core/lib/limiter/interfaces/options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { MemoryDriver, RedisDriver } from '../drivers';

export interface LimiterOptions {
isGlobal?: boolean;
driver: LimiterDriverType;
defaultTokensCount?: number;
defaultRefillIntervalInSeconds?: number;
connection?: RedisConnection;
}

export interface RedisConnection {
host: string;
port: number;
database: number;
password: string;
}

export enum LimiterDriverType {
REDIS = 'redis',
IN_MEMORY = 'in-memory',
}

export const defaultOptions = {
isGlobal: true,
driver: LimiterDriverType.IN_MEMORY,
defaultTokensCount: 40,
defaultRefillIntervalInSeconds: 10,
};

export const DriversMap = {
[LimiterDriverType.REDIS]: RedisDriver,
[LimiterDriverType.IN_MEMORY]: MemoryDriver,
};
41 changes: 41 additions & 0 deletions packages/core/lib/limiter/rateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { LimiterDriver } from './interfaces/limiterDriver';
import { BaseStrategy } from './strategies/baseStrategy';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '../config/service';
import {
DriversMap,
LimiterDriverType,
} from './interfaces/options';

@Injectable()
export class Limiter {
private static driver: LimiterDriver;
private static strategy: BaseStrategy;
constructor(private config: ConfigService) {
const options = this.config.get('limiter');
if(!options) return
switch (options.driver) {
case LimiterDriverType.REDIS: {
Limiter.driver = new DriversMap[LimiterDriverType.REDIS](
options.connection,
);
}
default: {
Limiter.driver = new DriversMap[LimiterDriverType.IN_MEMORY]();
}
}
Limiter.strategy = new BaseStrategy(Limiter.driver);
}

static initializeToken = (
key: string,
tokensCount: number,
intervalInSeconds: number,
) => {
Limiter.strategy.initializeToken(key, tokensCount, intervalInSeconds);
};

static useToken = (key: string) => {
Limiter.strategy.useToken(key);
};
}
37 changes: 37 additions & 0 deletions packages/core/lib/limiter/strategies/baseStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { GenericException } from '../../exceptions';
import { LimiterDriver } from '../interfaces/limiterDriver';

export class BaseStrategy {
protected static tokensQuota = {};
protected static tokensIntervals = {};

constructor(protected driver: LimiterDriver) {
this.driver = driver;
}

initializeToken = async (
key: string,
tokensCount: number,
intervalInSeconds: number,
) => {
BaseStrategy.tokensQuota[key] = tokensCount;
BaseStrategy.tokensIntervals[key] = intervalInSeconds;
await this.driver.setCounter(key, tokensCount, intervalInSeconds);
};

useToken = async (key: string) => {
if ((await this.driver.getCount(key)) == undefined) {
await this.initializeToken(
key,
BaseStrategy.tokensQuota[key] - 1,
BaseStrategy.tokensIntervals[key],
);
return;
}
const cut = await this.driver.decrementCounter(key);
console.log('current Count', await this.driver.getCount(key), cut);
if (!cut) {
throw new GenericException('Cannot be called.');
}
};
}
4 changes: 4 additions & 0 deletions packages/core/lib/limiter/strategies/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './baseStrategy';
export * from './slidingWindowCounter';
export * from './tokenBucket';
export * from './windowCounter';
35 changes: 35 additions & 0 deletions packages/core/lib/limiter/strategies/slidingWindowCounter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { GenericException } from '../../exceptions';
import { LimiterDriver } from '../interfaces/limiterDriver';
import { BaseStrategy } from './baseStrategy';

export class SlidingWindowCounter extends BaseStrategy {
constructor(driver: LimiterDriver) {
super(driver);
}

initializeToken = async (
key: string,
tokensCount: number,
intervalInSeconds: number,
) => {
SlidingWindowCounter.tokensQuota[key] = tokensCount;
SlidingWindowCounter.tokensIntervals[key] = intervalInSeconds;
await this.driver.setCounter(key, tokensCount);
};

useToken = async (key: string) => {
const current = new Date().valueOf();
await this.driver.delScoresLessThan(key, current);

if (
(await this.driver.getScoresCount(key)) >= BaseStrategy.tokensQuota[key]
) {
throw new GenericException('Cannot be called.');
}
await this.driver.addNewScore(key, current);
};

setTimer = () => {
// this.setTokens();
};
}
Loading
Loading