Skip to content

Commit

Permalink
Merge pull request #42 from TNG/schedule-job-with-delay
Browse files Browse the repository at this point in the history
feat: allow scheduling immediate jobs with delay
  • Loading branch information
NiklasEi authored Sep 29, 2021
2 parents 0801ac4 + 5766bab commit 2bfc163
Show file tree
Hide file tree
Showing 22 changed files with 215 additions and 121 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

strategy:
matrix:
node: [10, 12, 14, 16]
node: [12, 14, 16]

steps:
- name: Checkout the repository
Expand Down Expand Up @@ -42,7 +42,7 @@ jobs:

strategy:
matrix:
node: [10, 12, 14, 16]
node: [12, 14, 16]

steps:
- name: Checkout the repository
Expand Down
149 changes: 101 additions & 48 deletions README.md

Large diffs are not rendered by default.

38 changes: 25 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"human-interval": "2.0.1",
"lodash": "4.17.21",
"luxon": "2.0.2",
"mongodb": "4.1.0",
"mongodb": "4.1.1",
"typed-emitter": "1.3.1",
"uuid": "8.3.2"
},
Expand Down
7 changes: 4 additions & 3 deletions src/job/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ export type MomoJobStatus = WithoutId<JobEntity>;
export interface JobDefinition {
name: string;
interval: string;
firstRunAfter?: number;
concurrency: number;
maxRunning: number;
}

export interface Job extends JobDefinition {
immediate: boolean;
handler: Handler;
}

export function toJob(job: MomoJob): Job {
return { immediate: false, concurrency: 1, maxRunning: 0, ...job };
return { concurrency: 1, maxRunning: 0, ...job };
}

