Skip to content

Commit

Permalink
refactor/fix: new listeners backward compatible to old subscription loop
Browse files Browse the repository at this point in the history
  • Loading branch information
mohitpubnub committed Feb 14, 2024
1 parent 6f22576 commit ba806b5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 246 deletions.
60 changes: 40 additions & 20 deletions src/core/components/eventEmitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export default class EventEmitter {
if (e.payload.timeout) {
announce.timeout = e.payload.timeout;
}
// deprecated -->
announce.actualChannel = subscriptionMatch != null ? channel : null;
announce.subscribedChannel = subscriptionMatch != null ? subscriptionMatch : channel;
// <-- deprecated

this.listenerManager.announcePresence(announce);
this._announce('presence', announce, announce.channel, announce.subscription);
Expand Down Expand Up @@ -220,36 +224,52 @@ export default class EventEmitter {
} else {
announce.message = e.payload;
}
// deprecated -->
announce.actualChannel = subscriptionMatch != null ? channel : null;
announce.subscribedChannel = subscriptionMatch != null ? subscriptionMatch : channel;
// <-- deprecated

this.listenerManager.announceMessage(announce);
this._announce('message', announce, announce.channel, announce.subscription);
}
}

addListener(l, channels, groups) {
channels.forEach((c) => {
if (this._channelListenerMap[c]) {
if (!this._channelListenerMap[c].includes(l)) this._channelListenerMap[c].push(l);
} else {
this._channelListenerMap[c] = [l];
}
});
groups.forEach((g) => {
if (this._groupListenerMap[g]) {
if (!this._groupListenerMap[g].includes(l)) this._groupListenerMap[g].push(l);
} else {
this._groupListenerMap[g] = [l];
}
});
if (!(channels && groups)) {
this.listenerManager.addListener(l);
} else {
channels?.forEach((c) => {
if (this._channelListenerMap[c]) {
if (!this._channelListenerMap[c].includes(l)) this._channelListenerMap[c].push(l);
} else {
this._channelListenerMap[c] = [l];
}
});
groups?.forEach((g) => {
if (this._groupListenerMap[g]) {
if (!this._groupListenerMap[g].includes(l)) this._groupListenerMap[g].push(l);
} else {
this._groupListenerMap[g] = [l];
}
});
}
}

removeListener(listener, channels, groups) {
channels.forEach((c) => {
this._channelListenerMap[c] = this._channelListenerMap[c]?.filter((l) => l !== listener);
});
groups.forEach((g) => {
this._groupListenerMap[g] = this._groupListenerMap[g]?.filter((l) => l !== listener);
});
if (!(channels && groups)) {
this.listenerManager.removeListener(l);
} else {
channels?.forEach((c) => {
this._channelListenerMap[c] = this._channelListenerMap[c]?.filter((l) => l !== listener);
});
groups?.forEach((g) => {
this._groupListenerMap[g] = this._groupListenerMap[g]?.filter((l) => l !== listener);
});
}
}

removeAllListeners() {
this.listenerManager.removeAllListeners();
}

