forked from vbuzzano/sails-hook-jobs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
176 lines (150 loc) · 5.48 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
var CronJob = require('cron').CronJob;
module.exports = function(sails) {
var Agenda = require('agenda'),
util = require('util'),
_ = require('lodash'),
os = require("os"),
agenda = new Agenda();
agenda.sails = sails;
var stopServer = function() {
agenda.stop(function() {
console.log("agenda stopped");
});
};
sails.on("lower", stopServer);
sails.on("lowering", stopServer);
// return hook
return {
// expose agenda in sails.hooks.jobs.agenda
jobs: agenda,
// Defaults config
defaults: {
jobs: {
"globalJobsObjectName": "Jobs",
"jobsDirectory": "api/jobs",
"db": {
"address" : "localhost:27017/jobs",
"collection" : "agendaJobs"
},
"name": os.hostname() + '-' + process.pid,
"processEvery": "1 minutes",
"maxConcurrency": 20,
"defaultConcurrency": 5,
"defaultLockLifetime": 10000,
}
},
// Runs automatically when the hook initializes
initialize: function (cb) {
var hook = this
, config = sails.config.jobs
// Enable jobs using coffeescript
try {
require('coffee-script/register');
} catch(e0) {
try {
var path = require('path');
var appPath = sails.config.appPath || process.cwd();
require(path.join(appPath, 'node_modules/coffee-script/register'));
} catch(e1) {
sails.log.verbose('Please run `npm install coffee-script` to use coffescript (skipping for now)');
}
}
// init agenda
agenda
.database(config.db.address, config.db.collection,{},function(err, collection) {
// Find all jobs
var jobs = require('include-all')({
dirname : sails.config.appPath + '/' + config.jobsDirectory,
filter : /(.+Job).(?:js|coffee)$/,
excludeDirs : /^\.(git|svn)$/,
optional : true
});
// init jobs
hook.initJobs(jobs);
// Lets wait on some of the sails core hooks to
// finish loading before we load our hook
// that talks about cats.
var eventsToWaitFor = [];
if (sails.hooks.orm)
eventsToWaitFor.push('hook:orm:loaded');
if (sails.hooks.pubsub)
eventsToWaitFor.push('hook:pubsub:loaded');
sails.after(eventsToWaitFor, function(){
// if (jobs.length > 0) {
// start agenda
agenda.start();
sails.log.verbose("sails jobs started")
// }
// Now we will return the callback and our hook
// will be usable.
return cb();
});
})
.name(config.name)
.processEvery(config.processEvery)
.maxConcurrency(config.maxConcurrency)
.defaultConcurrency(config.defaultConcurrency)
.defaultLockLifetime(config.defaultLockLifetime)
global[config.globalJobsObjectName] = agenda;
},
/**
* Function that initialize jobs
*/
initJobs: function(jobs, namespace) {
var hook = this
if (!namespace) namespace = "jobs";
sails.log.verbose("looking for job in " + namespace + "... ")
_.forEach(jobs, function(job, name){
if (typeof job === 'function') {
var log = ""
, _job = job(agenda)
, _dn = namespace + "." + name
, _name = _job.name || _dn.substr(_dn.indexOf('.') +1);
if (_job.disabled) {
log += "-> Disabled Job '" + _name + "' found in '" + namespace + "." + name + "'.";
} else {
var options = (typeof _job.options === 'object')?_job.options:{}
, freq = _job.frequency
, error = false;
if (typeof _job.run === "function")
agenda.define(_name, options, _job.run);
log += "-> Job '" + _name + "' found in '" + namespace + "." + name + "', defined in agenda";
if (typeof freq === 'string') {
freq = freq.trim().toLowerCase();
if (freq.indexOf('every') == 0) {
var interval = freq.substr(6).trim();
agenda.every(interval, _name, _job.data);
log += " and will run " + freq;
} else if (freq.indexOf('schedule') == 0) {
var when = freq.substr(9).trim();
agenda.schedule(when, _name, _job.data);
log += " and scheduled " + when;
} else if (freq === 'now') {
agenda.now(_name, _job.data);
log += " and started";
} else {
//last test for freq to see if its a cron string.
try{
//This will throw an error when freq is not a cron string
new CronJob(freq);
//No error is thrown, so continue with creating an agenda.
agenda.every(freq, _name, _job.data);
log += " and scheduled for this(these) crontime(s) "+ "freq";
}catch(err){
//failed to parse the string as a cronTime(s)
error = true;
log += ". But the frequency, '" + freq + "' is not supported";
}
}
}
}
log += ".";
if (error) sails.log.error(log);
else sails.log.verbose(log);
} else {
hook.initJobs(job, namespace + "." + name);
}
});
}
}
};