Skip to content

Commit

Permalink
add cancel, cancelledAt, startedRunningAt, finishedRunningAt
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarpov15 committed Jan 27, 2025
1 parent 065b72e commit 488e1ce
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
33 changes: 27 additions & 6 deletions src/taskSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const taskSchema = new mongoose.Schema({
required: true
},
scheduledAt: {
type: Date
type: Date,
required: true
},
nextScheduledAt: {
type: Date
Expand All @@ -20,6 +21,15 @@ const taskSchema = new mongoose.Schema({
timeoutMS: {
type: Number
},
cancelledAt: {
type: Date
},
startedRunningAt: {
type: Date
},
finishedRunningAt: {
type: Date
},
previousTaskId: {
type: mongoose.ObjectId
},
Expand Down Expand Up @@ -51,6 +61,14 @@ taskSchema.methods.log = function log(message, extra) {
return this.save();
};

taskSchema.statics.cancelTask = async function cancelTask(filter) {
if (filter != null) {
filter = { $and: [{ status: 'pending' }, filter] }
};
const task = await this.findOneAndUpdate(filter, { status: 'cancelled', cancelledAt: new Date() }, { returnDocument: 'after' });
return task;
};

taskSchema.methods.sideEffect = async function sideEffect(fn, params) {
this.sideEffects.push({ timestamp: time.now(), name: fn.name, params });
const sideEffect = this.sideEffects[this.sideEffects.length - 1];
Expand Down Expand Up @@ -129,9 +147,10 @@ taskSchema.statics.poll = async function poll(opts) {
while (true) {
let tasksInProgress = [];
for (let i = 0; i < parallel; ++i) {
const now = time.now();
const task = await this.findOneAndUpdate(
{ status: 'pending', scheduledAt: { $lte: time.now() } },
{ status: 'in_progress', ...additionalParams },
{ status: 'pending', scheduledAt: { $lte: now } },
{ status: 'in_progress', startedRunningAt: now, ...additionalParams },
{ new: false }
);

Expand All @@ -140,14 +159,14 @@ taskSchema.statics.poll = async function poll(opts) {
}

task.status = 'in_progress';

tasksInProgress.push(this.execute(task));
}

if (tasksInProgress.length === 0) {
break;
}

await Promise.all(tasksInProgress);
}
};
Expand All @@ -156,7 +175,7 @@ taskSchema.statics.execute = async function(task) {
if (!this._handlers.has(task.name)) {
return null;
}

try {
let result = null;
if (typeof task.timeoutMS === 'number') {
Expand All @@ -174,12 +193,14 @@ taskSchema.statics.execute = async function(task) {
);
}
task.status = 'succeeded';
task.finishedRunningAt = time.now();
task.result = result;
await task.save();
} catch (error) {
task.status = 'failed';
task.error.message = error.message;
task.error.stack = error.stack;
task.finishedRunningAt = time.now();
await task.save();
}

Expand Down
4 changes: 3 additions & 1 deletion test/task.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Task', function() {
}
Task.removeAllHandlers();
});

it('lets you register a new task', async function() {
let resolve;
let reject;
Expand Down Expand Up @@ -217,6 +217,7 @@ describe('Task', function() {
assert.ok(task);
assert.equal(task.status, 'failed');
assert.equal(task.error.message, 'Sample error message');
assert.equal(task.finishedRunningAt.valueOf(), now.valueOf());
});

it('handles task timeouts', async function() {
Expand All @@ -238,5 +239,6 @@ describe('Task', function() {
assert.ok(task);
assert.equal(task.status, 'failed');
assert.equal(task.error.message, 'Task timed out after 50 ms');
assert.equal(task.finishedRunningAt.valueOf(), now.valueOf());
});
});

0 comments on commit 488e1ce

Please sign in to comment.