diff --git a/src/repository/JobRepository.ts b/src/repository/JobRepository.ts index 6d2a7633..25ef26cd 100644 --- a/src/repository/JobRepository.ts +++ b/src/repository/JobRepository.ts @@ -26,10 +26,6 @@ export class JobRepository extends Repository { return job?.executionInfo; } - async clear(): Promise { - await this.delete(); - } - async define(job: JobDefinition): Promise { const { name, schedule, concurrency, maxRunning } = job; diff --git a/src/repository/Repository.ts b/src/repository/Repository.ts index cfb5e1b0..5e305671 100644 --- a/src/repository/Repository.ts +++ b/src/repository/Repository.ts @@ -34,10 +34,6 @@ export class Repository { return result.deletedCount; } - async deleteOne(filter: Filter): Promise { - await this.collection.deleteOne(filter); - } - // mongodb returns null instead of undefined for optional fields private mapNullToUndefined(entity: WithId): WithId { const keys = Object.keys(entity); diff --git a/src/repository/SchedulesRepository.ts b/src/repository/SchedulesRepository.ts index 098d3083..0cb0940c 100644 --- a/src/repository/SchedulesRepository.ts +++ b/src/repository/SchedulesRepository.ts @@ -1,10 +1,11 @@ +import { Filter, FindOneAndUpdateOptions, MongoClient, MongoServerError } from 'mongodb'; import { DateTime } from 'luxon'; -import { FindOneAndUpdateOptions, MongoClient } from 'mongodb'; import { ScheduleEntity } from './ScheduleEntity'; import { Repository } from './Repository'; import { Logger } from '../logging/Logger'; import { MomoErrorType } from '../logging/error/MomoErrorType'; +import { MomoEventData } from '../logging/MomoEvents'; export const SCHEDULES_COLLECTION_NAME = 'schedules'; @@ -13,6 +14,8 @@ const mongoOptions: FindOneAndUpdateOptions & { includeResultMetadata: true } = includeResultMetadata: true, // ensures backwards compatibility with mongodb <6 }; +const duplicateKeyErrorCode = 11000; + export class SchedulesRepository extends Repository { private logger: Logger | undefined; @@ -30,97 +33,140 @@ export class SchedulesRepository extends Repository { this.logger = logger; } + getLogData(): MomoEventData { + return { name: this.name, scheduleId: this.scheduleId }; + } + + async deleteOne(): Promise { + await this.collection.deleteOne({ scheduleId: this.scheduleId }); + } + + /** + * Checks the state of the schedule represented by this repository. + * + * @param threshold a schedule older than (i.e. timestamp below) the threshold is considered dead and will be replaced + * @returns the schedule's state + */ + private async isActiveSchedule(threshold: number): Promise { + const activeSchedule = await this.collection.findOne({ name: this.name }); + + if (activeSchedule === null || activeSchedule.scheduleId === this.scheduleId) { + return true; + } + + return activeSchedule.lastAlive < threshold; // if activeSchedule is too old, take over and make this one active + } + /** - * Checks if there is an alive active schedule in the database for the given name. + * Tries to set this instance as active schedule of this name. + * + * There are 4 possible cases: * - * If there is one -> return true if it is this schedule, otherwise false. - * If there is no such schedule -> we try inserting this schedule as the active one. - * ↳ If it worked return true, otherwise false. + * 1) Another instance already is active for this name. This instance does not need to become active. Nothing is done. + * + * 2) Another instance was active, but it's last ping was before the threshold. Hence, it is considered dead and this instance will take over and become active. The DB is updated accordingly. + * + * 3) There is currently no active schedule with this name. In this case, this instance will become the active schedule. The DB is updated accordingly. + * + * 4) This instance already is active. It will stay active and send a ping to the DB to indicate that it is still alive. + * + * @returns true if this instance is now active for the schedule with this name (cases 2-4), false otherwise (case 1) */ - async isActiveSchedule(name: string): Promise { - const lastAlive = DateTime.now().toMillis(); - const threshold = lastAlive - this.deadScheduleThreshold; + async setActiveSchedule(): Promise { + const now = DateTime.now().toMillis(); + const threshold = now - this.deadScheduleThreshold; + + const active = await this.isActiveSchedule(now); + if (!active) { + this.logger?.debug('This schedule is not active', this.getLogData()); + return false; + } + + const deadSchedule: Filter = { name: this.name, lastAlive: { $lt: threshold } }; + const thisSchedule: Filter = { scheduleId: this.scheduleId }; + + const updatedSchedule: Partial = { + name: this.name, + scheduleId: this.scheduleId, + lastAlive: now, + executions: {}, + }; + try { - const result = await this.collection.findOneAndUpdate( - { lastAlive: { $lt: threshold }, name }, - { - $set: { - name, - scheduleId: this.scheduleId, - lastAlive, - executions: {}, - }, - }, - { - ...mongoOptions, - upsert: true, - }, + await this.collection.updateOne( + // we already checked with isActiveSchedule that this instance should be the active one, but to prevent + // concurrent modification, we use a filter to ensure that we only overwrite a dead schedule or ping this schedule + { $or: [deadSchedule, thisSchedule] }, + { $set: updatedSchedule }, + { upsert: true }, ); - return result.value === null ? false : result.value.scheduleId === this.scheduleId; + return true; } catch (error) { - // We seem to have a schedule that's alive. The unique name index probably prevented the upsert. Is this one the active schedule? - const aliveSchedule = await this.collection.findOne({ name }); - if (aliveSchedule === null) { + if (error instanceof MongoServerError && error.code === duplicateKeyErrorCode) { + this.logger?.debug( + 'Cannot set active schedule - another schedule with this name is already active', + this.getLogData(), + ); + } else { this.logger?.error( 'The database reported an unexpected error', MomoErrorType.internal, - { scheduleId: this.scheduleId }, + this.getLogData(), error, ); } - return aliveSchedule?.scheduleId === this.scheduleId; - } - } - async ping(scheduleId = this.scheduleId): Promise { - await this.updateOne({ scheduleId }, { $set: { lastAlive: DateTime.now().toMillis() } }); + return false; + } } + /** + * This unique index ensures that we do not insert more than one active schedule per schedule name + * into the repository. + */ async createIndex(): Promise { - // this unique index ensures that we do not insert more than one active schedule - // in the repository per schedule name await this.collection.createIndex({ name: 1 }, { unique: true }); } - async removeJob(scheduleId: string, name: string): Promise { - const schedulesEntity = await this.findOne({ scheduleId }); + async removeJob(jobName: string): Promise { + const schedulesEntity = await this.findOne({ scheduleId: this.scheduleId }); if (!schedulesEntity) { - throw new Error(`schedulesEntity not found for scheduleId=${scheduleId}`); + throw new Error(`schedulesEntity not found for scheduleId=${this.scheduleId}`); } const executions = schedulesEntity.executions; - delete executions[name]; - await this.updateOne({ scheduleId }, { $set: { executions } }); + delete executions[jobName]; + await this.updateOne({ scheduleId: this.scheduleId }, { $set: { executions } }); } - async addExecution(name: string, maxRunning: number): Promise<{ added: boolean; running: number }> { + async addExecution(jobName: string, maxRunning: number): Promise<{ added: boolean; running: number }> { if (maxRunning < 1) { const schedule = await this.collection.findOneAndUpdate( { name: this.name }, - { $inc: { [`executions.${name}`]: 1 } }, + { $inc: { [`executions.${jobName}`]: 1 } }, mongoOptions, ); - return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? 0 }; + return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? 0 }; } const schedule = await this.collection.findOneAndUpdate( { name: this.name, - $or: [{ [`executions.${name}`]: { $lt: maxRunning } }, { [`executions.${name}`]: { $exists: false } }], + $or: [{ [`executions.${jobName}`]: { $lt: maxRunning } }, { [`executions.${jobName}`]: { $exists: false } }], }, - { $inc: { [`executions.${name}`]: 1 } }, + { $inc: { [`executions.${jobName}`]: 1 } }, mongoOptions, ); - return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? maxRunning }; + return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? maxRunning }; } - async removeExecution(name: string): Promise { - await this.updateOne({ name: this.name }, { $inc: { [`executions.${name}`]: -1 } }); + async removeExecution(jobName: string): Promise { + await this.updateOne({ name: this.name }, { $inc: { [`executions.${jobName}`]: -1 } }); } - async countRunningExecutions(name: string): Promise { - return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[name] ?? 0; + async countRunningExecutions(jobName: string): Promise { + return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[jobName] ?? 0; } } diff --git a/src/schedule/MongoSchedule.ts b/src/schedule/MongoSchedule.ts index d4b6f42c..efdf9138 100644 --- a/src/schedule/MongoSchedule.ts +++ b/src/schedule/MongoSchedule.ts @@ -27,7 +27,6 @@ export class MongoSchedule extends Schedule { protected readonly scheduleId: string, protected readonly connection: Connection, pingIntervalMs: number, - private readonly scheduleName: string, ) { const schedulesRepository = connection.getSchedulesRepository(); const jobRepository = connection.getJobRepository(); @@ -39,8 +38,6 @@ export class MongoSchedule extends Schedule { this.disconnectFct = connection.disconnect.bind(connection); this.schedulePing = new SchedulePing( - scheduleId, - scheduleName, schedulesRepository, this.logger, pingIntervalMs, @@ -61,7 +58,7 @@ export class MongoSchedule extends Schedule { const scheduleId = uuid(); const connection = await Connection.create(connectionOptions, pingIntervalMs, scheduleId, scheduleName); - return new MongoSchedule(scheduleId, connection, pingIntervalMs, scheduleName); + return new MongoSchedule(scheduleId, connection, pingIntervalMs); } /** @@ -87,19 +84,6 @@ export class MongoSchedule extends Schedule { return this.schedulePing.start(); } - /** - * Returns whether this schedule is currently active. - * - * Only the active schedule will schedule jobs and try to execute them. - * There is always only one active schedule per mongo database. - * - * @throws if the database throws - */ - public async isActiveSchedule(): Promise { - const schedulesRepository = this.connection.getSchedulesRepository(); - return schedulesRepository.isActiveSchedule(this.scheduleName); - } - private async startAllJobs(): Promise { await Promise.all(Object.values(this.getJobSchedulers()).map(async (jobScheduler) => jobScheduler.start())); } diff --git a/src/schedule/Schedule.ts b/src/schedule/Schedule.ts index ea998495..eb514daf 100644 --- a/src/schedule/Schedule.ts +++ b/src/schedule/Schedule.ts @@ -77,13 +77,7 @@ export class Schedule extends LogEmitter { await this.jobRepository.define(job); - this.jobSchedulers[job.name] = JobScheduler.forJob( - this.scheduleId, - job, - this.logger, - this.schedulesRepository, - this.jobRepository, - ); + this.jobSchedulers[job.name] = JobScheduler.forJob(job, this.logger, this.schedulesRepository, this.jobRepository); return true; } diff --git a/src/schedule/SchedulePing.ts b/src/schedule/SchedulePing.ts index 1a0ac793..dccef135 100644 --- a/src/schedule/SchedulePing.ts +++ b/src/schedule/SchedulePing.ts @@ -14,8 +14,6 @@ export class SchedulePing { private startJobsStatus: StartJobsStatus = StartJobsStatus.notStarted; constructor( - private readonly scheduleId: string, - private readonly scheduleName: string, private readonly schedulesRepository: SchedulesRepository, private readonly logger: Logger, private readonly interval: number, @@ -30,33 +28,36 @@ export class SchedulePing { try { await this.checkActiveSchedule(); } catch (e) { - this.logger.error(errorMessage, MomoErrorType.internal, {}, e); + this.logger.error(errorMessage, MomoErrorType.internal, this.schedulesRepository.getLogData(), e); } this.handle = setSafeInterval(this.checkActiveSchedule.bind(this), this.interval, this.logger, errorMessage); } private async checkActiveSchedule(): Promise { - const active = await this.schedulesRepository.isActiveSchedule(this.scheduleName); - this.logger.debug(`This schedule is ${active ? '' : 'not '}active`); - if (active) { - await this.schedulesRepository.ping(this.scheduleId); - if (this.startJobsStatus === StartJobsStatus.notStarted) { - this.startJobsStatus = StartJobsStatus.inProgress; - this.logger.debug('This schedule just turned active'); - - await this.startAllJobs(); - - this.startJobsStatus = StartJobsStatus.finished; - this.logger.debug('Finished starting scheduled jobs'); - } + const active = await this.schedulesRepository.setActiveSchedule(); + if (!active) { + return; } + + this.logger.debug('This schedule is active', this.schedulesRepository.getLogData()); + + if (this.startJobsStatus !== StartJobsStatus.notStarted) { + return; + } + + this.startJobsStatus = StartJobsStatus.inProgress; + this.logger.debug('This schedule just turned active', this.schedulesRepository.getLogData()); + await this.startAllJobs(); + this.startJobsStatus = StartJobsStatus.finished; + this.logger.debug('Finished starting scheduled jobs', this.schedulesRepository.getLogData()); } async stop(): Promise { if (this.handle) { - this.logger.debug('stop SchedulePing', { scheduleId: this.scheduleId }); + this.logger.debug('stop SchedulePing', this.schedulesRepository.getLogData()); clearInterval(this.handle); } - await this.schedulesRepository.deleteOne({ scheduleId: this.scheduleId }); + + await this.schedulesRepository.deleteOne(); } } diff --git a/src/scheduler/JobScheduler.ts b/src/scheduler/JobScheduler.ts index 90a74c3c..e6f3824c 100644 --- a/src/scheduler/JobScheduler.ts +++ b/src/scheduler/JobScheduler.ts @@ -21,21 +21,19 @@ export class JobScheduler { constructor( private readonly jobName: string, private readonly jobExecutor: JobExecutor, - private readonly scheduleId: string, private readonly schedulesRepository: SchedulesRepository, private readonly jobRepository: JobRepository, private readonly logger: Logger, ) {} static forJob( - scheduleId: string, job: Job, logger: Logger, schedulesRepository: SchedulesRepository, jobRepository: JobRepository, ): JobScheduler { const executor = new JobExecutor(job.handler, schedulesRepository, jobRepository, logger); - return new JobScheduler(job.name, executor, scheduleId, schedulesRepository, jobRepository, logger); + return new JobScheduler(job.name, executor, schedulesRepository, jobRepository, logger); } getUnexpectedErrorCount(): number { @@ -102,7 +100,7 @@ export class JobScheduler { if (this.executableSchedule) { this.executableSchedule.stop(); this.jobExecutor.stop(); - await this.schedulesRepository.removeJob(this.scheduleId, this.jobName); + await this.schedulesRepository.removeJob(this.jobName); this.executableSchedule = undefined; } } diff --git a/test/repository/SchedulesRepository.integration.spec.ts b/test/repository/SchedulesRepository.integration.spec.ts index d653be4a..65a42df6 100644 --- a/test/repository/SchedulesRepository.integration.spec.ts +++ b/test/repository/SchedulesRepository.integration.spec.ts @@ -1,4 +1,5 @@ import { MongoMemoryServer } from 'mongodb-memory-server'; +import { DateTime } from 'luxon'; import { Connection } from '../../src/Connection'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; @@ -12,6 +13,7 @@ describe('SchedulesRepository', () => { let mongo: MongoMemoryServer; let connection: Connection; + let secondConnection: Connection | undefined; let schedulesRepository: SchedulesRepository; beforeAll(async () => { @@ -22,132 +24,107 @@ describe('SchedulesRepository', () => { }); beforeEach(async () => schedulesRepository.delete()); + afterEach(async () => secondConnection?.disconnect()); afterAll(async () => { await connection.disconnect(); await mongo.stop(); }); - describe('isActiveSchedule', () => { - it('single schedule is active', async () => { - const active = await schedulesRepository.isActiveSchedule(scheduleName); + describe('setActiveSchedule', () => { + it('sets single schedule as active', async () => { + await schedulesRepository.setActiveSchedule(); - const entities = await schedulesRepository.find({}); + const schedule = await schedulesRepository.findOne({ name: scheduleName, scheduleId }); + expect(schedule).not.toBeNull(); + }); - expect(active).toEqual(true); - expect(entities).toHaveLength(1); - expect(entities[0]?.scheduleId).toEqual(scheduleId); + it('updates timestamp', async () => { + await schedulesRepository.setActiveSchedule(); + const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); + + await sleep(pingInterval); + await schedulesRepository.setActiveSchedule(); + + const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); + expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); }); - it('only one schedule is active', async () => { - const firstIsActive = await schedulesRepository.isActiveSchedule(scheduleName); + it('refuses to set another active schedule', async () => { + await schedulesRepository.setActiveSchedule(); + const lastAlive = DateTime.now().toMillis(); - const inactiveScheduleId = 'not active'; - const inactiveConnection = await Connection.create( + const anotherScheduleId = 'not active'; + const anotherInstance = await Connection.create( { url: mongo.getUri() }, pingInterval, - inactiveScheduleId, + anotherScheduleId, scheduleName, ); - const inactiveSchedulesRepository = inactiveConnection.getSchedulesRepository(); - const secondIsActive = await inactiveSchedulesRepository.isActiveSchedule(scheduleName); - await inactiveConnection.disconnect(); + const anotherSchedulesRepository = anotherInstance.getSchedulesRepository(); + const debug = jest.fn(); + anotherSchedulesRepository.setLogger({ debug, error: jest.fn() }); - const entities = await schedulesRepository.find({}); + const active = await anotherSchedulesRepository.setActiveSchedule(); + await anotherInstance.disconnect(); - expect(firstIsActive).toEqual(true); - expect(secondIsActive).toEqual(false); - expect(entities).toHaveLength(1); - expect(entities[0]?.scheduleId).toEqual(scheduleId); + expect(debug).toHaveBeenCalled(); + expect(active).toEqual(false); + + const schedules = await schedulesRepository.find({ name: scheduleName }); + expect(schedules).toHaveLength(1); + expect(schedules[0]?.scheduleId).toEqual(scheduleId); + expect(schedules[0]?.lastAlive).toBeLessThanOrEqual(lastAlive); }); - it('only one schedule of many concurrent ones is active', async () => { + it('sets only one schedule of many concurrent ones as active', async () => { const connections = await Promise.all( ['a', 'b', 'c', 'd', 'e'].map(async (id) => Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName), ), ); - const schedulesActiveStatus = await Promise.all( - connections.map(async (connection) => { - const newSchedulesRepository = connection.getSchedulesRepository(); - return newSchedulesRepository.isActiveSchedule(scheduleName); - }), + const active = await Promise.all( + connections.map(async (connection) => connection.getSchedulesRepository().setActiveSchedule()), ); - const entities = await schedulesRepository.find({}); await Promise.all(connections.map(async (connection) => connection.disconnect())); - expect(schedulesActiveStatus.filter((active) => active)).toHaveLength(1); - expect(entities).toHaveLength(1); + expect(active.filter((active) => active)).toHaveLength(1); + const schedules = await schedulesRepository.find({ name: scheduleName }); + expect(schedules).toHaveLength(1); }); - it('should replace dead schedules', async () => { - const active = await schedulesRepository.isActiveSchedule(scheduleName); - const secondScheduleId = 'not active'; - const secondConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - secondScheduleId, - scheduleName, - ); - const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - const secondActive = await secondSchedulesRepository.isActiveSchedule(scheduleName); + it('replaces a dead schedule', async () => { + await schedulesRepository.setActiveSchedule(); + const otherScheduleId = 'other schedule ID'; - expect(active).toEqual(true); - expect(secondActive).toEqual(false); + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, scheduleName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - await sleep(1200); + await sleep(2 * pingInterval + 10); + const active = await secondSchedulesRepository.setActiveSchedule(); - const secondTakeOver = await secondSchedulesRepository.isActiveSchedule(scheduleName); - await secondConnection.disconnect(); - expect(secondTakeOver).toEqual(true); + expect(active).toEqual(true); }); - it('should allow two active schedules with different names', async () => { - const isActive = await schedulesRepository.isActiveSchedule(scheduleName); - const otherScheduleName = 'other schedule'; - const secondScheduleId = 'first other schedule ID'; - const thirdScheduleId = 'second other schedule ID'; - const secondConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - secondScheduleId, - otherScheduleName, - ); - const secondSchedulesRepository = await secondConnection.getSchedulesRepository(); - const isSecondActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - const thirdConnection = await Connection.create( - { url: mongo.getUri() }, - pingInterval, - thirdScheduleId, - otherScheduleName, - ); - const thirdSchedulesRepository = await thirdConnection.getSchedulesRepository(); - const isThirdActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - - expect(isActive).toEqual(true); - expect(isSecondActive).toEqual(true); - expect(isThirdActive).toEqual(false); + it('sets two active schedules with different names', async () => { + const otherName = 'other schedule'; + const otherScheduleId = 'other schedule ID'; - await sleep(2 * pingInterval + 100); + secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, otherName); + const secondSchedulesRepository = secondConnection.getSchedulesRepository(); - const isFirstStillActive = await schedulesRepository.isActiveSchedule(scheduleName); - const didThirdTakeOverActive = await thirdSchedulesRepository.isActiveSchedule(otherScheduleName); - expect(isFirstStillActive).toEqual(true); - expect(didThirdTakeOverActive).toEqual(true); - const isSecondStillActive = await secondSchedulesRepository.isActiveSchedule(otherScheduleName); - expect(isSecondStillActive).toEqual(false); + const active = await schedulesRepository.setActiveSchedule(); + const secondActive = await secondSchedulesRepository.setActiveSchedule(); - await secondConnection.disconnect(); - await thirdConnection.disconnect(); + expect(active).toEqual(true); + expect(secondActive).toEqual(true); }); }); - describe('with schedule', () => { - beforeEach(async () => { - await schedulesRepository.isActiveSchedule(scheduleName); - }); + describe('with active schedule', () => { + beforeEach(async () => schedulesRepository.setActiveSchedule()); describe('removeJob', () => { it('can remove a job', async () => { @@ -156,7 +133,7 @@ describe('SchedulesRepository', () => { const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); expect(schedulesEntity?.executions).toEqual({ [name]: 1 }); - await schedulesRepository.removeJob(scheduleId, name); + await schedulesRepository.removeJob(name); const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); expect(schedulesEntity2?.executions).toEqual({}); }); @@ -169,7 +146,7 @@ describe('SchedulesRepository', () => { const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); expect(schedulesEntity?.executions).toEqual({ [name]: 1, [otherName]: 1 }); - await schedulesRepository.removeJob(scheduleId, name); + await schedulesRepository.removeJob(name); const schedulesEntity2 = await schedulesRepository.findOne({ scheduleId }); expect(schedulesEntity2?.executions).toEqual({ [otherName]: 1 }); }); @@ -229,16 +206,5 @@ describe('SchedulesRepository', () => { expect(running).toBe(2); }); }); - - describe('ping', () => { - it('updates timestamp', async () => { - const schedulesEntity = await schedulesRepository.findOne({ scheduleId }); - - await schedulesRepository.ping(); - - const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId }); - expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive); - }); - }); }); }); diff --git a/test/schedule/MongoSchedule.spec.ts b/test/schedule/MongoSchedule.spec.ts index 43a26a0e..b0a5756b 100644 --- a/test/schedule/MongoSchedule.spec.ts +++ b/test/schedule/MongoSchedule.spec.ts @@ -1,4 +1,4 @@ -import { anyString, deepEqual, instance, mock, verify } from 'ts-mockito'; +import { instance, mock, verify, when } from 'ts-mockito'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { JobRepository } from '../../src/repository/JobRepository'; @@ -27,6 +27,8 @@ describe('MongoSchedule', () => { }); it('connects and starts the ping and disconnects and stops the ping', async () => { + when(schedulesRepository.setActiveSchedule()).thenResolve(true); + const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' }); const secondSchedule = await MongoSchedule.connect({ scheduleName: 'secondSchedule', @@ -34,13 +36,13 @@ describe('MongoSchedule', () => { }); await mongoSchedule.start(); - verify(schedulesRepository.isActiveSchedule('schedule')).once(); + verify(schedulesRepository.setActiveSchedule()).once(); await secondSchedule.start(); - verify(schedulesRepository.isActiveSchedule('secondSchedule')).once(); + verify(schedulesRepository.setActiveSchedule()).twice(); await mongoSchedule.disconnect(); await secondSchedule.disconnect(); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId: anyString() }))).twice(); + verify(schedulesRepository.deleteOne()).twice(); expect(disconnect).toHaveBeenCalledTimes(2); }); diff --git a/test/schedule/Schedule.spec.ts b/test/schedule/Schedule.spec.ts index d75b041d..fc4e77b7 100644 --- a/test/schedule/Schedule.spec.ts +++ b/test/schedule/Schedule.spec.ts @@ -39,7 +39,7 @@ describe('Schedule', () => { jest.clearAllMocks(); when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]); - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' }); initLoggingForTests(mongoSchedule); diff --git a/test/schedule/SchedulePing.spec.ts b/test/schedule/SchedulePing.spec.ts index 8cb493db..1c5820a8 100644 --- a/test/schedule/SchedulePing.spec.ts +++ b/test/schedule/SchedulePing.spec.ts @@ -1,13 +1,12 @@ -import { deepEqual, instance, mock, verify, when } from 'ts-mockito'; +import { instance, mock, verify, when } from 'ts-mockito'; import { SchedulesRepository } from '../../src/repository/SchedulesRepository'; import { SchedulePing } from '../../src/schedule/SchedulePing'; import { sleep } from '../utils/sleep'; describe('SchedulePing', () => { - const scheduleId = '123'; - const scheduleName = 'schedule'; const interval = 1000; + const logData = { name: 'name', scheduleId: 'scheduleId' }; let error: jest.Mock; let schedulesRepository: SchedulesRepository; @@ -18,78 +17,87 @@ describe('SchedulePing', () => { startAllJobs = jest.fn(); schedulesRepository = mock(SchedulesRepository); error = jest.fn(); - schedulePing = new SchedulePing( - scheduleId, - scheduleName, - instance(schedulesRepository), - { debug: jest.fn(), error }, - interval, - startAllJobs, - ); + schedulePing = new SchedulePing(instance(schedulesRepository), { debug: jest.fn(), error }, interval, startAllJobs); }); afterEach(async () => schedulePing.stop()); it('starts, pings, cleans and stops', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping(scheduleId)).once(); + verify(schedulesRepository.setActiveSchedule()).once(); await sleep(1.1 * interval); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.ping(scheduleId)).twice(); + verify(schedulesRepository.setActiveSchedule()).twice(); await schedulePing.stop(); await sleep(interval); - verify(schedulesRepository.ping(scheduleId)).twice(); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); + verify(schedulesRepository.setActiveSchedule()).twice(); + verify(schedulesRepository.deleteOne()).once(); }); it('handles mongo errors', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + when(schedulesRepository.getLogData()).thenReturn(logData); const message = 'I am an error that should be caught'; - when(schedulesRepository.ping(scheduleId)).thenReject({ + when(schedulesRepository.setActiveSchedule()).thenReject({ message, } as Error); await schedulePing.start(); - verify(schedulesRepository.ping(scheduleId)).once(); + verify(schedulesRepository.setActiveSchedule()).once(); expect(error).toHaveBeenCalledWith( 'Pinging or cleaning the Schedules repository failed', 'an internal error occurred', - {}, + logData, { message }, ); }); - it('handles job start taking longer than interval', async () => { - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(false); + it('does not start any jobs for inactive schedule', async () => { + await schedulePing.start(); - startAllJobs.mockImplementation(async () => sleep(2 * interval)); + expect(startAllJobs).not.toHaveBeenCalled(); + }); + + it('does not start any jobs if setting active schedule fails', async () => { + when(schedulesRepository.setActiveSchedule()).thenResolve(false); + + await schedulePing.start(); + + expect(startAllJobs).not.toHaveBeenCalled(); + }); + it('becomes active when other schedule dies', async () => { await schedulePing.start(); - verify(schedulesRepository.ping(scheduleId)).never(); expect(startAllJobs).toHaveBeenCalledTimes(0); - when(schedulesRepository.isActiveSchedule(scheduleName)).thenResolve(true); + // other schedule dies, this one becomes active + when(schedulesRepository.setActiveSchedule()).thenResolve(true); await sleep(1.1 * interval); - verify(schedulesRepository.ping(scheduleId)).once(); + expect(startAllJobs).toHaveBeenCalledTimes(1); + }); + + it('handles job start taking longer than interval', async () => { + startAllJobs.mockImplementation(async () => sleep(2 * interval)); + when(schedulesRepository.setActiveSchedule()).thenResolve(true); + + await schedulePing.start(); expect(startAllJobs).toHaveBeenCalledTimes(1); await sleep(1.1 * interval); - verify(schedulesRepository.ping(scheduleId)).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); await schedulePing.stop(); + await sleep(interval); - verify(schedulesRepository.ping(scheduleId)).twice(); expect(startAllJobs).toHaveBeenCalledTimes(1); - verify(schedulesRepository.deleteOne(deepEqual({ scheduleId }))).once(); + verify(schedulesRepository.deleteOne()).once(); }); }); diff --git a/test/scheduler/JobScheduler.spec.ts b/test/scheduler/JobScheduler.spec.ts index 08c70022..27f80b5b 100644 --- a/test/scheduler/JobScheduler.spec.ts +++ b/test/scheduler/JobScheduler.spec.ts @@ -13,7 +13,6 @@ import { CronSchedule } from '../../src/job/MomoJob'; describe('JobScheduler', () => { const errorFn = jest.fn(); - const scheduleId = '123'; let schedulesRepository: SchedulesRepository; let jobRepository: JobRepository; @@ -38,7 +37,6 @@ describe('JobScheduler', () => { jobScheduler = new JobScheduler( job.name, instance(jobExecutor), - scheduleId, instance(schedulesRepository), instance(jobRepository), loggerForTests(errorFn),