Skip to content

Commit

Permalink
Multiplex ws allow duplicate messages to be filtered (#318)
Browse files Browse the repository at this point in the history
* WIP: Optionally filter multiple of the same message going to the multiplexed ws stream if it satisfies two or more subscriptions

* Use options parameter for options on EventSocket instead of single variable

* Pass filterMultiple option to EventSocket

* Multiplexed ws streams support device queries.

* More tests for device queries over multiplexed ws

* Fix issue with multiplexed data when value is falsely
  • Loading branch information
AdamMagaluk authored Jun 30, 2016
1 parent 58dd7e1 commit 0f817fc
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 14 deletions.
41 changes: 39 additions & 2 deletions lib/event_broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,22 @@ EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, t
};

EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {

var origData = data;

var newMsg = {};
newMsg.type = 'event';
newMsg.topic = sourceTopic;
newMsg.timestamp = data.timestamp;
newMsg.timestamp = data.timestamp || new Date().getTime();
newMsg.subscriptionId = query.subscriptionId;

// check if topic is a device query, rewrite sent topic as the original topic
var qt = querytopic.parse(query.original.pubsubIdentifier());
if (qt) {
newMsg.topic = query.original.hash();
}

if (data.data) {
if (data.data !== undefined) {
newMsg.data = data.data;
} else {
// handle device and server /logs stream
Expand Down Expand Up @@ -174,6 +183,27 @@ EventBroker.prototype._publishStreamEnabledClient = function(client, query, topi
client._unsubscribe(query.subscriptionId)
return;
}
if (client.filterMultiple) {
// If query has caql statement don't filter
if (query.caql !== null) {
data.subscriptionId = [data.subscriptionId];
} else {
var found = client.hasBeenSent(origData);
if (found) {
return;
}

var subscriptionsIds = [];
client._subscriptions.forEach(function(subscription) {
// Only provide id if topic matches and topic doesn't have a caql statement
if (subscription.topic.match(sourceTopic) && subscription.topic.streamQuery === null) {
subscriptionsIds.push(subscription.subscriptionId);
}
});

data.subscriptionId = subscriptionsIds;
}
}

client.send(sourceTopic, data, function(err){
if (err) {
Expand Down Expand Up @@ -205,6 +235,13 @@ EventBroker.prototype._streamEnabledClient = function(client) {
count: 0,
caql: subscription.topic.streamQuery
};

// If topic is a device query appened unique identifier to query
var qt = querytopic.parse(query.topic);
if (qt) {
query.topic = querytopic.format(qt);
}

client.query.push(query);

var connectedPeers = [];
Expand Down
36 changes: 32 additions & 4 deletions lib/event_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,47 @@ var JSCompiler = require('caql-js-compiler');

//Flag to indicate that we expect data back on teh websocket
//Tracking subscriptions
var EventSocket = module.exports = function(ws, query, streamEnabled) {
var EventSocket = module.exports = function(ws, query, options) {
EventEmitter.call(this);

if (options === undefined) {
options = {};
}

this.ws = ws;
this.query = [];
this._queryCache = {};

// list of event streams
this._subscriptions = [];
this._subscriptionIndex = 0;
this.streamEnabled = !!(streamEnabled);

// Flags
this.streamEnabled = !!(options.streamEnabled);
this.filterMultiple = !!(options.filterMultiple);

this.hasBeenSent = function(msg) {
return this._sendBuffer.add(msg);
};
this._sendBuffer = {
add: function(msg) {
if (this._buffer.indexOf(msg) >= 0) {
return true;
}

if (this._buffer.unshift(msg) > this.max) {
this._buffer.pop();
}

return false;
},
max: 50,
_buffer: []
};


// only setup parser when using event stream
if (streamEnabled) {
if (this.streamEnabled) {
var self = this;
this._parser = new EventStreamsParser();
this._parser.on('error', function(err, original) {
Expand Down Expand Up @@ -168,7 +196,7 @@ EventSocket.prototype.send = function(topic, data) {
} else {
data = tmpData;
}
} else if (data['query']) {
} else if (tmpData['query']) {
// format device queries
tmpData = deviceFormatter({ loader: this.ws._loader, env: this.ws._env, model: tmpData.device });
if (this.streamEnabled) {
Expand Down
11 changes: 5 additions & 6 deletions lib/http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,15 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) {

if (/^\/events/.exec(ws.upgradeReq.url)) {
self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
var queryString = url.parse(ws.upgradeReq.url).query;
var parsed = url.parse(ws.upgradeReq.url, true);
var query = parsed.query;

if(!queryString) {
var client = new EventSocket(ws, null, true);
if(!query.topic) {
var client = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) });
self.eventBroker.client(client);
return;
}

var query = querystring.parse(queryString);

function copy(q) {
var c = {};
Object.keys(q).forEach(function(k) {
Expand All @@ -352,7 +351,7 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) {
if (qt) {
q.topic = querytopic.format(qt);
}
var client = new EventSocket(ws, q, false);
var client = new EventSocket(ws, q, { streamEnabled: false });
self.eventBroker.client(client);
}
});
Expand Down
10 changes: 8 additions & 2 deletions test/test_event_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,19 @@ describe('EventSocket', function() {

it('should init parser if passed streaming flag', function() {
var ws = new Ws();
var client = new EventSocket(ws, 'some-topic', true);
var client = new EventSocket(ws, 'some-topic', { streamEnabled: true });
assert(client._parser)
})

it('should pass filterMultiple flag to EventSocket', function() {
var ws = new Ws();
var client = new EventSocket(ws, 'some-topic', { filterMultiple: true });
assert(client.filterMultiple, true);
})

it('should emit subscribe event when subscribe message is parsed', function(done) {
var ws = new Ws();
var client = new EventSocket(ws, 'some-topic', true);
var client = new EventSocket(ws, 'some-topic', { streamEnabled: true });
client.on('subscribe', function(subscription) {
assert(subscription.subscriptionId);
assert(subscription.topic);
Expand Down
Loading

0 comments on commit 0f817fc

Please sign in to comment.