_renameEvent(e) {
Expand Down
223 changes: 6 additions & 217 deletions src/core/components/subscription_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export default class {

_dedupingManager;

_eventEmitter;

constructor({
subscribeEndpoint,
leaveEndpoint,
Expand All @@ -75,6 +77,7 @@ export default class {
crypto,
listenerManager,
cryptoModule,
eventEmitter,
}) {
this._listenerManager = listenerManager;
this._config = config;
Expand Down Expand Up @@ -112,6 +115,8 @@ export default class {
this._dedupingManager = new DedupingManager({ config });

if (this._cryptoModule) this._decoder = new TextDecoder();

this._eventEmitter = eventEmitter;
}

adaptStateChange(args, callback) {
Expand Down Expand Up @@ -534,7 +539,6 @@ export default class {
messages.forEach((message) => {
const { channel } = message;
let { subscriptionMatch } = message;
const { publishMetaData } = message;

if (channel === subscriptionMatch) {
subscriptionMatch = null;
Expand All @@ -547,222 +551,7 @@ export default class {
this._dedupingManager.addEntry(message);
}

if (utils.endsWith(message.channel, '-pnpres')) {
const announce = {};
announce.channel = null;
announce.subscription = null;

// deprecated -->
announce.actualChannel = subscriptionMatch != null ? channel : null;
announce.subscribedChannel = subscriptionMatch != null ? subscriptionMatch : channel;
// <-- deprecated

if (channel) {
announce.channel = channel.substring(0, channel.lastIndexOf('-pnpres'));
}

if (subscriptionMatch) {
announce.subscription = subscriptionMatch.substring(0, subscriptionMatch.lastIndexOf('-pnpres'));
}

announce.action = message.payload.action;
announce.state = message.payload.data;
announce.timetoken = publishMetaData.publishTimetoken;
announce.occupancy = message.payload.occupancy;
announce.uuid = message.payload.uuid;
announce.timestamp = message.payload.timestamp;

if (message.payload.join) {
announce.join = message.payload.join;
}

if (message.payload.leave) {
announce.leave = message.payload.leave;
}

if (message.payload.timeout) {
announce.timeout = message.payload.timeout;
}

this._listenerManager.announcePresence(announce);
} else if (message.messageType === 1) {
// this is a signal message
const announce = {};
announce.channel = null;
announce.subscription = null;

announce.channel = channel;
announce.subscription = subscriptionMatch;
announce.timetoken = publishMetaData.publishTimetoken;
announce.publisher = message.issuingClientId;

if (message.userMetadata) {
announce.userMetadata = message.userMetadata;
}

announce.message = message.payload;

this._listenerManager.announceSignal(announce);
} else if (message.messageType === 2) {
// this is an object message

const announce = {};

announce.channel = null;
announce.subscription = null;

announce.channel = channel;
announce.subscription = subscriptionMatch;
announce.timetoken = publishMetaData.publishTimetoken;
announce.publisher = message.issuingClientId;

if (message.userMetadata) {
announce.userMetadata = message.userMetadata;
}

announce.message = {
event: message.payload.event,
type: message.payload.type,
data: message.payload.data,
};

this._listenerManager.announceObjects(announce);

if (message.payload.type === 'uuid') {
const eventData = this._renameChannelField(announce);
this._listenerManager.announceUser({
...eventData,
message: {
...eventData.message,
event: this._renameEvent(eventData.message.event),
type: 'user',
},
});
} else if (message.payload.type === 'channel') {
const eventData = this._renameChannelField(announce);
this._listenerManager.announceSpace({
...eventData,
message: {
...eventData.message,
event: this._renameEvent(eventData.message.event),
type: 'space',
},
});
} else if (message.payload.type === 'membership') {
const eventData = this._renameChannelField(announce);
const { uuid: user, channel: space, ...membershipData } = eventData.message.data;
membershipData.user = user;
membershipData.space = space;
this._listenerManager.announceMembership({
...eventData,
message: {
...eventData.message,
event: this._renameEvent(eventData.message.event),
data: membershipData,
},
});
}
} else if (message.messageType === 3) {
// this is a message action
const announce = {};
announce.channel = channel;
announce.subscription = subscriptionMatch;
announce.timetoken = publishMetaData.publishTimetoken;
announce.publisher = message.issuingClientId;

announce.data = {
messageTimetoken: message.payload.data.messageTimetoken,
actionTimetoken: message.payload.data.actionTimetoken,
type: message.payload.data.type,
uuid: message.issuingClientId,
value: message.payload.data.value,
};

announce.event = message.payload.event;

this._listenerManager.announceMessageAction(announce);
} else if (message.messageType === 4) {
// this is a file message
const announce = {};
announce.channel = channel;
announce.subscription = subscriptionMatch;
announce.timetoken = publishMetaData.publishTimetoken;
announce.publisher = message.issuingClientId;

let msgPayload = message.payload;

if (this._cryptoModule) {
let decryptedPayload;
try {
const decryptedData = this._cryptoModule.decrypt(message.payload);
decryptedPayload =
decryptedData instanceof ArrayBuffer ? JSON.parse(this._decoder.decode(decryptedData)) : decryptedData;
} catch (e) {
decryptedPayload = null;
announce.error = `Error while decrypting message content: ${e.message}`;
}
if (decryptedPayload !== null) {
msgPayload = decryptedPayload;
}
}

if (message.userMetadata) {
announce.userMetadata = message.userMetadata;
}

announce.message = msgPayload.message;

announce.file = {
id: msgPayload.file.id,
name: msgPayload.file.name,
url: this._getFileUrl({
id: msgPayload.file.id,
name: msgPayload.file.name,
channel,
}),
};

this._listenerManager.announceFile(announce);
} else {
const announce = {};
announce.channel = null;
announce.subscription = null;

// deprecated -->
announce.actualChannel = subscriptionMatch != null ? channel : null;
announce.subscribedChannel = subscriptionMatch != null ? subscriptionMatch : channel;
// <-- deprecated

announce.channel = channel;
announce.subscription = subscriptionMatch;
announce.timetoken = publishMetaData.publishTimetoken;
announce.publisher = message.issuingClientId;

if (message.userMetadata) {
announce.userMetadata = message.userMetadata;
}

if (this._cryptoModule) {
let decryptedPayload;
try {
const decryptedData = this._cryptoModule.decrypt(message.payload);
decryptedPayload =
decryptedData instanceof ArrayBuffer ? JSON.parse(this._decoder.decode(decryptedData)) : decryptedData;
} catch (e) {
decryptedPayload = null;
announce.error = `Error while decrypting message content: ${e.message}`;
}
if (decryptedPayload != null) {
announce.message = decryptedPayload;
} else {
announce.message = message.payload;
}
} else {
announce.message = message.payload;
}

this._listenerManager.announceMessage(announce);
}
this._eventEmitter.emitEvent(message);
});

this._region = payload.metadata.region;
Expand Down
17 changes: 9 additions & 8 deletions src/core/pubnub-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,12 @@ export default class {
this.handshake = endpointCreator.bind(this, modules, handshakeEndpointConfig);
this.receiveMessages = endpointCreator.bind(this, modules, receiveMessagesConfig);

this._eventEmitter = new EventEmitter({
modules: modules,
listenerManager: this._listenerManager,
getFileUrl: (params) => getFileUrlFunction(modules, params),
});
if (config.enableEventEngine === true) {
this._eventEmitter = new EventEmitter({
modules: modules,
listenerManager: this._listenerManager,
getFileUrl: (params) => getFileUrlFunction(modules, params),
});
if (config.maintainPresenceState) {
this.presenceState = {};
this.setState = (args) => {
Expand Down Expand Up @@ -424,6 +424,7 @@ export default class {
listenerManager,
getFileUrl: (params) => getFileUrlFunction(modules, params),
cryptoModule: modules.cryptoModule,
eventEmitter: this._eventEmitter,
});

this.subscribe = subscriptionManager.adaptSubscribeChange.bind(subscriptionManager);
Expand All @@ -443,9 +444,9 @@ export default class {
};
}

this.addListener = listenerManager.addListener.bind(listenerManager);
this.removeListener = listenerManager.removeListener.bind(listenerManager);
this.removeAllListeners = listenerManager.removeAllListeners.bind(listenerManager);
this.addListener = this._eventEmitter.addListener.bind(this._eventEmitter);
this.removeListener = this._eventEmitter.removeListener.bind(this._eventEmitter);
this.removeAllListeners = this._eventEmitter.removeAllListeners.bind(this._eventEmitter);

this.parseToken = tokenManager.parseToken.bind(tokenManager);
this.setToken = tokenManager.setToken.bind(tokenManager);
Expand Down
1 change: 0 additions & 1 deletion test/integration/components/subscription_manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,6 @@ describe('#components/subscription_manager', () => {
pubnubWithCrypto.addListener({
message(messagePayload) {
incomingPayloads.push(messagePayload);
console.log('\n\n\n incomingpayload = ', JSON.stringify(incomingPayloads));
if (incomingPayloads.length === 1) {
assert.equal(scope.isDone(), true);
assert.deepEqual(incomingPayloads, [
Expand Down

0 comments on commit ba806b5

Please sign in to comment.