export function toJobDefinition(job: Job): JobDefinition {
export function toJobDefinition<T extends JobDefinition>(job: T): JobDefinition {
return {
name: job.name,
interval: job.interval,
firstRunAfter: job.firstRunAfter,
maxRunning: job.maxRunning,
concurrency: job.concurrency,
};
Expand Down
2 changes: 1 addition & 1 deletion src/job/MomoJob.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
export type Handler = () => Promise<string | undefined | void> | string | undefined | void;

export interface MomoJob {
immediate?: boolean;
handler: Handler;
name: string;
interval: string;
firstRunAfter?: number;
concurrency?: number;
maxRunning?: number;
}
4 changes: 2 additions & 2 deletions src/job/MomoJobBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export class MomoJobBuilder {
return this;
}

withImmediate(immediate: boolean): this {
this.momoJob.immediate = immediate;
withFirstRunAfter(firstRunAfter: number): this {
this.momoJob.firstRunAfter = firstRunAfter;
return this;
}

Expand Down
12 changes: 11 additions & 1 deletion src/job/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import { Logger } from '../logging/Logger';
import { MomoErrorType } from '../logging/error/MomoErrorType';
import { momoError } from '../logging/error/MomoError';

export function validate({ name, interval, concurrency, maxRunning }: Job, logger?: Logger): boolean {
export function validate({ name, interval, firstRunAfter, concurrency, maxRunning }: Job, logger?: Logger): boolean {
if (firstRunAfter !== undefined && firstRunAfter < 0) {
logger?.error(
'job cannot be defined',
MomoErrorType.defineJob,
{ name, firstRunAfter },
momoError.invalidFirstRunAfter
);
return false;
}

if (maxRunning < 0) {
logger?.error('job cannot be defined', MomoErrorType.defineJob, { name, maxRunning }, momoError.invalidMaxRunning);
return false;
Expand Down
1 change: 1 addition & 0 deletions src/logging/error/MomoError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export const momoError = {
invalidConcurrency: new Error('concurrency must be at least 1'),
invalidMaxRunning: new Error('maxRunning must be at least 0'),
jobNotFound: new Error('job not found in database'),
invalidFirstRunAfter: new Error('firstRunAfter must be at least 0'),
};
10 changes: 1 addition & 9 deletions src/repository/JobRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,7 @@ export class JobRepository extends Repository<JobEntity> {
async list(): Promise<MomoJobStatus[]> {
const jobs = await this.find();

return jobs.map((job) => {
return {
name: job.name,
interval: job.interval,
concurrency: job.concurrency,
maxRunning: job.maxRunning,
executionInfo: job.executionInfo,
};
});
return jobs.map((job) => ({ ...toJobDefinition(job), executionInfo: job.executionInfo }));
}

async updateJob(name: string, update: Partial<JobEntity>): Promise<void> {
Expand Down
14 changes: 12 additions & 2 deletions src/repository/Repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ export class Repository<ENTITY extends { _id?: ObjectId }> {
}

async find(filter: Filter<ENTITY> = {}): Promise<ENTITY[]> {
return this.collection.find(filter).toArray();
const entities = await this.collection.find(filter).toArray();
return entities.map(this.mapNullToUndefined);
}

async findOne(filter: Filter<ENTITY> = {}): Promise<ENTITY | undefined> {
return this.collection.findOne(filter);
const entity = await this.collection.findOne(filter);
return entity === null ? undefined : this.mapNullToUndefined(entity);
}

async delete(filter: Filter<ENTITY> = {}): Promise<number> {
Expand All @@ -33,4 +35,12 @@ export class Repository<ENTITY extends { _id?: ObjectId }> {
async deleteOne(filter: Filter<ENTITY>): Promise<void> {
await this.collection.deleteOne(filter);
}

// mongodb returns null instead of undefined for optional fields
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private mapNullToUndefined(entity: any): ENTITY {
const keys = Object.keys(entity);
const entries = keys.map((key) => [key, entity[key] ?? undefined]);
return Object.fromEntries(entries) as ENTITY;
}
}
1 change: 1 addition & 0 deletions src/schedule/Schedule.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { sum } from 'lodash';

import { ExecutionInfo, ExecutionStatus, JobResult } from '../job/ExecutionInfo';
import { ExecutionsRepository } from '../repository/ExecutionsRepository';
import { JobRepository } from '../repository/JobRepository';
Expand Down
5 changes: 2 additions & 3 deletions src/scheduler/JobScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export class JobScheduler {

constructor(
private readonly jobName: string,
private readonly immediate: boolean,
private readonly jobExecutor: JobExecutor,
private readonly scheduleId: string,
private readonly executionsRepository: ExecutionsRepository,
Expand All @@ -37,7 +36,7 @@ export class JobScheduler {
jobRepository: JobRepository
): JobScheduler {
const executor = new JobExecutor(job.handler, scheduleId, executionsRepository, jobRepository, logger);
return new JobScheduler(job.name, job.immediate, executor, scheduleId, executionsRepository, jobRepository, logger);
return new JobScheduler(job.name, executor, scheduleId, executionsRepository, jobRepository, logger);
}

getUnexpectedErrorCount(): number {
Expand Down Expand Up @@ -87,7 +86,7 @@ export class JobScheduler {

this.interval = jobEntity.interval;

const delay = calculateDelay(interval, this.immediate, jobEntity);
const delay = calculateDelay(interval, jobEntity);

this.jobHandle = setIntervalWithDelay(this.executeConcurrently.bind(this), interval, delay);

Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/calculateDelay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { max } from 'lodash';

import { JobEntity } from '../repository/JobEntity';

export function calculateDelay(millisecondsInterval: number, immediate: boolean, job: JobEntity): number {
export function calculateDelay(millisecondsInterval: number, job: JobEntity): number {
const nextStart = calculateNextStart(millisecondsInterval, job);
if (nextStart === undefined) {
return immediate ? 0 : millisecondsInterval;
return job.firstRunAfter ?? millisecondsInterval;
}

return max([nextStart - DateTime.now().toMillis(), 0]) ?? 0;
Expand Down
1 change: 0 additions & 1 deletion test/executor/JobExecutor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ describe('JobExecutor', () => {
const job: Job = {
name: 'test',
interval: '1 minute',
immediate: false,
concurrency: 1,
maxRunning: 0,
handler,
Expand Down
1 change: 0 additions & 1 deletion test/job/Job.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ describe('fromMomoJob', () => {
const job = { name: 'test', interval: '1 second', handler: () => undefined };
expect(toJob(job)).toMatchObject({
...job,
immediate: false,
concurrency: 1,
maxRunning: 0,
});
Expand Down
6 changes: 3 additions & 3 deletions test/job/MomoJobBuilder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ describe('MomoJobBuilder', () => {
const momoJob = new MomoJobBuilder()
.withName('name')
.withInterval('one minute')
.withImmediate(true)
.withFirstRunAfter(0)
.withConcurrency(1)
.withMaxRunning(1)
.withHandler(jest.fn())
.build();

expect(momoJob.name).toEqual('name');
expect(momoJob.interval).toEqual('one minute');
expect(momoJob.immediate).toEqual(true);
expect(momoJob.firstRunAfter).toEqual(0);
expect(momoJob.concurrency).toEqual(1);
expect(momoJob.maxRunning).toEqual(1);
expect(momoJob.handler.toString()).toEqual(jest.fn().toString());
Expand All @@ -24,7 +24,7 @@ describe('MomoJobBuilder', () => {

expect(momoJob.name).toEqual('name');
expect(momoJob.interval).toEqual('one minute');
expect(momoJob.immediate).toBeUndefined();
expect(momoJob.firstRunAfter).toBeUndefined();
expect(momoJob.concurrency).toBeUndefined();
expect(momoJob.maxRunning).toBeUndefined();
expect(momoJob.handler.toString()).toEqual(jest.fn().toString());
Expand Down
13 changes: 13 additions & 0 deletions test/job/validate.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ describe('validate', () => {
);
});

it('reports error when firstRunAfter is invalid', async () => {
const job: Job = toJob({ name: 'test', interval: '1 minute', handler: () => 'finished', firstRunAfter: -1 });
expect(validate(job, logger)).toBe(false);

expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
'job cannot be defined',
MomoErrorType.defineJob,
{ name: job.name, firstRunAfter: -1 },
momoError.invalidFirstRunAfter
);
});

it('reports error when maxRunning is invalid', async () => {
const job: Job = toJob({ name: 'test', interval: '1 minute', handler: () => 'finished', maxRunning: -1 });
expect(validate(job, logger)).toBe(false);
Expand Down
10 changes: 6 additions & 4 deletions test/repository/JobRepository.integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ describe('JobRepository', () => {

describe('list', () => {
it('returns jobs', async () => {
const job1 = {
const job1: JobEntity = {
name: 'job1',
interval: '1 minute',
firstRunAfter: 0,
executionInfo: {} as ExecutionInfo,
running: 2,
concurrency: 1,
maxRunning: 3,
};
const job2 = {
const job2: JobEntity = {
name: 'job2',
interval: '2 minutes',
firstRunAfter: 0,
executionInfo: {} as ExecutionInfo,
running: 0,
concurrency: 1,
maxRunning: 0,
};
Expand All @@ -117,13 +117,15 @@ describe('JobRepository', () => {
{
name: job1.name,
interval: job1.interval,
firstRunAfter: job1.firstRunAfter,
concurrency: job1.concurrency,
maxRunning: job1.maxRunning,
executionInfo: {},
},
{
name: job2.name,
interval: job2.interval,
firstRunAfter: job1.firstRunAfter,
concurrency: job2.concurrency,
maxRunning: job2.maxRunning,
executionInfo: {},
Expand Down
2 changes: 1 addition & 1 deletion test/schedule/momo.integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ describe('Momo', () => {
});

it('executes an immediate job periodically', async () => {
await mongoSchedule.define({ ...momoJob, immediate: true });
await mongoSchedule.define({ ...momoJob, firstRunAfter: 0 });

await mongoSchedule.start();

Expand Down
Loading

0 comments on commit 2bfc163

Please sign in to comment.