Skip to content

Commit

Permalink
meshlink
Browse files Browse the repository at this point in the history
  • Loading branch information
trmidboe committed Nov 1, 2021
1 parent 9cd12ad commit d42eae4
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 70 deletions.
4 changes: 2 additions & 2 deletions src/adapters/serverless/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { makeServerlessAdapter } from './serverless-adapter'
import * as localParsers from './parsers'

const getRemoteParsers = async () => import('aegis-services/parsers')
const getRemoteParsers = async () => null //import('aegis-services/parsers')

export const getParsers = async function () {
try {
Expand All @@ -14,4 +14,4 @@ export const getParsers = async function () {
return localParsers
}

export const ServerlessAdapter = () => makeServerlessAdapter(getParsers)
export const ServerlessAdapter = () => makeServerlessAdapter(getParsers)
2 changes: 1 addition & 1 deletion src/adapters/webassembly/wasm-import.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function importWebAssembly (remoteEntry, type = 'model') {
return
}

observer.on(event, eventData => adapter.callWasmFunction(fn, eventData))
observer.on(event, eventData => adapter.callWasmFunction(fn, eventData), true)
},

/**
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/webassembly/wasm-interop.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ exports.WasmInterop = function (module) {
const obj = __getArray(ptr)
.map(inner => __getArray(inner))
.map(tuple => ({ [__getString(tuple[0])]: __getString(tuple[1]) }))
.reduce((prop1, prop2) => ({ ...prop1, ...prop2 }))
.reduce((obj1, obj2) => ({ ...obj1, ...obj2 }))

const immutableClone = Object.freeze({ ...obj })
!unpin || __unpin(ptr)
Expand Down
8 changes: 5 additions & 3 deletions src/domain/distributed-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,10 @@ export default function DistributedCache ({
* @param {*} externalEvent
*/
const forwardSearchRequest = (internalEvent, externalEvent) =>
observer.on(internalEvent, async event =>
publish({ ...event, eventName: externalEvent })
observer.on(
internalEvent,
async event => publish({ ...event, eventName: externalEvent }),
true
)

/**
Expand All @@ -336,7 +338,7 @@ export default function DistributedCache ({
*/
const broadcastCrudEvent = eventName =>
observer.on(eventName, async event =>
publish({ ...event, eventName: externalCrudEvent(eventName) })
publish({ ...event, eventName: externalCrudEvent(eventName) }, true)
)

/**
Expand Down
2 changes: 1 addition & 1 deletion src/domain/make-relations.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export function requireRemoteObject (model, relation, observer, ...args) {

return new Promise(async function (resolve) {
setTimeout(resolve, maxwait)
observer.on(response, execute(resolve))
observer.on(response, execute(resolve), true)
await observer.notify(request, requestData)
})
}
Expand Down
22 changes: 19 additions & 3 deletions src/domain/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ async function notify (eventName, eventData, forward = false) {
try {
if (this.handlers.has(eventName)) {
await Promise.allSettled(
this.handlers
.get(eventName)
.map(handler => run(eventName, eventData, handler, forward))
this.handlers.get(eventName).map(handler => {
console.debug('hander running', {
eventName,
handler: handlers.toString()
})
run(eventName, eventData, handler, forward)
})
)
}

Expand Down Expand Up @@ -129,6 +133,18 @@ class ObserverImpl extends Observer {
}
return true
}

serialize () {
return JSON.stringify(
[...this.handlers].map(([k, v]) => ({ [k]: v.map(fn => fn.toString()) })),
null,
2
)
}

toString () {
console.log('toString', this.serialize())
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/services/dns/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

import * as localClients from './providers'
const getRemoteClients = async () => import('aegis-services/dns')
const getRemoteClients = async () => null //import('aegis-services/dns')

const dns = async function () {
const name = process.env.DNS_SERVICE
Expand Down
73 changes: 31 additions & 42 deletions src/services/service-mesh/mesh-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const nanoid = require('nanoid').nanoid
const begins = Date.now()
const uptime = () => Math.round(Math.abs((Date.now() - begins) / 1000 / 60))
const userConfig = require('../../../../microlib/public/aegis.config.json')
const DEBUG =
const debug =
/true/i.test(userConfig.services.serviceMesh.MeshLink.debug) || false

const defaultCfg = {
Expand All @@ -23,11 +23,7 @@ const defaultCfg = {

const cfg = userConfig.services.serviceMesh.MeshLink.config || defaultCfg

let started = false
const subscriptions = new Map()

function numericHash (str) {
if (subscriptions.has(str)) return null
let hash = 0
let i
let chr
Expand All @@ -37,8 +33,7 @@ function numericHash (str) {
hash = (hash << 5) - hash + chr
hash |= 0 // Convert to 32bit integer
}
const handleId = Math.abs(parseInt(hash % 10000)) //keep under 0xffff
subscriptions.set(str, handleId)
return Math.abs(parseInt(hash % 10000)) //keep under 0xffff
}

const sharedObjects = new Map()
Expand Down Expand Up @@ -101,25 +96,22 @@ const SharedObjEvent = {
}

const registerSharedObjEvents = observer =>
observer.on(/^externalCrudEvent_.*/, async (eventName, eventData) =>
SharedObjEvent[eventName.split('_')[1].substr(0, 6)](eventData)
observer.on(
/^externalCrudEvent_.*/,
async (eventName, eventData) =>
SharedObjEvent[eventName.split('_')[1].substr(0, 6)](eventData),
true
)

/**
*
* @param {cfg} config
* @returns
*/
async function start (config = defaultCfg, observer = null) {
if (started) return
started = true
observer && registerSharedObjEvents(observer)

async function start (config = cfg) {
mlink
.start(config)
.then(() => {
// connect to uplink if configured
!config.uplink || uplink(config)
console.info('meshlink started')
})
.catch(error => {
Expand All @@ -128,23 +120,18 @@ async function start (config = defaultCfg, observer = null) {
}

async function publish (event, observer) {
console.debug('publish called', event.eventName, event, observer)
const deserEvent = JSON.parse(JSON.stringify(event))
const handlerId = numericHash(event.eventName)
DEBUG && console.debug('mlink publish', handlerId, deserEvent)

start(cfg, observer).then(() => {
return mlink.send(
handlerId,
mlink.getNodeEndPoints(),
deserEvent,
response => {
console.debug('response to publish ', handlerId, response)
const eventData = JSON.parse(response)
if (eventData?.eventName) {
observer.notify(eventData.eventName, eventData)
}
}
)
console.debug('mlink stringify', handlerId, JSON.stringify(event))
console.debug('mlink parse', handlerId, JSON.parse(JSON.stringify(event)))

mlink.send(handlerId, mlink.getNodeEndPoints(), deserEvent, response => {
console.debug('response to publish ', handlerId, response)
const eventData = JSON.parse(response)
if (eventData?.eventName) {
observer.notify(eventData.eventName, eventData)
}
})
try {
global.broadcast(JSON.stringify(event), {
Expand All @@ -158,18 +145,18 @@ async function publish (event, observer) {
async function subscribe (eventName, callback) {
const handlerId = numericHash(eventName)
if (!handlerId) return // we've already registered a callback for this event
DEBUG && console.debug('mlink subscribe', eventName, handlerId)
start(cfg).then(() =>
mlink.handler(handlerId, (data, cb) => {
console.log('mlink.handler called with data', handlerId, data)
cb(callback(data))
})
)
debug && console.debug('mlink subscribe', eventName, handlerId)

mlink.handler(handlerId, (data, cb) => {
console.log('mlink.handler called with data', handlerId, data)
callback(data)
cb(data)
})
}

function attachServer (server) {
let messagesSent = 0

start()
/**
*
* @param {object} data
Expand All @@ -178,7 +165,7 @@ function attachServer (server) {
server.broadcast = function (data, sender) {
server.clients.forEach(function (client) {
if (client.OPEN && client.info.id !== sender.info.id) {
!DEBUG || console.debug('sending client', client.info, data.toString())
debug && console.debug('sending client', client.info, data.toString())
client.send(data)
messagesSent++
}
Expand All @@ -196,9 +183,11 @@ function attachServer (server) {
server.sendStatus = function (client) {
client.send(
JSON.stringify({
servicePlugin: 'MeshLink',
uptimeMinutes: uptime(),
messagesSent,
clientsConnected: server.clients.size
clientsConnected: server.clients.size,
meshLinkNodes: mlink.getNodeEndPoints()
})
)
}
Expand All @@ -207,7 +196,7 @@ function attachServer (server) {
client.info = { address: client._socket.address(), id: nanoid() }

client.addListener('ping', function () {
!DEBUG || console.debug('responding to client ping', client.info)
debug && console.debug('responding to client ping', client.info)
client.pong(0xa)
})

Expand Down
4 changes: 3 additions & 1 deletion src/services/service-mesh/webswitch/web-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const config = configFile.services.serviceMesh.WebSwitch
const DEBUG = /true|yes|y/i.test(config.debug) || false
const heartbeat = config.heartbeat || 10000

if (!configFile) console.error('WebSwitch', 'cannot access config file')

/**
* @type import("ws/lib/websocket")
*/
Expand Down Expand Up @@ -125,7 +127,7 @@ function setupHeartBeat (ws) {
}

export async function subscribe (eventName, callback, observer) {
observer.on(eventName, callback)
observer.on(eventName, callback, true)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/use-cases/edit-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ export default function makeEditModel ({
} = {}) {
const eventType = models.EventTypes.UPDATE
const eventName = models.getEventName(eventType, modelName)
handlers.forEach(handler => observer.on(eventName, handler))
handlers.forEach(handler => observer.on(eventName, handler, false))

// Add an event that can be used to edit this model
observer.on(domainEvents.editModel(modelName), editModelHandler)
observer.on(domainEvents.editModel(modelName), editModelHandler, false)

async function editModel (id, changes, command) {
const model = await repository.find(id)
Expand Down
12 changes: 8 additions & 4 deletions src/use-cases/forward-events.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ export function forwardEvents ({ observer, models, publish, subscribe }) {
)

producerEvents.forEach(producerEvent =>
observer.on(producerEvent, eventData =>
publish(producerEvent, eventData || 'no data')
observer.on(
producerEvent,
eventData => publish(producerEvent, eventData || 'no data'),
true
)
)
}
Expand All @@ -82,8 +84,10 @@ export function forwardEvents ({ observer, models, publish, subscribe }) {
* Forward events so marked.
*/
function forwardUserEvents () {
observer.on(domainEvents.forwardEvent, eventData =>
publish(eventData.eventName, eventData)
observer.on(
domainEvents.forwardEvent(),
eventData => publish(eventData.eventName, eventData),
false
)
}

Expand Down
2 changes: 1 addition & 1 deletion src/use-cases/remove-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export default function removeModelFactory ({
} = {}) {
const eventType = models.EventTypes.DELETE
const eventName = models.getEventName(eventType, modelName)
handlers.forEach(handler => observer.on(eventName, handler))
handlers.forEach(handler => observer.on(eventName, handler, false))

return async function removeModel (id) {
const model = await repository.find(id)
Expand Down
2 changes: 1 addition & 1 deletion target/npmlist.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.0.0-alpha.95",
"version": "1.0.0-alpha.95",
"name": "@module-federation/aegis",
"dependencies": {
"@octokit/core": {
Expand Down
11 changes: 4 additions & 7 deletions wasm/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,13 @@ async function importWebAssembly () {
wasm.exports.__getString(callbackName)
)



if (typeof fn === 'function') {
observer.on(eventName, eventData => {

if (typeof fn === 'function') {
observer.on(eventName, eventData => {
adapter.callWasmFunction(fn, eventData, false)
return
}
})
console.log('no command found')
})
}
},

/**
Expand Down

0 comments on commit d42eae4

Please sign in to comment.