-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer_message_handler.js
40 lines (37 loc) · 1.51 KB
/
consumer_message_handler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
var redis = require("redis");
var config = require('./consumer_config.js');
module.exports = {
messagesBuffer: [],
messagesBufferSize: 0,
patchSize: 20,
handleMessage: function(messageObj, callback) {
this.messagesBuffer.push(messageObj);
this.messagesBufferSize++
if(this.messagesBufferSize == this.patchSize) {
// send to redis store
redisClient = redis.createClient(config.redisConnectionOptions);
redisClient.on("error", function (err) {
return callback('Redis Error: ' + err);
});
// check if the key 'lastId' exist and get its value if exist
redisClient.get('consumer:lastId', (err, lastId) => {
if (err) {
return callback('Error getting consumer:lastId: ' + err);
}
if(!lastId) { // if no key with the name consumer:lastId, create one
lastId = 1;
redisClient.set('consumer:lastId', lastId);
}
redisClient.set('consumer:' + lastId, JSON.stringify(this.messagesBuffer));
// increment the value of 'lastId'
redisClient.incr('consumer:lastId');
redisClient.quit();
// clear the buffer
this.messagesBuffer = [];
this.messagesBufferSize = 0;
// signaling the caller about the written patch
callback(null, lastId);
});
}
}
};