diff --git a/lib/event_broker.js b/lib/event_broker.js index bff6bb5..ef0bf10 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -138,13 +138,22 @@ EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, t }; EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) { + + var origData = data; + var newMsg = {}; newMsg.type = 'event'; newMsg.topic = sourceTopic; - newMsg.timestamp = data.timestamp; + newMsg.timestamp = data.timestamp || new Date().getTime(); newMsg.subscriptionId = query.subscriptionId; + + // check if topic is a device query, rewrite sent topic as the original topic + var qt = querytopic.parse(query.original.pubsubIdentifier()); + if (qt) { + newMsg.topic = query.original.hash(); + } - if (data.data) { + if (data.data !== undefined) { newMsg.data = data.data; } else { // handle device and server /logs stream @@ -174,6 +183,27 @@ EventBroker.prototype._publishStreamEnabledClient = function(client, query, topi client._unsubscribe(query.subscriptionId) return; } + if (client.filterMultiple) { + // If query has caql statement don't filter + if (query.caql !== null) { + data.subscriptionId = [data.subscriptionId]; + } else { + var found = client.hasBeenSent(origData); + if (found) { + return; + } + + var subscriptionsIds = []; + client._subscriptions.forEach(function(subscription) { + // Only provide id if topic matches and topic doesn't have a caql statement + if (subscription.topic.match(sourceTopic) && subscription.topic.streamQuery === null) { + subscriptionsIds.push(subscription.subscriptionId); + } + }); + + data.subscriptionId = subscriptionsIds; + } + } client.send(sourceTopic, data, function(err){ if (err) { @@ -205,6 +235,13 @@ EventBroker.prototype._streamEnabledClient = function(client) { count: 0, caql: subscription.topic.streamQuery }; + + // If topic is a device query appened unique identifier to query + var qt = querytopic.parse(query.topic); + if (qt) { + query.topic = querytopic.format(qt); + } + client.query.push(query); var connectedPeers = []; diff --git a/lib/event_socket.js b/lib/event_socket.js index 8176f00..0cb793c 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -9,8 +9,13 @@ var JSCompiler = require('caql-js-compiler'); //Flag to indicate that we expect data back on teh websocket //Tracking subscriptions -var EventSocket = module.exports = function(ws, query, streamEnabled) { +var EventSocket = module.exports = function(ws, query, options) { EventEmitter.call(this); + + if (options === undefined) { + options = {}; + } + this.ws = ws; this.query = []; this._queryCache = {}; @@ -18,10 +23,33 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) { // list of event streams this._subscriptions = []; this._subscriptionIndex = 0; - this.streamEnabled = !!(streamEnabled); + + // Flags + this.streamEnabled = !!(options.streamEnabled); + this.filterMultiple = !!(options.filterMultiple); + + this.hasBeenSent = function(msg) { + return this._sendBuffer.add(msg); + }; + this._sendBuffer = { + add: function(msg) { + if (this._buffer.indexOf(msg) >= 0) { + return true; + } + + if (this._buffer.unshift(msg) > this.max) { + this._buffer.pop(); + } + + return false; + }, + max: 50, + _buffer: [] + }; + // only setup parser when using event stream - if (streamEnabled) { + if (this.streamEnabled) { var self = this; this._parser = new EventStreamsParser(); this._parser.on('error', function(err, original) { @@ -168,7 +196,7 @@ EventSocket.prototype.send = function(topic, data) { } else { data = tmpData; } - } else if (data['query']) { + } else if (tmpData['query']) { // format device queries tmpData = deviceFormatter({ loader: this.ws._loader, env: this.ws._env, model: tmpData.device }); if (this.streamEnabled) { diff --git a/lib/http_server.js b/lib/http_server.js index 791cea7..d34df96 100644 --- a/lib/http_server.js +++ b/lib/http_server.js @@ -324,16 +324,15 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (/^\/events/.exec(ws.upgradeReq.url)) { self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name); - var queryString = url.parse(ws.upgradeReq.url).query; + var parsed = url.parse(ws.upgradeReq.url, true); + var query = parsed.query; - if(!queryString) { - var client = new EventSocket(ws, null, true); + if(!query.topic) { + var client = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) }); self.eventBroker.client(client); return; } - var query = querystring.parse(queryString); - function copy(q) { var c = {}; Object.keys(q).forEach(function(k) { @@ -352,7 +351,7 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { if (qt) { q.topic = querytopic.format(qt); } - var client = new EventSocket(ws, q, false); + var client = new EventSocket(ws, q, { streamEnabled: false }); self.eventBroker.client(client); } }); diff --git a/test/test_event_socket.js b/test/test_event_socket.js index 266f46d..511aeca 100644 --- a/test/test_event_socket.js +++ b/test/test_event_socket.js @@ -69,13 +69,19 @@ describe('EventSocket', function() { it('should init parser if passed streaming flag', function() { var ws = new Ws(); - var client = new EventSocket(ws, 'some-topic', true); + var client = new EventSocket(ws, 'some-topic', { streamEnabled: true }); assert(client._parser) }) + it('should pass filterMultiple flag to EventSocket', function() { + var ws = new Ws(); + var client = new EventSocket(ws, 'some-topic', { filterMultiple: true }); + assert(client.filterMultiple, true); + }) + it('should emit subscribe event when subscribe message is parsed', function(done) { var ws = new Ws(); - var client = new EventSocket(ws, 'some-topic', true); + var client = new EventSocket(ws, 'some-topic', { streamEnabled: true }); client.on('subscribe', function(subscription) { assert(subscription.subscriptionId); assert(subscription.topic); diff --git a/test/test_event_streams.js b/test/test_event_streams.js index 3360bc5..06643fe 100644 --- a/test/test_event_streams.js +++ b/test/test_event_streams.js @@ -1374,6 +1374,228 @@ describe('Event Streams', function() { ws.on('error', done); }); + itBoth('Passing filterMultiple options to ws only one data event will be sent', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true'); + var topic = validTopics[0]; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + var msg2 = { type: 'subscribe', topic: 'hub/testdriver/*/state' }; + ws.send(JSON.stringify(msg)); + ws.send(JSON.stringify(msg2)); + var subscriptions = []; + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.subscriptionId); + subscriptions.push(json.subscriptionId); + if (subscriptions.length === 2) { + setTimeout(function() { + devices[0].call('change'); + }, 50); + } + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId.length, subscriptions.length); + subscriptions.forEach(function(id) { + assert(json.subscriptionId.indexOf(id) >= -1); + }); + assert(json.data); + done(); + } + }); + }); + ws.on('error', done); + }); + + itBoth('Passing filterMultiple options to ws will apply limits for both topics', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true'); + var topic = validTopics[0]; + var topic2 = 'hub/testdriver/*/state'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic, limit: 2 }; + var msg2 = { type: 'subscribe', topic: topic2, limit: 3 }; + ws.send(JSON.stringify(msg)); + ws.send(JSON.stringify(msg2)); + var subscriptions = {}; + + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.subscriptionId); + subscriptions[json.subscriptionId] = 0; + if (Object.keys(subscriptions).length === 2) { + setTimeout(function() { + devices[0].call('change'); + devices[0].call('prepare'); + devices[0].call('change'); + }, 50); + } + } else if (json.type === 'event') { + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.data); + + json.subscriptionId.forEach(function(id) { + subscriptions[id]++; + }); + + if (subscriptions[1] === 2 && subscriptions[2] === 3) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + itBoth('Passing filterMultiple options to ws will have no effect on topics with caql query', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true'); + var topic = validTopics[0] + '?select *'; + var topic2 = 'hub/testdriver/*/state'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + var msg2 = { type: 'subscribe', topic: topic2 }; + ws.send(JSON.stringify(msg)); + ws.send(JSON.stringify(msg2)); + var received = 0; + + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert(json.subscriptionId); + setTimeout(function() { + devices[0].call('change'); + }, 50); + } else if (json.type === 'event') { + assert(json.timestamp); + assert(json.data); + assert.equal(json.subscriptionId.length, 1); + received++; + + if (received === 2) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + + itBoth('subscribing to a query with hub for hub will return all devices', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = 'hub/query/where type is not missing'; + var count = 0; + var expected = (idx === 1) ? 2 : 2; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + count++; + if (count === expected) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + + itBoth('subscribing to a query with * for hub will return all devices', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = '*/query/where type is not missing'; + var count = 0; + var expected = (idx === 1) ? 2 : 4; // cloud will have 4 devices + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + assert(json.data); + count++; + if (count === expected) { + done(); + } + } + }); + }); + ws.on('error', done); + }); + + itBoth('when data is 0 value it should be formatted correctly', function(idx, done) { + var endpoint = urls[idx]; + var ws = new WebSocket('ws://' + endpoint + baseUrl); + var subscriptionId = null; + var topic = 'hub/testdriver/' + devices[0].id + '/bar'; + ws.on('open', function() { + var msg = { type: 'subscribe', topic: topic }; + ws.send(JSON.stringify(msg)); + ws.on('message', function(buffer) { + var json = JSON.parse(buffer); + if(json.type === 'subscribe-ack') { + assert.equal(json.type, 'subscribe-ack'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert(json.subscriptionId); + subscriptionId = json.subscriptionId; + + setTimeout(function() { + devices[0].bar = -1; + devices[0].incrementStreamValue(); + }, 50); + } else { + assert.equal(json.type, 'event'); + assert(json.timestamp); + assert.equal(json.topic, topic); + assert.equal(json.subscriptionId, subscriptionId); + assert.equal(json.data, 0); + done(); + } + }); + }); + ws.on('error', done); + }); + describe('Protocol Errors', function() { var makeTopicStringErrorsTest = function(topic) {