From 05a09f8907eb303be592c145e1a152ba7cf0d714 Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 18 Aug 2023 13:13:14 +0200 Subject: [PATCH 1/8] fix: prevent spamming mongo errors when checking active schedule * remove unused method in MongoSchedule Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 35 ++++++++++++--------------- src/schedule/MongoSchedule.ts | 15 +----------- src/schedule/SchedulePing.ts | 8 +++++- 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 88d4610e..2791959c 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -25,14 +25,12 @@ export class SchedulesRepository extends Repository { this.logger = logger; } - /** - * Checks if there is an alive active schedule in the database for the given name. - * - * If there is one -> return true if it is this schedule, otherwise false. - * If there is no such schedule -> we try inserting this schedule as the active one. - * ↳ If it worked return true, otherwise false. - */ - async isActiveSchedule(name: string): Promise { + async getActiveSchedule(name: string): Promise { + const activeSchedule = await this.collection.findOne({ name }); + return activeSchedule?.scheduleId; + } + + async setActiveSchedule(name: string): Promise { const lastAlive = DateTime.now().toMillis(); const threshold = lastAlive - this.deadScheduleThreshold; try { @@ -52,19 +50,16 @@ export class SchedulesRepository extends Repository { }, ); - return result.value === null ? false : result.value.scheduleId === this.scheduleId; + return result.value !== null && result.value.scheduleId === this.scheduleId; } catch (error) { - // We seem to have a schedule that's alive. The unique name index probably prevented the upsert. Is this one the active schedule? - const aliveSchedule = await this.collection.findOne({ name }); - if (aliveSchedule === null) { - this.logger?.error( - 'The database reported an unexpected error', - MomoErrorType.internal, - { scheduleId: this.scheduleId }, - error, - ); - } - return aliveSchedule?.scheduleId === this.scheduleId; + // We seem to have a schedule that's alive. The unique name index probably prevented the upsert. + this.logger?.error( + 'The database reported an unexpected error', + MomoErrorType.internal, + { scheduleId: this.scheduleId }, + error, + ); + return false; } } diff --git a/src/schedule/MongoSchedule.ts b/src/schedule/MongoSchedule.ts index d4b6f42c..c149337b 100644 --- a/src/schedule/MongoSchedule.ts +++ b/src/schedule/MongoSchedule.ts @@ -27,7 +27,7 @@ export class MongoSchedule extends Schedule { protected readonly scheduleId: string, protected readonly connection: Connection, pingIntervalMs: number, - private readonly scheduleName: string, + scheduleName: string, ) { const schedulesRepository = connection.getSchedulesRepository(); const jobRepository = connection.getJobRepository(); @@ -87,19 +87,6 @@ export class MongoSchedule extends Schedule { return this.schedulePing.start(); } - /** - * Returns whether this schedule is currently active. - * - * Only the active schedule will schedule jobs and try to execute them. - * There is always only one active schedule per mongo database. - * - * @throws if the database throws - */ - public async isActiveSchedule(): Promise { - const schedulesRepository = this.connection.getSchedulesRepository(); - return schedulesRepository.isActiveSchedule(this.scheduleName); - } - private async startAllJobs(): Promise { await Promise.all(Object.values(this.getJobSchedulers()).map(async (jobScheduler) => jobScheduler.start())); } diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 1a0ac793..2ede63f3 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -36,8 +36,14 @@ export class SchedulePing { } private async checkActiveSchedule(): Promise { - const active = await this.schedulesRepository.isActiveSchedule(this.scheduleName); + const activeSchedule = await this.schedulesRepository.getActiveSchedule(this.scheduleName); + const active = + activeSchedule === undefined + ? await this.schedulesRepository.setActiveSchedule(this.scheduleName) + : activeSchedule === this.scheduleId; + this.logger.debug(`This schedule is ${active ? '' : 'not '}active`); + if (active) { await this.schedulesRepository.ping(this.scheduleId); if (this.startJobsStatus === StartJobsStatus.notStarted) { From 0d888a71be0a2fc40b0110e1ca142bf352152d5b Mon Sep 17 00:00:00 2001 From: Yani Kolev Date: Fri, 18 Aug 2023 16:23:21 +0300 Subject: [PATCH 2/8] chore: WIP Signed-off-by: Yani Kolev --- src/repository/SchedulesRepository.ts | 17 +- src/schedule/SchedulePing.ts | 2 + .../SchedulesRepository.integration.spec.ts | 416 +++++++++--------- 3 files changed, 221 insertions(+), 214 deletions(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 2791959c..629d1979 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -26,31 +26,32 @@ export class SchedulesRepository extends Repository { } async getActiveSchedule(name: string): Promise { - const activeSchedule = await this.collection.findOne({ name }); + const threshold = DateTime.now().toMillis() - this.deadScheduleThreshold; + const activeSchedule = await this.collection.findOne({ name, lastAlive: { $gte: threshold } }); + const foo = await this.collection.find({}).toArray(); + // eslint-disable-next-line no-console + console.log(foo); return activeSchedule?.scheduleId; } async setActiveSchedule(name: string): Promise { - const lastAlive = DateTime.now().toMillis(); - const threshold = lastAlive - this.deadScheduleThreshold; try { - const result = await this.collection.findOneAndUpdate( - { lastAlive: { $lt: threshold }, name }, + await this.collection.updateOne( + { name }, { $set: { name, scheduleId: this.scheduleId, - lastAlive, + lastAlive: DateTime.now().toMillis(), executions: {}, }, }, { upsert: true, - returnDocument: 'after', }, ); - return result.value !== null && result.value.scheduleId === this.scheduleId; + return true; } catch (error) { // We seem to have a schedule that's alive. The unique name index probably prevented the upsert. this.logger?.error( diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 2ede63f3..9cfb13d5 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -1,3 +1,5 @@ +import { DateTime } from 'luxon'; + import { SchedulesRepository } from '../repository/SchedulesRepository'; import { Logger } from '../logging/Logger'; import { setSafeInterval } from '../timeout/safeTimeouts'; diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index d653be4a..92a0ebbb 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -2,13 +2,12 @@ import { MongoMemoryServer } from 'mongodb-memory-server'; import { Connection } from '../../src/Connection'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; -import { sleep } from '../utils/sleep'; describe('SchedulesRepository', () => { const scheduleName = 'schedule'; const scheduleId = '123'; const pingInterval = 500; - const name = 'test job'; + // const name = 'test job'; let mongo: MongoMemoryServer; let connection: Connection; @@ -29,216 +28,221 @@ describe('SchedulesRepository', () => { }); describe('isActiveSchedule', () => { - it('single schedule is active', async () => { - const active = await schedulesRepository.isActiveSchedule(scheduleName); - - const entities = await schedulesRepository.find({}); - - expect(active).toEqual(true); - expect(entities).toHaveLength(1); - expect(entities[0]?.scheduleId).toEqual(scheduleId); - }); - - it('only one schedule is active', async () => { - const firstIsActive = await schedulesRepository.isActiveSchedule(scheduleName); - - const inactiveScheduleId = 'not active'; - const inactiveConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - inactiveScheduleId, - scheduleName, - ); - const inactiveSchedulesRepository = inactiveConnection.getSchedulesRepository(); - const secondIsActive = await inactiveSchedulesRepository.isActiveSchedule(scheduleName); - await inactiveConnection.disconnect(); - - const entities = await schedulesRepository.find({}); - - expect(firstIsActive).toEqual(true); - expect(secondIsActive).toEqual(false); - expect(entities).toHaveLength(1); - expect(entities[0]?.scheduleId).toEqual(scheduleId); - }); - - it('only one schedule of many concurrent ones is active', async () => { - const connections = await Promise.all( - ['a', 'b', 'c', 'd', 'e'].map(async (id) => - Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), - ), - ); - - const schedulesActiveStatus = await Promise.all( - connections.map(async (connection) => { - const newSchedulesRepository = connection.getSchedulesRepository(); - return newSchedulesRepository.isActiveSchedule(scheduleName); - }), - ); - - const entities = await schedulesRepository.find({}); - await Promise.all(connections.map(async (connection) => connection.disconnect())); - - expect(schedulesActiveStatus.filter((active) => active)).toHaveLength(1); - expect(entities).toHaveLength(1); - }); - - it('should replace dead schedules', async () => { - const active = await schedulesRepository.isActiveSchedule(scheduleName); - const secondScheduleId = 'not active'; - const secondConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - secondScheduleId, - scheduleName, - ); - const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - const secondActive = await secondSchedulesRepository.isActiveSchedule(scheduleName); - - expect(active).toEqual(true); - expect(secondActive).toEqual(false); - - await sleep(1200); - - const secondTakeOver = await secondSchedulesRepository.isActiveSchedule(scheduleName); - await secondConnection.disconnect(); - expect(secondTakeOver).toEqual(true); - }); - - it('should allow two active schedules with different names', async () => { - const isActive = await schedulesRepository.isActiveSchedule(scheduleName); - const otherScheduleName = 'other schedule'; - const secondScheduleId = 'first other schedule ID'; - const thirdScheduleId = 'second other schedule ID'; - const secondConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - secondScheduleId, - otherScheduleName, - ); - const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - const isSecondActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - const thirdConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - thirdScheduleId, - otherScheduleName, - ); - const thirdSchedulesRepository = await thirdConnection.getSchedulesRepository(); - const isThirdActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - - expect(isActive).toEqual(true); - expect(isSecondActive).toEqual(true); - expect(isThirdActive).toEqual(false); - - await sleep(2 * pingInterval + 100); - - const isFirstStillActive = await schedulesRepository.isActiveSchedule(scheduleName); - const didThirdTakeOverActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - expect(isFirstStillActive).toEqual(true); - expect(didThirdTakeOverActive).toEqual(true); - const isSecondStillActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - expect(isSecondStillActive).toEqual(false); + it('sets single schedule as active', async () => { + await schedulesRepository.setActiveSchedule(scheduleName); + const schedule = await schedulesRepository.findOne({ name: scheduleName }); - await secondConnection.disconnect(); - await thirdConnection.disconnect(); + expect(schedule!.scheduleId).toEqual(scheduleId); }); - }); - - describe('with schedule', () => { - beforeEach(async () => { - await schedulesRepository.isActiveSchedule(scheduleName); - }); - - describe('removeJob', () => { - it('can remove a job', async () => { - await schedulesRepository.addExecution(name, 0); - - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); - - await schedulesRepository.removeJob(scheduleId, name); - const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity2?.executions).toEqual({}); - }); - - it('keeps other job', async () => { - const otherName = 'other job'; - await schedulesRepository.addExecution(name, 0); - await schedulesRepository.addExecution(otherName, 0); - - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity?.executions).toEqual({ [name]: 1, [otherName]: 1 }); - - await schedulesRepository.removeJob(scheduleId, name); - const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity2?.executions).toEqual({ [otherName]: 1 }); - }); - }); - - describe('execution', () => { - it('can add and remove an execution', async () => { - const { added, running } = await schedulesRepository.addExecution(name, 1); - expect(added).toBe(true); - expect(running).toBe(1); - - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); - await schedulesRepository.removeExecution(name); - - const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntity2?.executions).toEqual({ [name]: 0 }); - }); - - it('cannot add an execution when maxRunning is reached', async () => { - await schedulesRepository.addExecution(name, 1); - const { added, running } = await schedulesRepository.addExecution(name, 1); - expect(added).toBe(false); - expect(running).toBe(1); - }); - - it('can add an execution with maxRunning set to 0', async () => { - const executionPing = await schedulesRepository.addExecution(name, 0); - expect(executionPing).toBeDefined(); - }); - - it('does not add executions in schedule that is not active', async () => { - const otherScheduleId = 'other schedule'; - const otherConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - otherScheduleId, - scheduleName, - ); - const otherSchedulesRepository = otherConnection.getSchedulesRepository(); - - await otherSchedulesRepository.addExecution(name, 2); - - const running = await otherSchedulesRepository.countRunningExecutions(name); - await otherConnection.disconnect(); - expect(running).toBe(0); - }); - }); + it('single schedule is active', async () => { + await schedulesRepository.setActiveSchedule(scheduleName); - describe('countRunningExecutions', () => { - it('returns number of executions', async () => { - await schedulesRepository.addExecution(name, 2); - await schedulesRepository.addExecution(name, 2); + const active = await schedulesRepository.getActiveSchedule(scheduleName); - const running = await schedulesRepository.countRunningExecutions(name); - expect(running).toBe(2); - }); + expect(active).toEqual(scheduleId); }); - describe('ping', () => { - it('updates timestamp', async () => { - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - - await schedulesRepository.ping(); - - const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); - }); - }); + // it('only one schedule is active', async () => { + // const firstIsActive = await schedulesRepository.isActiveSchedule(scheduleName); + // + // const inactiveScheduleId = 'not active'; + // const inactiveConnection = await Connection.create( + // { url: mongo.getUri() }, + // pingInterval, + // inactiveScheduleId, + // scheduleName, + // ); + // const inactiveSchedulesRepository = inactiveConnection.getSchedulesRepository(); + // const secondIsActive = await inactiveSchedulesRepository.isActiveSchedule(scheduleName); + // await inactiveConnection.disconnect(); + // + // const entities = await schedulesRepository.find({}); + // + // expect(firstIsActive).toEqual(true); + // expect(secondIsActive).toEqual(false); + // expect(entities).toHaveLength(1); + // expect(entities[0]?.scheduleId).toEqual(scheduleId); + // }); + // + // it('only one schedule of many concurrent ones is active', async () => { + // const connections = await Promise.all( + // ['a', 'b', 'c', 'd', 'e'].map(async (id) => + // Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), + // ), + // ); + // + // const schedulesActiveStatus = await Promise.all( + // connections.map(async (connection) => { + // const newSchedulesRepository = connection.getSchedulesRepository(); + // return newSchedulesRepository.isActiveSchedule(scheduleName); + // }), + // ); + // + // const entities = await schedulesRepository.find({}); + // await Promise.all(connections.map(async (connection) => connection.disconnect())); + // + // expect(schedulesActiveStatus.filter((active) => active)).toHaveLength(1); + // expect(entities).toHaveLength(1); + // }); + // + // it('should replace dead schedules', async () => { + // const active = await schedulesRepository.isActiveSchedule(scheduleName); + // const secondScheduleId = 'not active'; + // const secondConnection = await Connection.create( + // { url: mongo.getUri() }, + // pingInterval, + // secondScheduleId, + // scheduleName, + // ); + // const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); + // const secondActive = await secondSchedulesRepository.isActiveSchedule(scheduleName); + // + // expect(active).toEqual(true); + // expect(secondActive).toEqual(false); + // + // await sleep(1200); + // + // const secondTakeOver = await secondSchedulesRepository.isActiveSchedule(scheduleName); + // await secondConnection.disconnect(); + // expect(secondTakeOver).toEqual(true); + // }); + // + // it('should allow two active schedules with different names', async () => { + // const isActive = await schedulesRepository.isActiveSchedule(scheduleName); + // const otherScheduleName = 'other schedule'; + // const secondScheduleId = 'first other schedule ID'; + // const thirdScheduleId = 'second other schedule ID'; + // const secondConnection = await Connection.create( + // { url: mongo.getUri() }, + // pingInterval, + // secondScheduleId, + // otherScheduleName, + // ); + // const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); + // const isSecondActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); + // const thirdConnection = await Connection.create( + // { url: mongo.getUri() }, + // pingInterval, + // thirdScheduleId, + // otherScheduleName, + // ); + // const thirdSchedulesRepository = await thirdConnection.getSchedulesRepository(); + // const isThirdActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); + // + // expect(isActive).toEqual(true); + // expect(isSecondActive).toEqual(true); + // expect(isThirdActive).toEqual(false); + // + // await sleep(2 * pingInterval + 100); + // + // const isFirstStillActive = await schedulesRepository.isActiveSchedule(scheduleName); + // const didThirdTakeOverActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); + // expect(isFirstStillActive).toEqual(true); + // expect(didThirdTakeOverActive).toEqual(true); + // const isSecondStillActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); + // expect(isSecondStillActive).toEqual(false); + // + // await secondConnection.disconnect(); + // await thirdConnection.disconnect(); + // }); + // }); + // + // describe('with schedule', () => { + // beforeEach(async () => { + // await schedulesRepository.isActiveSchedule(scheduleName); + // }); + // + // describe('removeJob', () => { + // it('can remove a job', async () => { + // await schedulesRepository.addExecution(name, 0); + // + // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); + // + // await schedulesRepository.removeJob(scheduleId, name); + // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity2?.executions).toEqual({}); + // }); + // + // it('keeps other job', async () => { + // const otherName = 'other job'; + // await schedulesRepository.addExecution(name, 0); + // await schedulesRepository.addExecution(otherName, 0); + // + // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity?.executions).toEqual({ [name]: 1, [otherName]: 1 }); + // + // await schedulesRepository.removeJob(scheduleId, name); + // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity2?.executions).toEqual({ [otherName]: 1 }); + // }); + // }); + // + // describe('execution', () => { + // it('can add and remove an execution', async () => { + // const { added, running } = await schedulesRepository.addExecution(name, 1); + // expect(added).toBe(true); + // expect(running).toBe(1); + // + // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); + // + // await schedulesRepository.removeExecution(name); + // + // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntity2?.executions).toEqual({ [name]: 0 }); + // }); + // + // it('cannot add an execution when maxRunning is reached', async () => { + // await schedulesRepository.addExecution(name, 1); + // const { added, running } = await schedulesRepository.addExecution(name, 1); + // expect(added).toBe(false); + // expect(running).toBe(1); + // }); + // + // it('can add an execution with maxRunning set to 0', async () => { + // const executionPing = await schedulesRepository.addExecution(name, 0); + // expect(executionPing).toBeDefined(); + // }); + // + // it('does not add executions in schedule that is not active', async () => { + // const otherScheduleId = 'other schedule'; + // const otherConnection = await Connection.create( + // { url: mongo.getUri() }, + // pingInterval, + // otherScheduleId, + // scheduleName, + // ); + // const otherSchedulesRepository = otherConnection.getSchedulesRepository(); + // + // await otherSchedulesRepository.addExecution(name, 2); + // + // const running = await otherSchedulesRepository.countRunningExecutions(name); + // await otherConnection.disconnect(); + // expect(running).toBe(0); + // }); + // }); + // + // describe('countRunningExecutions', () => { + // it('returns number of executions', async () => { + // await schedulesRepository.addExecution(name, 2); + // await schedulesRepository.addExecution(name, 2); + // + // const running = await schedulesRepository.countRunningExecutions(name); + // expect(running).toBe(2); + // }); + // }); + // + // describe('ping', () => { + // it('updates timestamp', async () => { + // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + // + // await schedulesRepository.ping(); + // + // const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); + // expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); + // }); + // }); }); }); From ff5f2799a8de2164cdf00f378bb0468e4c01ffdb Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 18 Aug 2023 17:42:24 +0200 Subject: [PATCH 3/8] fix: ensure only one instance is active for a schedule at a time, without spamming mongo errors * clean up SchedulesRepository methods to not take scheduleName or scheduleId as parameters (these are known in repo anyway) * introduce ScheduleState for clean implementation of the fact whether this instance, another instance or no instance is active for a schedule Signed-off-by: Yani Kolev Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 83 ++-- src/schedule/MongoSchedule.ts | 4 +- src/schedule/Schedule.ts | 8 +- src/schedule/SchedulePing.ts | 14 +- src/scheduler/JobScheduler.ts | 6 +- .../SchedulesRepository.integration.spec.ts | 444 +++++++++--------- test/schedule/MongoSchedule.spec.ts | 10 +- test/schedule/Schedule.spec.ts | 6 +- test/schedule/SchedulePing.spec.ts | 32 +- test/scheduler/JobScheduler.spec.ts | 2 - 10 files changed, 324 insertions(+), 285 deletions(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 629d1979..86d4b3dd 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -8,6 +8,12 @@ import { MomoErrorType } from '../logging/error/MomoErrorType'; export const SCHEDULES_COLLECTION_NAME = 'schedules'; +export enum ScheduleState { + INACTIVE, + DIFFERENT_INSTANCE_ACTIVE, + THIS_INSTANCE_ACTIVE, +} + export class SchedulesRepository extends Repository { private logger: Logger | undefined; @@ -25,22 +31,45 @@ export class SchedulesRepository extends Repository { this.logger = logger; } - async getActiveSchedule(name: string): Promise { - const threshold = DateTime.now().toMillis() - this.deadScheduleThreshold; - const activeSchedule = await this.collection.findOne({ name, lastAlive: { $gte: threshold } }); - const foo = await this.collection.find({}).toArray(); - // eslint-disable-next-line no-console - console.log(foo); - return activeSchedule?.scheduleId; + /** + * Checks the state of the schedule represented by this repository. + * + * INACTIVE: There is currently no active instance for a schedule with this name. + * DIFFERENT_INSTANCE_ACTIVE: Another instance (but not this one) is active for the schedule with this name. + * THIS_INSTANCE_ACTIVE: This instance is active for the schedule with this name. + * + * @param now timestamp in milliseconds + * @returns the schedule's state + */ + async getScheduleState(now: number): Promise { + const threshold = now - this.deadScheduleThreshold; + const activeSchedule = await this.collection.findOne({ name: this.name }); + if (activeSchedule === null) { + return ScheduleState.INACTIVE; + } + + if (activeSchedule.scheduleId !== this.scheduleId) { + return activeSchedule.lastAlive >= threshold ? ScheduleState.DIFFERENT_INSTANCE_ACTIVE : ScheduleState.INACTIVE; + } + + return ScheduleState.THIS_INSTANCE_ACTIVE; } - async setActiveSchedule(name: string): Promise { + /** + * Tries to set this instance as active + * + * @param now timestamp in milliseconds + * @returns true if this instance is now active for the schedule with this name, false otherwise + */ + async setActiveSchedule(now: number): Promise { + const threshold = now - this.deadScheduleThreshold; + try { await this.collection.updateOne( - { name }, + { name: this.name, lastAlive: { $lt: threshold } }, // overwrite a dead (too old) schedule { $set: { - name, + name: this.name, scheduleId: this.scheduleId, lastAlive: DateTime.now().toMillis(), executions: {}, @@ -64,8 +93,8 @@ export class SchedulesRepository extends Repository { } } - async ping(scheduleId = this.scheduleId): Promise { - await this.updateOne({ scheduleId }, { $set: { lastAlive: DateTime.now().toMillis() } }); + async ping(): Promise { + await this.updateOne({ scheduleId: this.scheduleId }, { $set: { lastAlive: DateTime.now().toMillis() } }); } async createIndex(): Promise { @@ -74,44 +103,44 @@ export class SchedulesRepository extends Repository { await this.collection.createIndex({ name: 1 }, { unique: true }); } - async removeJob(scheduleId: string, name: string): Promise { - const schedulesEntity = await this.findOne({ scheduleId }); + async removeJob(jobName: string): Promise { + const schedulesEntity = await this.findOne({ scheduleId: this.scheduleId }); if (!schedulesEntity) { - throw new Error(`schedulesEntity not found for scheduleId=${scheduleId}`); + throw new Error(`schedulesEntity not found for scheduleId=${this.scheduleId}`); } const executions = schedulesEntity.executions; - delete executions[name]; - await this.updateOne({ scheduleId }, { $set: { executions } }); + delete executions[jobName]; + await this.updateOne({ scheduleId: this.scheduleId }, { $set: { executions } }); } - async addExecution(name: string, maxRunning: number): Promise<{ added: boolean; running: number }> { + async addExecution(jobName: string, maxRunning: number): Promise<{ added: boolean; running: number }> { if (maxRunning < 1) { const schedule = await this.collection.findOneAndUpdate( { name: this.name }, - { $inc: { [`executions.${name}`]: 1 } }, + { $inc: { [`executions.${jobName}`]: 1 } }, { returnDocument: 'after' }, ); - return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? 0 }; + return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? 0 }; } const schedule = await this.collection.findOneAndUpdate( { name: this.name, - $or: [{ [`executions.${name}`]: { $lt: maxRunning } }, { [`executions.${name}`]: { $exists: false } }], + $or: [{ [`executions.${jobName}`]: { $lt: maxRunning } }, { [`executions.${jobName}`]: { $exists: false } }], }, - { $inc: { [`executions.${name}`]: 1 } }, + { $inc: { [`executions.${jobName}`]: 1 } }, { returnDocument: 'after' }, ); - return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? maxRunning }; + return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? maxRunning }; } - async removeExecution(name: string): Promise { - await this.updateOne({ name: this.name }, { $inc: { [`executions.${name}`]: -1 } }); + async removeExecution(jobName: string): Promise { + await this.updateOne({ name: this.name }, { $inc: { [`executions.${jobName}`]: -1 } }); } - async countRunningExecutions(name: string): Promise { - return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[name] ?? 0; + async countRunningExecutions(jobName: string): Promise { + return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[jobName] ?? 0; } } diff --git a/src/schedule/MongoSchedule.ts b/src/schedule/MongoSchedule.ts index c149337b..68d0c871 100644 --- a/src/schedule/MongoSchedule.ts +++ b/src/schedule/MongoSchedule.ts @@ -27,7 +27,6 @@ export class MongoSchedule extends Schedule { protected readonly scheduleId: string, protected readonly connection: Connection, pingIntervalMs: number, - scheduleName: string, ) { const schedulesRepository = connection.getSchedulesRepository(); const jobRepository = connection.getJobRepository(); @@ -40,7 +39,6 @@ export class MongoSchedule extends Schedule { this.disconnectFct = connection.disconnect.bind(connection); this.schedulePing = new SchedulePing( scheduleId, - scheduleName, schedulesRepository, this.logger, pingIntervalMs, @@ -61,7 +59,7 @@ export class MongoSchedule extends Schedule { const scheduleId = uuid(); const connection = await Connection.create(connectionOptions, pingIntervalMs, scheduleId, scheduleName); - return new MongoSchedule(scheduleId, connection, pingIntervalMs, scheduleName); + return new MongoSchedule(scheduleId, connection, pingIntervalMs); } /** diff --git a/src/schedule/Schedule.ts b/src/schedule/Schedule.ts index ea998495..eb514daf 100644 --- a/src/schedule/Schedule.ts +++ b/src/schedule/Schedule.ts @@ -77,13 +77,7 @@ export class Schedule extends LogEmitter { await this.jobRepository.define(job); - this.jobSchedulers[job.name] = JobScheduler.forJob( - this.scheduleId, - job, - this.logger, - this.schedulesRepository, - this.jobRepository, - ); + this.jobSchedulers[job.name] = JobScheduler.forJob(job, this.logger, this.schedulesRepository, this.jobRepository); return true; } diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 9cfb13d5..53bab081 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -1,6 +1,6 @@ import { DateTime } from 'luxon'; -import { SchedulesRepository } from '../repository/SchedulesRepository'; +import { ScheduleState, SchedulesRepository } from '../repository/SchedulesRepository'; import { Logger } from '../logging/Logger'; import { setSafeInterval } from '../timeout/safeTimeouts'; import { MomoErrorType } from '../logging/error/MomoErrorType'; @@ -17,7 +17,6 @@ export class SchedulePing { constructor( private readonly scheduleId: string, - private readonly scheduleName: string, private readonly schedulesRepository: SchedulesRepository, private readonly logger: Logger, private readonly interval: number, @@ -38,16 +37,17 @@ export class SchedulePing { } private async checkActiveSchedule(): Promise { - const activeSchedule = await this.schedulesRepository.getActiveSchedule(this.scheduleName); + const now = DateTime.now().toMillis(); + const scheduleState = await this.schedulesRepository.getScheduleState(now); const active = - activeSchedule === undefined - ? await this.schedulesRepository.setActiveSchedule(this.scheduleName) - : activeSchedule === this.scheduleId; + scheduleState === ScheduleState.INACTIVE + ? await this.schedulesRepository.setActiveSchedule(now) + : scheduleState === ScheduleState.THIS_INSTANCE_ACTIVE; this.logger.debug(`This schedule is ${active ? '' : 'not '}active`); if (active) { - await this.schedulesRepository.ping(this.scheduleId); + await this.schedulesRepository.ping(); if (this.startJobsStatus === StartJobsStatus.notStarted) { this.startJobsStatus = StartJobsStatus.inProgress; this.logger.debug('This schedule just turned active'); diff --git a/src/scheduler/JobScheduler.ts b/src/scheduler/JobScheduler.ts index 90a74c3c..e6f3824c 100644 --- a/src/scheduler/JobScheduler.ts +++ b/src/scheduler/JobScheduler.ts @@ -21,21 +21,19 @@ export class JobScheduler { constructor( private readonly jobName: string, private readonly jobExecutor: JobExecutor, - private readonly scheduleId: string, private readonly schedulesRepository: SchedulesRepository, private readonly jobRepository: JobRepository, private readonly logger: Logger, ) {} static forJob( - scheduleId: string, job: Job, logger: Logger, schedulesRepository: SchedulesRepository, jobRepository: JobRepository, ): JobScheduler { const executor = new JobExecutor(job.handler, schedulesRepository, jobRepository, logger); - return new JobScheduler(job.name, executor, scheduleId, schedulesRepository, jobRepository, logger); + return new JobScheduler(job.name, executor, schedulesRepository, jobRepository, logger); } getUnexpectedErrorCount(): number { @@ -102,7 +100,7 @@ export class JobScheduler { if (this.executableSchedule) { this.executableSchedule.stop(); this.jobExecutor.stop(); - await this.schedulesRepository.removeJob(this.scheduleId, this.jobName); + await this.schedulesRepository.removeJob(this.jobName); this.executableSchedule = undefined; } } diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index 92a0ebbb..9bc141e3 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -1,16 +1,19 @@ import { MongoMemoryServer } from 'mongodb-memory-server'; +import { DateTime } from 'luxon'; import { Connection } from '../../src/Connection'; -import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { sleep } from '../utils/sleep'; describe('SchedulesRepository', () => { const scheduleName = 'schedule'; const scheduleId = '123'; const pingInterval = 500; - // const name = 'test job'; + const name = 'test job'; let mongo: MongoMemoryServer; let connection: Connection; + let secondConnection: Connection | undefined; let schedulesRepository: SchedulesRepository; beforeAll(async () => { @@ -21,228 +24,247 @@ describe('SchedulesRepository', () => { }); beforeEach(async () => schedulesRepository.delete()); + afterEach(async () => secondConnection?.disconnect()); afterAll(async () => { await connection.disconnect(); await mongo.stop(); }); - describe('isActiveSchedule', () => { + describe('setActiveSchedule', () => { it('sets single schedule as active', async () => { - await schedulesRepository.setActiveSchedule(scheduleName); - const schedule = await schedulesRepository.findOne({ name: scheduleName }); + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - expect(schedule!.scheduleId).toEqual(scheduleId); + const schedule = await schedulesRepository.findOne({ name: scheduleName, scheduleId }); + expect(schedule).not.toBeNull(); }); - it('single schedule is active', async () => { - await schedulesRepository.setActiveSchedule(scheduleName); + it('refuses to set another active schedule', async () => { + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - const active = await schedulesRepository.getActiveSchedule(scheduleName); + const anotherScheduleId = 'not active'; + const anotherInstance = await Connection.create( + { url: mongo.getUri() }, + pingInterval, + anotherScheduleId, + scheduleName, + ); + const anotherSchedulesRepository = anotherInstance.getSchedulesRepository(); + const error = jest.fn(); + anotherSchedulesRepository.setLogger({ debug: jest.fn(), error }); - expect(active).toEqual(scheduleId); + const active = await anotherSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + await anotherInstance.disconnect(); + + expect(error).toHaveBeenCalled(); + expect(active).toEqual(false); + + const schedules = await schedulesRepository.find({ name: scheduleName }); + expect(schedules).toHaveLength(1); + expect(schedules[0]?.scheduleId).toEqual(scheduleId); + }); + + it('only one schedule of many concurrent ones is active', async () => { + const connections = await Promise.all( + ['a', 'b', 'c', 'd', 'e'].map(async (id) => + Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), + ), + ); + + const active = await Promise.all( + connections.map(async (connection) => + connection.getSchedulesRepository().setActiveSchedule(DateTime.now().toMillis()), + ), + ); + + await Promise.all(connections.map(async (connection) => connection.disconnect())); + + expect(active.filter((active) => active)).toHaveLength(1); + const schedules = await schedulesRepository.find({ name: scheduleName }); + expect(schedules).toHaveLength(1); + }); + + it('should replace dead schedules', async () => { + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const otherScheduleId = 'not active'; + + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, scheduleName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + await sleep(2 * pingInterval); + + const active = await secondSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + + expect(active).toEqual(true); + }); + + it('sets two active schedules with different names', async () => { + const otherName = 'other schedule'; + const otherScheduleId = 'other schedule ID'; + + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, otherName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + + const active = await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const secondActive = await secondSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + + expect(active).toEqual(true); + expect(secondActive).toEqual(true); + }); + }); + + describe('getScheduleState', () => { + it('detects active schedule', async () => { + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + + const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); + + expect(active).toBe(ScheduleState.THIS_INSTANCE_ACTIVE); + }); + + it('detects active schedule after ping interval elapsed', async () => { + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + await sleep(2 * pingInterval); + + const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); + + expect(active).toBe(ScheduleState.THIS_INSTANCE_ACTIVE); + }); + + it('detects other active schedule with identical name', async () => { + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + + const now = DateTime.now().toMillis(); + await schedulesRepository.setActiveSchedule(now); + + const active = await secondSchedulesRepository.getScheduleState(now); + + expect(active).toBe(ScheduleState.DIFFERENT_INSTANCE_ACTIVE); + }); + + it('does not consider dead schedule with identical name', async () => { + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + + await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + await sleep(2 * pingInterval); + + const active = await secondSchedulesRepository.getScheduleState(DateTime.now().toMillis()); + + expect(active).toBe(ScheduleState.INACTIVE); + }); + + it('does not consider schedule with different name', async () => { + secondConnection = await Connection.create( + { url: mongo.getUri() }, + pingInterval, + 'other schedule', + 'other schedule', + ); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + + const now = DateTime.now().toMillis(); + await schedulesRepository.setActiveSchedule(now); + + const active = await secondSchedulesRepository.getScheduleState(now); + + expect(active).toBe(ScheduleState.INACTIVE); + }); + }); + + describe('with active schedule', () => { + beforeEach(async () => schedulesRepository.setActiveSchedule(DateTime.now().toMillis())); + + describe('removeJob', () => { + it('can remove a job', async () => { + await schedulesRepository.addExecution(name, 0); + + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); + + await schedulesRepository.removeJob(name); + const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity2?.executions).toEqual({}); + }); + + it('keeps other job', async () => { + const otherName = 'other job'; + await schedulesRepository.addExecution(name, 0); + await schedulesRepository.addExecution(otherName, 0); + + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity?.executions).toEqual({ [name]: 1, [otherName]: 1 }); + + await schedulesRepository.removeJob(name); + const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity2?.executions).toEqual({ [otherName]: 1 }); + }); }); - // it('only one schedule is active', async () => { - // const firstIsActive = await schedulesRepository.isActiveSchedule(scheduleName); - // - // const inactiveScheduleId = 'not active'; - // const inactiveConnection = await Connection.create( - // { url: mongo.getUri() }, - // pingInterval, - // inactiveScheduleId, - // scheduleName, - // ); - // const inactiveSchedulesRepository = inactiveConnection.getSchedulesRepository(); - // const secondIsActive = await inactiveSchedulesRepository.isActiveSchedule(scheduleName); - // await inactiveConnection.disconnect(); - // - // const entities = await schedulesRepository.find({}); - // - // expect(firstIsActive).toEqual(true); - // expect(secondIsActive).toEqual(false); - // expect(entities).toHaveLength(1); - // expect(entities[0]?.scheduleId).toEqual(scheduleId); - // }); - // - // it('only one schedule of many concurrent ones is active', async () => { - // const connections = await Promise.all( - // ['a', 'b', 'c', 'd', 'e'].map(async (id) => - // Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), - // ), - // ); - // - // const schedulesActiveStatus = await Promise.all( - // connections.map(async (connection) => { - // const newSchedulesRepository = connection.getSchedulesRepository(); - // return newSchedulesRepository.isActiveSchedule(scheduleName); - // }), - // ); - // - // const entities = await schedulesRepository.find({}); - // await Promise.all(connections.map(async (connection) => connection.disconnect())); - // - // expect(schedulesActiveStatus.filter((active) => active)).toHaveLength(1); - // expect(entities).toHaveLength(1); - // }); - // - // it('should replace dead schedules', async () => { - // const active = await schedulesRepository.isActiveSchedule(scheduleName); - // const secondScheduleId = 'not active'; - // const secondConnection = await Connection.create( - // { url: mongo.getUri() }, - // pingInterval, - // secondScheduleId, - // scheduleName, - // ); - // const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - // const secondActive = await secondSchedulesRepository.isActiveSchedule(scheduleName); - // - // expect(active).toEqual(true); - // expect(secondActive).toEqual(false); - // - // await sleep(1200); - // - // const secondTakeOver = await secondSchedulesRepository.isActiveSchedule(scheduleName); - // await secondConnection.disconnect(); - // expect(secondTakeOver).toEqual(true); - // }); - // - // it('should allow two active schedules with different names', async () => { - // const isActive = await schedulesRepository.isActiveSchedule(scheduleName); - // const otherScheduleName = 'other schedule'; - // const secondScheduleId = 'first other schedule ID'; - // const thirdScheduleId = 'second other schedule ID'; - // const secondConnection = await Connection.create( - // { url: mongo.getUri() }, - // pingInterval, - // secondScheduleId, - // otherScheduleName, - // ); - // const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - // const isSecondActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - // const thirdConnection = await Connection.create( - // { url: mongo.getUri() }, - // pingInterval, - // thirdScheduleId, - // otherScheduleName, - // ); - // const thirdSchedulesRepository = await thirdConnection.getSchedulesRepository(); - // const isThirdActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - // - // expect(isActive).toEqual(true); - // expect(isSecondActive).toEqual(true); - // expect(isThirdActive).toEqual(false); - // - // await sleep(2 * pingInterval + 100); - // - // const isFirstStillActive = await schedulesRepository.isActiveSchedule(scheduleName); - // const didThirdTakeOverActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - // expect(isFirstStillActive).toEqual(true); - // expect(didThirdTakeOverActive).toEqual(true); - // const isSecondStillActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - // expect(isSecondStillActive).toEqual(false); - // - // await secondConnection.disconnect(); - // await thirdConnection.disconnect(); - // }); - // }); - // - // describe('with schedule', () => { - // beforeEach(async () => { - // await schedulesRepository.isActiveSchedule(scheduleName); - // }); - // - // describe('removeJob', () => { - // it('can remove a job', async () => { - // await schedulesRepository.addExecution(name, 0); - // - // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); - // - // await schedulesRepository.removeJob(scheduleId, name); - // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity2?.executions).toEqual({}); - // }); - // - // it('keeps other job', async () => { - // const otherName = 'other job'; - // await schedulesRepository.addExecution(name, 0); - // await schedulesRepository.addExecution(otherName, 0); - // - // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity?.executions).toEqual({ [name]: 1, [otherName]: 1 }); - // - // await schedulesRepository.removeJob(scheduleId, name); - // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity2?.executions).toEqual({ [otherName]: 1 }); - // }); - // }); - // - // describe('execution', () => { - // it('can add and remove an execution', async () => { - // const { added, running } = await schedulesRepository.addExecution(name, 1); - // expect(added).toBe(true); - // expect(running).toBe(1); - // - // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); - // - // await schedulesRepository.removeExecution(name); - // - // const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntity2?.executions).toEqual({ [name]: 0 }); - // }); - // - // it('cannot add an execution when maxRunning is reached', async () => { - // await schedulesRepository.addExecution(name, 1); - // const { added, running } = await schedulesRepository.addExecution(name, 1); - // expect(added).toBe(false); - // expect(running).toBe(1); - // }); - // - // it('can add an execution with maxRunning set to 0', async () => { - // const executionPing = await schedulesRepository.addExecution(name, 0); - // expect(executionPing).toBeDefined(); - // }); - // - // it('does not add executions in schedule that is not active', async () => { - // const otherScheduleId = 'other schedule'; - // const otherConnection = await Connection.create( - // { url: mongo.getUri() }, - // pingInterval, - // otherScheduleId, - // scheduleName, - // ); - // const otherSchedulesRepository = otherConnection.getSchedulesRepository(); - // - // await otherSchedulesRepository.addExecution(name, 2); - // - // const running = await otherSchedulesRepository.countRunningExecutions(name); - // await otherConnection.disconnect(); - // expect(running).toBe(0); - // }); - // }); - // - // describe('countRunningExecutions', () => { - // it('returns number of executions', async () => { - // await schedulesRepository.addExecution(name, 2); - // await schedulesRepository.addExecution(name, 2); - // - // const running = await schedulesRepository.countRunningExecutions(name); - // expect(running).toBe(2); - // }); - // }); - // - // describe('ping', () => { - // it('updates timestamp', async () => { - // const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - // - // await schedulesRepository.ping(); - // - // const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); - // expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); - // }); - // }); + describe('execution', () => { + it('can add and remove an execution', async () => { + const { added, running } = await schedulesRepository.addExecution(name, 1); + expect(added).toBe(true); + expect(running).toBe(1); + + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); + + await schedulesRepository.removeExecution(name); + + const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntity2?.executions).toEqual({ [name]: 0 }); + }); + + it('cannot add an execution when maxRunning is reached', async () => { + await schedulesRepository.addExecution(name, 1); + const { added, running } = await schedulesRepository.addExecution(name, 1); + expect(added).toBe(false); + expect(running).toBe(1); + }); + + it('can add an execution with maxRunning set to 0', async () => { + const executionPing = await schedulesRepository.addExecution(name, 0); + expect(executionPing).toBeDefined(); + }); + + it('does not add executions in schedule that is not active', async () => { + const otherScheduleId = 'other schedule'; + const otherConnection = await Connection.create( + { url: mongo.getUri() }, + pingInterval, + otherScheduleId, + scheduleName, + ); + const otherSchedulesRepository = otherConnection.getSchedulesRepository(); + + await otherSchedulesRepository.addExecution(name, 2); + + const running = await otherSchedulesRepository.countRunningExecutions(name); + await otherConnection.disconnect(); + expect(running).toBe(0); + }); + }); + + describe('countRunningExecutions', () => { + it('returns number of executions', async () => { + await schedulesRepository.addExecution(name, 2); + await schedulesRepository.addExecution(name, 2); + + const running = await schedulesRepository.countRunningExecutions(name); + expect(running).toBe(2); + }); + }); + + describe('ping', () => { + it('updates timestamp', async () => { + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + + await schedulesRepository.ping(); + + const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); + }); + }); }); }); diff --git a/test/schedule/MongoSchedule.spec.ts b/test/schedule/MongoSchedule.spec.ts index 43a26a0e..dfbbb28c 100644 --- a/test/schedule/MongoSchedule.spec.ts +++ b/test/schedule/MongoSchedule.spec.ts @@ -1,6 +1,6 @@ -import { anyString, deepEqual, instance, mock, verify } from 'ts-mockito'; +import { anyNumber, anyString, deepEqual, instance, mock, verify, when } from 'ts-mockito'; -import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; import { MomoOptions, MongoSchedule } from '../../src'; @@ -27,6 +27,8 @@ describe('MongoSchedule', () => { }); it('connects and starts the ping and disconnects and stops the ping', async () => { + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.INACTIVE); + const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' }); const secondSchedule = await MongoSchedule.connect({ scheduleName: 'secondSchedule', @@ -34,9 +36,9 @@ describe('MongoSchedule', () => { }); await mongoSchedule.start(); - verify(schedulesRepository.isActiveSchedule('schedule')).once(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); await secondSchedule.start(); - verify(schedulesRepository.isActiveSchedule('secondSchedule')).once(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); await mongoSchedule.disconnect(); await secondSchedule.disconnect(); diff --git a/test/schedule/Schedule.spec.ts b/test/schedule/Schedule.spec.ts index d75b041d..5e7c85e7 100644 --- a/test/schedule/Schedule.spec.ts +++ b/test/schedule/Schedule.spec.ts @@ -1,8 +1,8 @@ -import { deepEqual, instance, mock, when } from 'ts-mockito'; +import { anyNumber, deepEqual, instance, mock, when } from 'ts-mockito'; import { ObjectId } from 'mongodb'; import { ExecutionStatus, MomoEvent, MomoJob, MomoOptions, MongoSchedule } from '../../src'; -import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; import { initLoggingForTests } from '../utils/logging'; import { toJobDefinition, tryToIntervalJob } from '../../src/job/Job'; @@ -39,7 +39,7 @@ describe('Schedule', () => { jest.clearAllMocks(); when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]); - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' }); initLoggingForTests(mongoSchedule); diff --git a/test/schedule/SchedulePing.spec.ts b/test/schedule/SchedulePing.spec.ts index 8cb493db..e196e1ce 100644 --- a/test/schedule/SchedulePing.spec.ts +++ b/test/schedule/SchedulePing.spec.ts @@ -1,12 +1,11 @@ -import { deepEqual, instance, mock, verify, when } from 'ts-mockito'; +import { anyNumber, deepEqual, instance, mock, verify, when } from 'ts-mockito'; -import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { SchedulePing } from '../../src/schedule/SchedulePing'; import { sleep } from '../utils/sleep'; describe('SchedulePing', () => { const scheduleId = '123'; - const scheduleName = 'schedule'; const interval = 1000; let error: jest.Mock; @@ -20,7 +19,6 @@ describe('SchedulePing', () => { error = jest.fn(); schedulePing = new SchedulePing( scheduleId, - scheduleName, instance(schedulesRepository), { debug: jest.fn(), error }, interval, @@ -31,33 +29,33 @@ describe('SchedulePing', () => { afterEach(async () => schedulePing.stop()); it('starts, pings, cleans and stops', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping(scheduleId)).once(); + verify(schedulesRepository.ping()).once(); await sleep(1.1 * interval); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping(scheduleId)).twice(); + verify(schedulesRepository.ping()).twice(); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.ping(scheduleId)).twice(); + verify(schedulesRepository.ping()).twice(); verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); }); it('handles mongo errors', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); const message = 'I am an error that should be caught'; - when(schedulesRepository.ping(scheduleId)).thenReject({ + when(schedulesRepository.ping()).thenReject({ message, } as Error); await schedulePing.start(); - verify(schedulesRepository.ping(scheduleId)).once(); + verify(schedulesRepository.ping()).once(); expect(error).toHaveBeenCalledWith( 'Pinging or cleaning the Schedules repository failed', 'an internal error occurred', @@ -67,28 +65,28 @@ describe('SchedulePing', () => { }); it('handles job start taking longer than interval', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(false); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.DIFFERENT_INSTANCE_ACTIVE); startAllJobs.mockImplementation(async () => sleep(2 * interval)); await schedulePing.start(); - verify(schedulesRepository.ping(scheduleId)).never(); + verify(schedulesRepository.ping()).never(); expect(startAllJobs).toHaveBeenCalledTimes(0); - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); await sleep(1.1 * interval); - verify(schedulesRepository.ping(scheduleId)).once(); + verify(schedulesRepository.ping()).once(); expect(startAllJobs).toHaveBeenCalledTimes(1); await sleep(1.1 * interval); - verify(schedulesRepository.ping(scheduleId)).twice(); + verify(schedulesRepository.ping()).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.ping(scheduleId)).twice(); + verify(schedulesRepository.ping()).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); }); diff --git a/test/scheduler/JobScheduler.spec.ts b/test/scheduler/JobScheduler.spec.ts index 08c70022..27f80b5b 100644 --- a/test/scheduler/JobScheduler.spec.ts +++ b/test/scheduler/JobScheduler.spec.ts @@ -13,7 +13,6 @@ import { CronSchedule } from '../../src/job/MomoJob'; describe('JobScheduler', () => { const errorFn = jest.fn(); - const scheduleId = '123'; let schedulesRepository: SchedulesRepository; let jobRepository: JobRepository; @@ -38,7 +37,6 @@ describe('JobScheduler', () => { jobScheduler = new JobScheduler( job.name, instance(jobExecutor), - scheduleId, instance(schedulesRepository), instance(jobRepository), loggerForTests(errorFn), From 9f0f295a4de10fcf7d7c35c787346aad436edad9 Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 18 Aug 2023 17:52:35 +0200 Subject: [PATCH 4/8] refactor: avoid storing scheduleId in SchedulePing, get it from repo; fix enum for consistency Signed-off-by: Ute Weiss --- src/repository/JobRepository.ts | 4 ---- src/repository/Repository.ts | 4 ---- src/repository/SchedulesRepository.ts | 20 +++++++++++----- src/schedule/MongoSchedule.ts | 1 - src/schedule/SchedulePing.ts | 10 ++++---- .../SchedulesRepository.integration.spec.ts | 10 ++++---- test/schedule/MongoSchedule.spec.ts | 6 ++--- test/schedule/Schedule.spec.ts | 2 +- test/schedule/SchedulePing.spec.ts | 23 +++++++------------ 9 files changed, 36 insertions(+), 44 deletions(-) diff --git a/src/repository/JobRepository.ts b/src/repository/JobRepository.ts index 6d2a7633..25ef26cd 100644 --- a/src/repository/JobRepository.ts +++ b/src/repository/JobRepository.ts @@ -26,10 +26,6 @@ export class JobRepository extends Repository { return job?.executionInfo; } - async clear(): Promise { - await this.delete(); - } - async define(job: JobDefinition): Promise { const { name, schedule, concurrency, maxRunning } = job; diff --git a/src/repository/Repository.ts b/src/repository/Repository.ts index cfb5e1b0..5e305671 100644 --- a/src/repository/Repository.ts +++ b/src/repository/Repository.ts @@ -34,10 +34,6 @@ export class Repository { return result.deletedCount; } - async deleteOne(filter: Filter): Promise { - await this.collection.deleteOne(filter); - } - // mongodb returns null instead of undefined for optional fields private mapNullToUndefined(entity: WithId): WithId { const keys = Object.keys(entity); diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 86d4b3dd..7658ef22 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -9,9 +9,9 @@ import { MomoErrorType } from '../logging/error/MomoErrorType'; export const SCHEDULES_COLLECTION_NAME = 'schedules'; export enum ScheduleState { - INACTIVE, - DIFFERENT_INSTANCE_ACTIVE, - THIS_INSTANCE_ACTIVE, + inactive, + differentInstanceActive, + thisInstanceActive, } export class SchedulesRepository extends Repository { @@ -31,6 +31,14 @@ export class SchedulesRepository extends Repository { this.logger = logger; } + getScheduleId(): string { + return this.scheduleId; + } + + async deleteOne(): Promise { + await this.collection.deleteOne({ scheduleId: this.scheduleId }); + } + /** * Checks the state of the schedule represented by this repository. * @@ -45,14 +53,14 @@ export class SchedulesRepository extends Repository { const threshold = now - this.deadScheduleThreshold; const activeSchedule = await this.collection.findOne({ name: this.name }); if (activeSchedule === null) { - return ScheduleState.INACTIVE; + return ScheduleState.inactive; } if (activeSchedule.scheduleId !== this.scheduleId) { - return activeSchedule.lastAlive >= threshold ? ScheduleState.DIFFERENT_INSTANCE_ACTIVE : ScheduleState.INACTIVE; + return activeSchedule.lastAlive >= threshold ? ScheduleState.differentInstanceActive : ScheduleState.inactive; } - return ScheduleState.THIS_INSTANCE_ACTIVE; + return ScheduleState.thisInstanceActive; } /** diff --git a/src/schedule/MongoSchedule.ts b/src/schedule/MongoSchedule.ts index 68d0c871..efdf9138 100644 --- a/src/schedule/MongoSchedule.ts +++ b/src/schedule/MongoSchedule.ts @@ -38,7 +38,6 @@ export class MongoSchedule extends Schedule { this.disconnectFct = connection.disconnect.bind(connection); this.schedulePing = new SchedulePing( - scheduleId, schedulesRepository, this.logger, pingIntervalMs, diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 53bab081..8328099c 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -16,7 +16,6 @@ export class SchedulePing { private startJobsStatus: StartJobsStatus = StartJobsStatus.notStarted; constructor( - private readonly scheduleId: string, private readonly schedulesRepository: SchedulesRepository, private readonly logger: Logger, private readonly interval: number, @@ -40,9 +39,9 @@ export class SchedulePing { const now = DateTime.now().toMillis(); const scheduleState = await this.schedulesRepository.getScheduleState(now); const active = - scheduleState === ScheduleState.INACTIVE + scheduleState === ScheduleState.inactive ? await this.schedulesRepository.setActiveSchedule(now) - : scheduleState === ScheduleState.THIS_INSTANCE_ACTIVE; + : scheduleState === ScheduleState.thisInstanceActive; this.logger.debug(`This schedule is ${active ? '' : 'not '}active`); @@ -62,9 +61,10 @@ export class SchedulePing { async stop(): Promise { if (this.handle) { - this.logger.debug('stop SchedulePing', { scheduleId: this.scheduleId }); + this.logger.debug('stop SchedulePing', { scheduleId: this.schedulesRepository.getScheduleId() }); clearInterval(this.handle); } - await this.schedulesRepository.deleteOne({ scheduleId: this.scheduleId }); + + await this.schedulesRepository.deleteOne(); } } diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index 9bc141e3..d339beb2 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -118,7 +118,7 @@ describe('SchedulesRepository', () => { const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); - expect(active).toBe(ScheduleState.THIS_INSTANCE_ACTIVE); + expect(active).toBe(ScheduleState.thisInstanceActive); }); it('detects active schedule after ping interval elapsed', async () => { @@ -127,7 +127,7 @@ describe('SchedulesRepository', () => { const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); - expect(active).toBe(ScheduleState.THIS_INSTANCE_ACTIVE); + expect(active).toBe(ScheduleState.thisInstanceActive); }); it('detects other active schedule with identical name', async () => { @@ -139,7 +139,7 @@ describe('SchedulesRepository', () => { const active = await secondSchedulesRepository.getScheduleState(now); - expect(active).toBe(ScheduleState.DIFFERENT_INSTANCE_ACTIVE); + expect(active).toBe(ScheduleState.differentInstanceActive); }); it('does not consider dead schedule with identical name', async () => { @@ -151,7 +151,7 @@ describe('SchedulesRepository', () => { const active = await secondSchedulesRepository.getScheduleState(DateTime.now().toMillis()); - expect(active).toBe(ScheduleState.INACTIVE); + expect(active).toBe(ScheduleState.inactive); }); it('does not consider schedule with different name', async () => { @@ -168,7 +168,7 @@ describe('SchedulesRepository', () => { const active = await secondSchedulesRepository.getScheduleState(now); - expect(active).toBe(ScheduleState.INACTIVE); + expect(active).toBe(ScheduleState.inactive); }); }); diff --git a/test/schedule/MongoSchedule.spec.ts b/test/schedule/MongoSchedule.spec.ts index dfbbb28c..d1128d63 100644 --- a/test/schedule/MongoSchedule.spec.ts +++ b/test/schedule/MongoSchedule.spec.ts @@ -1,4 +1,4 @@ -import { anyNumber, anyString, deepEqual, instance, mock, verify, when } from 'ts-mockito'; +import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; @@ -27,7 +27,7 @@ describe('MongoSchedule', () => { }); it('connects and starts the ping and disconnects and stops the ping', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.INACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.inactive); const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' }); const secondSchedule = await MongoSchedule.connect({ @@ -42,7 +42,7 @@ describe('MongoSchedule', () => { await mongoSchedule.disconnect(); await secondSchedule.disconnect(); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId: anyString() }))).twice(); + verify(schedulesRepository.deleteOne()).twice(); expect(disconnect).toHaveBeenCalledTimes(2); }); diff --git a/test/schedule/Schedule.spec.ts b/test/schedule/Schedule.spec.ts index 5e7c85e7..e12a178f 100644 --- a/test/schedule/Schedule.spec.ts +++ b/test/schedule/Schedule.spec.ts @@ -39,7 +39,7 @@ describe('Schedule', () => { jest.clearAllMocks(); when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]); - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' }); initLoggingForTests(mongoSchedule); diff --git a/test/schedule/SchedulePing.spec.ts b/test/schedule/SchedulePing.spec.ts index e196e1ce..9a1cfbf4 100644 --- a/test/schedule/SchedulePing.spec.ts +++ b/test/schedule/SchedulePing.spec.ts @@ -1,11 +1,10 @@ -import { anyNumber, deepEqual, instance, mock, verify, when } from 'ts-mockito'; +import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { SchedulePing } from '../../src/schedule/SchedulePing'; import { sleep } from '../utils/sleep'; describe('SchedulePing', () => { - const scheduleId = '123'; const interval = 1000; let error: jest.Mock; @@ -17,19 +16,13 @@ describe('SchedulePing', () => { startAllJobs = jest.fn(); schedulesRepository = mock(SchedulesRepository); error = jest.fn(); - schedulePing = new SchedulePing( - scheduleId, - instance(schedulesRepository), - { debug: jest.fn(), error }, - interval, - startAllJobs, - ); + schedulePing = new SchedulePing(instance(schedulesRepository), { debug: jest.fn(), error }, interval, startAllJobs); }); afterEach(async () => schedulePing.stop()); it('starts, pings, cleans and stops', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); @@ -43,11 +36,11 @@ describe('SchedulePing', () => { await sleep(interval); verify(schedulesRepository.ping()).twice(); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); + verify(schedulesRepository.deleteOne()).once(); }); it('handles mongo errors', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); const message = 'I am an error that should be caught'; when(schedulesRepository.ping()).thenReject({ message, @@ -65,7 +58,7 @@ describe('SchedulePing', () => { }); it('handles job start taking longer than interval', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.DIFFERENT_INSTANCE_ACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.differentInstanceActive); startAllJobs.mockImplementation(async () => sleep(2 * interval)); @@ -74,7 +67,7 @@ describe('SchedulePing', () => { verify(schedulesRepository.ping()).never(); expect(startAllJobs).toHaveBeenCalledTimes(0); - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.THIS_INSTANCE_ACTIVE); + when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); await sleep(1.1 * interval); verify(schedulesRepository.ping()).once(); @@ -88,6 +81,6 @@ describe('SchedulePing', () => { await sleep(interval); verify(schedulesRepository.ping()).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); + verify(schedulesRepository.deleteOne()).once(); }); }); From a7a8b3f6845182e0f1e2213163823e571a3f0920 Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 1 Sep 2023 17:27:12 +0200 Subject: [PATCH 5/8] refactor: replace ping with setActiveSchedule Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 81 +++++++++---------- src/schedule/SchedulePing.ts | 39 ++++----- .../SchedulesRepository.integration.spec.ts | 74 ++++++++--------- test/schedule/MongoSchedule.spec.ts | 4 +- test/schedule/Schedule.spec.ts | 5 +- test/schedule/SchedulePing.spec.ts | 67 +++++++++++---- 6 files changed, 153 insertions(+), 117 deletions(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 7658ef22..f4b121ff 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -1,18 +1,14 @@ -import { DateTime } from 'luxon'; -import { MongoClient } from 'mongodb'; +import { Filter, MongoClient, MongoServerError } from 'mongodb'; import { ScheduleEntity } from './ScheduleEntity'; import { Repository } from './Repository'; import { Logger } from '../logging/Logger'; import { MomoErrorType } from '../logging/error/MomoErrorType'; +import { MomoEventData } from '../logging/MomoEvents'; export const SCHEDULES_COLLECTION_NAME = 'schedules'; -export enum ScheduleState { - inactive, - differentInstanceActive, - thisInstanceActive, -} +const duplicateKeyErrorCode = 11000; export class SchedulesRepository extends Repository { private logger: Logger | undefined; @@ -31,8 +27,8 @@ export class SchedulesRepository extends Repository { this.logger = logger; } - getScheduleId(): string { - return this.scheduleId; + getLogData(): MomoEventData { + return { name: this.name, scheduleId: this.scheduleId }; } async deleteOne(): Promise { @@ -49,18 +45,15 @@ export class SchedulesRepository extends Repository { * @param now timestamp in milliseconds * @returns the schedule's state */ - async getScheduleState(now: number): Promise { + async isActiveSchedule(now: number): Promise { const threshold = now - this.deadScheduleThreshold; const activeSchedule = await this.collection.findOne({ name: this.name }); - if (activeSchedule === null) { - return ScheduleState.inactive; - } - if (activeSchedule.scheduleId !== this.scheduleId) { - return activeSchedule.lastAlive >= threshold ? ScheduleState.differentInstanceActive : ScheduleState.inactive; + if (activeSchedule === null || activeSchedule.scheduleId === this.scheduleId) { + return true; } - return ScheduleState.thisInstanceActive; + return activeSchedule.lastAlive < threshold; // if activeSchedule is too old, take over and make this one active } /** @@ -72,42 +65,48 @@ export class SchedulesRepository extends Repository { async setActiveSchedule(now: number): Promise { const threshold = now - this.deadScheduleThreshold; + const deadSchedule: Filter = { name: this.name, lastAlive: { $lt: threshold } }; + const thisSchedule: Filter = { scheduleId: this.scheduleId }; + + const updatedSchedule: ScheduleEntity = { + name: this.name, + scheduleId: this.scheduleId, + lastAlive: now, + executions: {}, + }; + try { await this.collection.updateOne( - { name: this.name, lastAlive: { $lt: threshold } }, // overwrite a dead (too old) schedule - { - $set: { - name: this.name, - scheduleId: this.scheduleId, - lastAlive: DateTime.now().toMillis(), - executions: {}, - }, - }, - { - upsert: true, - }, + { $or: [deadSchedule, thisSchedule] }, + { $set: updatedSchedule }, + { upsert: true }, ); return true; } catch (error) { - // We seem to have a schedule that's alive. The unique name index probably prevented the upsert. - this.logger?.error( - 'The database reported an unexpected error', - MomoErrorType.internal, - { scheduleId: this.scheduleId }, - error, - ); + if (error instanceof MongoServerError && error.code === duplicateKeyErrorCode) { + this.logger?.debug( + 'Cannot set active schedule - another schedule with this name is already active', + this.getLogData(), + ); + } else { + this.logger?.error( + 'The database reported an unexpected error', + MomoErrorType.internal, + this.getLogData(), + error, + ); + } + return false; } } - async ping(): Promise { - await this.updateOne({ scheduleId: this.scheduleId }, { $set: { lastAlive: DateTime.now().toMillis() } }); - } - + /** + * This unique index ensures that we do not insert more than one active schedule per schedule name + * into the repository. + */ async createIndex(): Promise { - // this unique index ensures that we do not insert more than one active schedule - // in the repository per schedule name await this.collection.createIndex({ name: 1 }, { unique: true }); } diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 8328099c..ff14e21f 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -1,6 +1,6 @@ import { DateTime } from 'luxon'; -import { ScheduleState, SchedulesRepository } from '../repository/SchedulesRepository'; +import { SchedulesRepository } from '../repository/SchedulesRepository'; import { Logger } from '../logging/Logger'; import { setSafeInterval } from '../timeout/safeTimeouts'; import { MomoErrorType } from '../logging/error/MomoErrorType'; @@ -30,38 +30,39 @@ export class SchedulePing { try { await this.checkActiveSchedule(); } catch (e) { - this.logger.error(errorMessage, MomoErrorType.internal, {}, e); + this.logger.error(errorMessage, MomoErrorType.internal, this.schedulesRepository.getLogData(), e); } this.handle = setSafeInterval(this.checkActiveSchedule.bind(this), this.interval, this.logger, errorMessage); } private async checkActiveSchedule(): Promise { const now = DateTime.now().toMillis(); - const scheduleState = await this.schedulesRepository.getScheduleState(now); - const active = - scheduleState === ScheduleState.inactive - ? await this.schedulesRepository.setActiveSchedule(now) - : scheduleState === ScheduleState.thisInstanceActive; - - this.logger.debug(`This schedule is ${active ? '' : 'not '}active`); + const active = await this.schedulesRepository.isActiveSchedule(now); + if (!active) { + this.logger.debug('This schedule is not active', this.schedulesRepository.getLogData()); + return; + } - if (active) { - await this.schedulesRepository.ping(); - if (this.startJobsStatus === StartJobsStatus.notStarted) { - this.startJobsStatus = StartJobsStatus.inProgress; - this.logger.debug('This schedule just turned active'); + if (!(await this.schedulesRepository.setActiveSchedule(now))) { + return; + } - await this.startAllJobs(); + this.logger.debug('This schedule is active', this.schedulesRepository.getLogData()); - this.startJobsStatus = StartJobsStatus.finished; - this.logger.debug('Finished starting scheduled jobs'); - } + if (this.startJobsStatus !== StartJobsStatus.notStarted) { + return; } + + this.startJobsStatus = StartJobsStatus.inProgress; + this.logger.debug('This schedule just turned active', this.schedulesRepository.getLogData()); + await this.startAllJobs(); + this.startJobsStatus = StartJobsStatus.finished; + this.logger.debug('Finished starting scheduled jobs', this.schedulesRepository.getLogData()); } async stop(): Promise { if (this.handle) { - this.logger.debug('stop SchedulePing', { scheduleId: this.schedulesRepository.getScheduleId() }); + this.logger.debug('stop SchedulePing', this.schedulesRepository.getLogData()); clearInterval(this.handle); } diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index d339beb2..89509f3a 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -2,8 +2,7 @@ import { MongoMemoryServer } from 'mongodb-memory-server'; import { DateTime } from 'luxon'; import { Connection } from '../../src/Connection'; -import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; -import { sleep } from '../utils/sleep'; +import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; describe('SchedulesRepository', () => { const scheduleName = 'schedule'; @@ -39,8 +38,21 @@ describe('SchedulesRepository', () => { expect(schedule).not.toBeNull(); }); + it('updates timestamp', async () => { + const now = DateTime.now().toMillis(); + await schedulesRepository.setActiveSchedule(now - pingInterval); + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + + await schedulesRepository.setActiveSchedule(now); + + const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); + }); + it('refuses to set another active schedule', async () => { - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const now = DateTime.now().toMillis(); + const lastAlive = now - 10; + await schedulesRepository.setActiveSchedule(lastAlive); const anotherScheduleId = 'not active'; const anotherInstance = await Connection.create( @@ -50,18 +62,19 @@ describe('SchedulesRepository', () => { scheduleName, ); const anotherSchedulesRepository = anotherInstance.getSchedulesRepository(); - const error = jest.fn(); - anotherSchedulesRepository.setLogger({ debug: jest.fn(), error }); + const debug = jest.fn(); + anotherSchedulesRepository.setLogger({ debug, error: jest.fn() }); - const active = await anotherSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const active = await anotherSchedulesRepository.setActiveSchedule(now); await anotherInstance.disconnect(); - expect(error).toHaveBeenCalled(); + expect(debug).toHaveBeenCalled(); expect(active).toEqual(false); const schedules = await schedulesRepository.find({ name: scheduleName }); expect(schedules).toHaveLength(1); expect(schedules[0]?.scheduleId).toEqual(scheduleId); + expect(schedules[0]?.lastAlive).toEqual(lastAlive); }); it('only one schedule of many concurrent ones is active', async () => { @@ -85,14 +98,14 @@ describe('SchedulesRepository', () => { }); it('should replace dead schedules', async () => { - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - const otherScheduleId = 'not active'; + const now = DateTime.now().toMillis(); + await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); + const otherScheduleId = 'other schedule ID'; secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, scheduleName); const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - await sleep(2 * pingInterval); - const active = await secondSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const active = await secondSchedulesRepository.setActiveSchedule(now); expect(active).toEqual(true); }); @@ -116,18 +129,18 @@ describe('SchedulesRepository', () => { it('detects active schedule', async () => { await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); + const active = await schedulesRepository.isActiveSchedule(DateTime.now().toMillis()); - expect(active).toBe(ScheduleState.thisInstanceActive); + expect(active).toBe(true); }); it('detects active schedule after ping interval elapsed', async () => { - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - await sleep(2 * pingInterval); + const now = DateTime.now().toMillis(); + await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); - const active = await schedulesRepository.getScheduleState(DateTime.now().toMillis()); + const active = await schedulesRepository.isActiveSchedule(now); - expect(active).toBe(ScheduleState.thisInstanceActive); + expect(active).toBe(true); }); it('detects other active schedule with identical name', async () => { @@ -137,21 +150,21 @@ describe('SchedulesRepository', () => { const now = DateTime.now().toMillis(); await schedulesRepository.setActiveSchedule(now); - const active = await secondSchedulesRepository.getScheduleState(now); + const active = await secondSchedulesRepository.isActiveSchedule(now); - expect(active).toBe(ScheduleState.differentInstanceActive); + expect(active).toBe(false); }); it('does not consider dead schedule with identical name', async () => { secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName); const secondSchedulesRepository = secondConnection.getSchedulesRepository(); + const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - await sleep(2 * pingInterval); + await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); - const active = await secondSchedulesRepository.getScheduleState(DateTime.now().toMillis()); + const active = await secondSchedulesRepository.isActiveSchedule(now); - expect(active).toBe(ScheduleState.inactive); + expect(active).toBe(true); }); it('does not consider schedule with different name', async () => { @@ -166,9 +179,9 @@ describe('SchedulesRepository', () => { const now = DateTime.now().toMillis(); await schedulesRepository.setActiveSchedule(now); - const active = await secondSchedulesRepository.getScheduleState(now); + const active = await secondSchedulesRepository.isActiveSchedule(now); - expect(active).toBe(ScheduleState.inactive); + expect(active).toBe(true); }); }); @@ -255,16 +268,5 @@ describe('SchedulesRepository', () => { expect(running).toBe(2); }); }); - - describe('ping', () => { - it('updates timestamp', async () => { - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - - await schedulesRepository.ping(); - - const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); - }); - }); }); }); diff --git a/test/schedule/MongoSchedule.spec.ts b/test/schedule/MongoSchedule.spec.ts index d1128d63..85f2b8b1 100644 --- a/test/schedule/MongoSchedule.spec.ts +++ b/test/schedule/MongoSchedule.spec.ts @@ -1,6 +1,6 @@ import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; -import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; import { MomoOptions, MongoSchedule } from '../../src'; @@ -27,7 +27,7 @@ describe('MongoSchedule', () => { }); it('connects and starts the ping and disconnects and stops the ping', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.inactive); + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' }); const secondSchedule = await MongoSchedule.connect({ diff --git a/test/schedule/Schedule.spec.ts b/test/schedule/Schedule.spec.ts index e12a178f..4039634e 100644 --- a/test/schedule/Schedule.spec.ts +++ b/test/schedule/Schedule.spec.ts @@ -2,7 +2,7 @@ import { anyNumber, deepEqual, instance, mock, when } from 'ts-mockito'; import { ObjectId } from 'mongodb'; import { ExecutionStatus, MomoEvent, MomoJob, MomoOptions, MongoSchedule } from '../../src'; -import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; import { initLoggingForTests } from '../utils/logging'; import { toJobDefinition, tryToIntervalJob } from '../../src/job/Job'; @@ -39,7 +39,8 @@ describe('Schedule', () => { jest.clearAllMocks(); when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]); - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' }); initLoggingForTests(mongoSchedule); diff --git a/test/schedule/SchedulePing.spec.ts b/test/schedule/SchedulePing.spec.ts index 9a1cfbf4..88e829d7 100644 --- a/test/schedule/SchedulePing.spec.ts +++ b/test/schedule/SchedulePing.spec.ts @@ -1,11 +1,12 @@ import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; -import { ScheduleState, SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { SchedulePing } from '../../src/schedule/SchedulePing'; import { sleep } from '../utils/sleep'; describe('SchedulePing', () => { const interval = 1000; + const logData = { name: 'name', scheduleId: 'scheduleId' }; let error: jest.Mock; let schedulesRepository: SchedulesRepository; @@ -22,64 +23,96 @@ describe('SchedulePing', () => { afterEach(async () => schedulePing.stop()); it('starts, pings, cleans and stops', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping()).once(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); await sleep(1.1 * interval); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping()).twice(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.ping()).twice(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); verify(schedulesRepository.deleteOne()).once(); }); it('handles mongo errors', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); + when(schedulesRepository.getLogData()).thenReturn(logData); + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); const message = 'I am an error that should be caught'; - when(schedulesRepository.ping()).thenReject({ + when(schedulesRepository.setActiveSchedule(anyNumber())).thenReject({ message, } as Error); await schedulePing.start(); - verify(schedulesRepository.ping()).once(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); expect(error).toHaveBeenCalledWith( 'Pinging or cleaning the Schedules repository failed', 'an internal error occurred', - {}, + logData, { message }, ); }); - it('handles job start taking longer than interval', async () => { - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.differentInstanceActive); + it('does not start any jobs for inactive schedule', async () => { + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(false); - startAllJobs.mockImplementation(async () => sleep(2 * interval)); + await schedulePing.start(); + + verify(schedulesRepository.setActiveSchedule(anyNumber())).never(); + expect(startAllJobs).not.toHaveBeenCalled(); + }); + + it('does not start any jobs if setting active schedule fails', async () => { + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(false); await schedulePing.start(); - verify(schedulesRepository.ping()).never(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); + expect(startAllJobs).not.toHaveBeenCalled(); + }); + + it('becomes active when other schedule dies', async () => { + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(false); + + await schedulePing.start(); + + verify(schedulesRepository.setActiveSchedule(anyNumber())).never(); expect(startAllJobs).toHaveBeenCalledTimes(0); - when(schedulesRepository.getScheduleState(anyNumber())).thenResolve(ScheduleState.thisInstanceActive); + // other schedule dies, this one becomes active + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); await sleep(1.1 * interval); - verify(schedulesRepository.ping()).once(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); + expect(startAllJobs).toHaveBeenCalledTimes(1); + }); + + it('handles job start taking longer than interval', async () => { + startAllJobs.mockImplementation(async () => sleep(2 * interval)); + when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); + + await schedulePing.start(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); expect(startAllJobs).toHaveBeenCalledTimes(1); await sleep(1.1 * interval); - verify(schedulesRepository.ping()).twice(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); await schedulePing.stop(); + await sleep(interval); - verify(schedulesRepository.ping()).twice(); + verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); verify(schedulesRepository.deleteOne()).once(); }); From eeaadd7fba755de961de481fade3ebf0934e5ee8 Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 6 Oct 2023 13:40:48 +0200 Subject: [PATCH 6/8] fix: correct type for mongo 4 Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index f5085bfd..eaf6d7f5 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -73,7 +73,7 @@ export class SchedulesRepository extends Repository { const deadSchedule: Filter = { name: this.name, lastAlive: { $lt: threshold } }; const thisSchedule: Filter = { scheduleId: this.scheduleId }; - const updatedSchedule: ScheduleEntity = { + const updatedSchedule: Partial = { name: this.name, scheduleId: this.scheduleId, lastAlive: now, From 327f351b3c5f1835c70fe684a7c78d3c7e0b889b Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 6 Oct 2023 15:05:39 +0200 Subject: [PATCH 7/8] refactor: change setActiveSchedule to call isActiveSchedule internally Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 36 ++++--- src/schedule/SchedulePing.ts | 10 +- .../SchedulesRepository.integration.spec.ts | 98 ++++--------------- test/schedule/MongoSchedule.spec.ts | 8 +- test/schedule/Schedule.spec.ts | 5 +- test/schedule/SchedulePing.spec.ts | 36 ++----- 6 files changed, 60 insertions(+), 133 deletions(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index eaf6d7f5..2fe893cf 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -1,4 +1,5 @@ import { Filter, FindOneAndUpdateOptions, MongoClient, MongoServerError } from 'mongodb'; +import { DateTime } from 'luxon'; import { ScheduleEntity } from './ScheduleEntity'; import { Repository } from './Repository'; @@ -43,15 +44,10 @@ export class SchedulesRepository extends Repository { /** * Checks the state of the schedule represented by this repository. * - * INACTIVE: There is currently no active instance for a schedule with this name. - * DIFFERENT_INSTANCE_ACTIVE: Another instance (but not this one) is active for the schedule with this name. - * THIS_INSTANCE_ACTIVE: This instance is active for the schedule with this name. - * - * @param now timestamp in milliseconds + * @param threshold a schedule older than (i.e. timestamp below) the threshold is considered dead and will be replaced * @returns the schedule's state */ - async isActiveSchedule(now: number): Promise { - const threshold = now - this.deadScheduleThreshold; + private async isActiveSchedule(threshold: number): Promise { const activeSchedule = await this.collection.findOne({ name: this.name }); if (activeSchedule === null || activeSchedule.scheduleId === this.scheduleId) { @@ -62,14 +58,30 @@ export class SchedulesRepository extends Repository { } /** - * Tries to set this instance as active + * Tries to set this instance as active schedule of this name. + * + * There are 4 possible cases: + * + * 1) Another instance already is active for this name. This instance does not need to become active. Nothing is done. + * + * 2) Another instance was active, but it's last ping was before the threshold. Hence, it is considered dead and this instance will take over and become active. The DB is updated accordingly. * - * @param now timestamp in milliseconds - * @returns true if this instance is now active for the schedule with this name, false otherwise + * 3) There is currently no active schedule with this name. In this case, this instance will become the active schedule. The DB is updated accordingly. + * + * 4) This instance already is active. It will stay active and send a ping to the DB to indicate that it is still alive. + * + * @returns true if this instance is now active for the schedule with this name (cases 2-4), false otherwise (case 1) */ - async setActiveSchedule(now: number): Promise { + async setActiveSchedule(): Promise { + const now = DateTime.now().toMillis(); const threshold = now - this.deadScheduleThreshold; + const active = await this.isActiveSchedule(now); + if (!active) { + this.logger?.debug('This schedule is not active', this.getLogData()); + return false; + } + const deadSchedule: Filter = { name: this.name, lastAlive: { $lt: threshold } }; const thisSchedule: Filter = { scheduleId: this.scheduleId }; @@ -82,6 +94,8 @@ export class SchedulesRepository extends Repository { try { await this.collection.updateOne( + // we already checked with isActiveSchedule that this instance should be the active one, but to prevent + // concurrent modification, we use a filter to ensure that we only overwrite a dead schedule or ping this schedule { $or: [deadSchedule, thisSchedule] }, { $set: updatedSchedule }, { ...mongoOptions, upsert: true }, diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index ff14e21f..dccef135 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -1,5 +1,3 @@ -import { DateTime } from 'luxon'; - import { SchedulesRepository } from '../repository/SchedulesRepository'; import { Logger } from '../logging/Logger'; import { setSafeInterval } from '../timeout/safeTimeouts'; @@ -36,14 +34,8 @@ export class SchedulePing { } private async checkActiveSchedule(): Promise { - const now = DateTime.now().toMillis(); - const active = await this.schedulesRepository.isActiveSchedule(now); + const active = await this.schedulesRepository.setActiveSchedule(); if (!active) { - this.logger.debug('This schedule is not active', this.schedulesRepository.getLogData()); - return; - } - - if (!(await this.schedulesRepository.setActiveSchedule(now))) { return; } diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index 89509f3a..65a42df6 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -3,6 +3,7 @@ import { DateTime } from 'luxon'; import { Connection } from '../../src/Connection'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; +import { sleep } from '../utils/sleep'; describe('SchedulesRepository', () => { const scheduleName = 'schedule'; @@ -32,27 +33,26 @@ describe('SchedulesRepository', () => { describe('setActiveSchedule', () => { it('sets single schedule as active', async () => { - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + await schedulesRepository.setActiveSchedule(); const schedule = await schedulesRepository.findOne({ name: scheduleName, scheduleId }); expect(schedule).not.toBeNull(); }); it('updates timestamp', async () => { - const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(now - pingInterval); + await schedulesRepository.setActiveSchedule(); const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - await schedulesRepository.setActiveSchedule(now); + await sleep(pingInterval); + await schedulesRepository.setActiveSchedule(); const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); }); it('refuses to set another active schedule', async () => { - const now = DateTime.now().toMillis(); - const lastAlive = now - 10; - await schedulesRepository.setActiveSchedule(lastAlive); + await schedulesRepository.setActiveSchedule(); + const lastAlive = DateTime.now().toMillis(); const anotherScheduleId = 'not active'; const anotherInstance = await Connection.create( @@ -65,7 +65,7 @@ describe('SchedulesRepository', () => { const debug = jest.fn(); anotherSchedulesRepository.setLogger({ debug, error: jest.fn() }); - const active = await anotherSchedulesRepository.setActiveSchedule(now); + const active = await anotherSchedulesRepository.setActiveSchedule(); await anotherInstance.disconnect(); expect(debug).toHaveBeenCalled(); @@ -74,10 +74,10 @@ describe('SchedulesRepository', () => { const schedules = await schedulesRepository.find({ name: scheduleName }); expect(schedules).toHaveLength(1); expect(schedules[0]?.scheduleId).toEqual(scheduleId); - expect(schedules[0]?.lastAlive).toEqual(lastAlive); + expect(schedules[0]?.lastAlive).toBeLessThanOrEqual(lastAlive); }); - it('only one schedule of many concurrent ones is active', async () => { + it('sets only one schedule of many concurrent ones as active', async () => { const connections = await Promise.all( ['a', 'b', 'c', 'd', 'e'].map(async (id) => Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), @@ -85,9 +85,7 @@ describe('SchedulesRepository', () => { ); const active = await Promise.all( - connections.map(async (connection) => - connection.getSchedulesRepository().setActiveSchedule(DateTime.now().toMillis()), - ), + connections.map(async (connection) => connection.getSchedulesRepository().setActiveSchedule()), ); await Promise.all(connections.map(async (connection) => connection.disconnect())); @@ -97,15 +95,15 @@ describe('SchedulesRepository', () => { expect(schedules).toHaveLength(1); }); - it('should replace dead schedules', async () => { - const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); + it('replaces a dead schedule', async () => { + await schedulesRepository.setActiveSchedule(); const otherScheduleId = 'other schedule ID'; secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, scheduleName); const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - const active = await secondSchedulesRepository.setActiveSchedule(now); + await sleep(2 * pingInterval + 10); + const active = await secondSchedulesRepository.setActiveSchedule(); expect(active).toEqual(true); }); @@ -117,76 +115,16 @@ describe('SchedulesRepository', () => { secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, otherName); const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - const active = await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - const secondActive = await secondSchedulesRepository.setActiveSchedule(DateTime.now().toMillis()); + const active = await schedulesRepository.setActiveSchedule(); + const secondActive = await secondSchedulesRepository.setActiveSchedule(); expect(active).toEqual(true); expect(secondActive).toEqual(true); }); }); - describe('getScheduleState', () => { - it('detects active schedule', async () => { - await schedulesRepository.setActiveSchedule(DateTime.now().toMillis()); - - const active = await schedulesRepository.isActiveSchedule(DateTime.now().toMillis()); - - expect(active).toBe(true); - }); - - it('detects active schedule after ping interval elapsed', async () => { - const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); - - const active = await schedulesRepository.isActiveSchedule(now); - - expect(active).toBe(true); - }); - - it('detects other active schedule with identical name', async () => { - secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName); - const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - - const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(now); - - const active = await secondSchedulesRepository.isActiveSchedule(now); - - expect(active).toBe(false); - }); - - it('does not consider dead schedule with identical name', async () => { - secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName); - const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - const now = DateTime.now().toMillis(); - - await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10); - - const active = await secondSchedulesRepository.isActiveSchedule(now); - - expect(active).toBe(true); - }); - - it('does not consider schedule with different name', async () => { - secondConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - 'other schedule', - 'other schedule', - ); - const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - - const now = DateTime.now().toMillis(); - await schedulesRepository.setActiveSchedule(now); - - const active = await secondSchedulesRepository.isActiveSchedule(now); - - expect(active).toBe(true); - }); - }); - describe('with active schedule', () => { - beforeEach(async () => schedulesRepository.setActiveSchedule(DateTime.now().toMillis())); + beforeEach(async () => schedulesRepository.setActiveSchedule()); describe('removeJob', () => { it('can remove a job', async () => { diff --git a/test/schedule/MongoSchedule.spec.ts b/test/schedule/MongoSchedule.spec.ts index 85f2b8b1..b0a5756b 100644 --- a/test/schedule/MongoSchedule.spec.ts +++ b/test/schedule/MongoSchedule.spec.ts @@ -1,4 +1,4 @@ -import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; +import { instance, mock, verify, when } from 'ts-mockito'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; @@ -27,7 +27,7 @@ describe('MongoSchedule', () => { }); it('connects and starts the ping and disconnects and stops the ping', async () => { - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' }); const secondSchedule = await MongoSchedule.connect({ @@ -36,9 +36,9 @@ describe('MongoSchedule', () => { }); await mongoSchedule.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); + verify(schedulesRepository.setActiveSchedule()).once(); await secondSchedule.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); + verify(schedulesRepository.setActiveSchedule()).twice(); await mongoSchedule.disconnect(); await secondSchedule.disconnect(); diff --git a/test/schedule/Schedule.spec.ts b/test/schedule/Schedule.spec.ts index 4039634e..fc4e77b7 100644 --- a/test/schedule/Schedule.spec.ts +++ b/test/schedule/Schedule.spec.ts @@ -1,4 +1,4 @@ -import { anyNumber, deepEqual, instance, mock, when } from 'ts-mockito'; +import { deepEqual, instance, mock, when } from 'ts-mockito'; import { ObjectId } from 'mongodb'; import { ExecutionStatus, MomoEvent, MomoJob, MomoOptions, MongoSchedule } from '../../src'; @@ -39,8 +39,7 @@ describe('Schedule', () => { jest.clearAllMocks(); when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]); - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); - when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' }); initLoggingForTests(mongoSchedule); diff --git a/test/schedule/SchedulePing.spec.ts b/test/schedule/SchedulePing.spec.ts index 88e829d7..1c5820a8 100644 --- a/test/schedule/SchedulePing.spec.ts +++ b/test/schedule/SchedulePing.spec.ts @@ -1,4 +1,4 @@ -import { anyNumber, instance, mock, verify, when } from 'ts-mockito'; +import { instance, mock, verify, when } from 'ts-mockito'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { SchedulePing } from '../../src/schedule/SchedulePing'; @@ -23,35 +23,33 @@ describe('SchedulePing', () => { afterEach(async () => schedulePing.stop()); it('starts, pings, cleans and stops', async () => { - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); - when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); + verify(schedulesRepository.setActiveSchedule()).once(); await sleep(1.1 * interval); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); + verify(schedulesRepository.setActiveSchedule()).twice(); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); + verify(schedulesRepository.setActiveSchedule()).twice(); verify(schedulesRepository.deleteOne()).once(); }); it('handles mongo errors', async () => { when(schedulesRepository.getLogData()).thenReturn(logData); - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); const message = 'I am an error that should be caught'; - when(schedulesRepository.setActiveSchedule(anyNumber())).thenReject({ + when(schedulesRepository.setActiveSchedule()).thenReject({ message, } as Error); await schedulePing.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); + verify(schedulesRepository.setActiveSchedule()).once(); expect(error).toHaveBeenCalledWith( 'Pinging or cleaning the Schedules repository failed', 'an internal error occurred', @@ -61,58 +59,44 @@ describe('SchedulePing', () => { }); it('does not start any jobs for inactive schedule', async () => { - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(false); - await schedulePing.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).never(); expect(startAllJobs).not.toHaveBeenCalled(); }); it('does not start any jobs if setting active schedule fails', async () => { - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); - when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(false); + when(schedulesRepository.setActiveSchedule()).thenResolve(false); await schedulePing.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); expect(startAllJobs).not.toHaveBeenCalled(); }); it('becomes active when other schedule dies', async () => { - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(false); - await schedulePing.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).never(); expect(startAllJobs).toHaveBeenCalledTimes(0); // other schedule dies, this one becomes active - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); - when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); await sleep(1.1 * interval); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); expect(startAllJobs).toHaveBeenCalledTimes(1); }); it('handles job start taking longer than interval', async () => { startAllJobs.mockImplementation(async () => sleep(2 * interval)); - when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true); - when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); await schedulePing.start(); - verify(schedulesRepository.setActiveSchedule(anyNumber())).once(); expect(startAllJobs).toHaveBeenCalledTimes(1); await sleep(1.1 * interval); - verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.setActiveSchedule(anyNumber())).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); verify(schedulesRepository.deleteOne()).once(); }); From 6702f29eec3eb2eb40dba340640f1ee3dae38550 Mon Sep 17 00:00:00 2001 From: Ute Weiss Date: Fri, 6 Oct 2023 15:28:58 +0200 Subject: [PATCH 8/8] fix: remove unnecessary mongo option Signed-off-by: Ute Weiss --- src/repository/SchedulesRepository.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 2fe893cf..0cb0940c 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -98,7 +98,7 @@ export class SchedulesRepository extends Repository { // concurrent modification, we use a filter to ensure that we only overwrite a dead schedule or ping this schedule { $or: [deadSchedule, thisSchedule] }, { $set: updatedSchedule }, - { ...mongoOptions, upsert: true }, + { upsert: true }, ); return true;