Skip to content

Commit

Permalink
add failover logic to switch
Browse files Browse the repository at this point in the history
  • Loading branch information
trmidboe committed Dec 29, 2021
1 parent 33e9fc8 commit a2f8f79
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 140 deletions.
2 changes: 0 additions & 2 deletions public/aegis.config.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
"WebSwitch": {
"desc": "Default implementation. Switched mesh over web sockets.",
"enabled": true,
"host": "localhost",
"port": 80,
"heartbeat": 30000,
"debug": false,
"uplink": null
Expand Down
8 changes: 4 additions & 4 deletions src/domain/distributed-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ const {
* broker:import("./event-broker").EventBroker,
* datasources:import("./datasource-factory").DataSourceFactory,
* models:import("./model-factory").ModelFactory,
* subscribe:function(...args),
* publish:function(...args),
* subscribe:function(string,function()),
* publish:function(string,object),
* }} param0
*/
export default function DistributedCache ({
Expand Down Expand Up @@ -121,7 +121,7 @@ export default function DistributedCache ({
* @param {function(m)=>m.id} return id to save
*/
async function saveModel (model, datasource) {
return datasource.save(model.id || model.getId(), model)
return datasource.save(models.getModelId(model), model)
}

/**
Expand Down Expand Up @@ -154,7 +154,7 @@ export default function DistributedCache ({
if (!model || model.length < 1) {
console.error('no model found', eventName)
// no model found
if (route) await route({ ...event, model })
if (route) await route(event)
return
}

Expand Down
113 changes: 70 additions & 43 deletions src/services/service-mesh/web-switch/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

import os from 'os'
import WebSocket from 'ws'
import mcastDns from 'multicast-dns'
import Dns from 'multicast-dns'

const SERVICENAME = 'webswitch'
const HOSTNAME = 'webswitch.local'
const MAXRETRY = 5
const TIMEOUTEVENT = 'webswitchTimeout'
const configRoot = require('../../../config').hostConfig
const domain = configRoot.services.cert.domain
Expand All @@ -33,8 +32,6 @@ let models
/** @type {WebSocket} */
let ws

if (!configRoot) console.error(protocol, 'cannot access config file')

function getLocalAddress () {
const interfaces = os.networkInterfaces()
const addresses = []
Expand All @@ -45,10 +42,16 @@ function getLocalAddress () {
addresses.push(address.address)
}
}
return addresses
}
return addresses
}

const getHost = () => domain || process.env.DOMAIN
const getPort = () =>
/true/i.test(process.env.SSL_ENABLED)
? process.env.SSL_PORT
: process.env.PORT

/**
* Use multicast DNS to find the host
* instance configured as the "switch"
Expand All @@ -57,11 +60,11 @@ function getLocalAddress () {
* @returns {Promise<string>} url
*/
async function resolveServiceUrl () {
const mdns = mcastDns()
const dns = Dns()
let url

return new Promise(async function (resolve) {
mdns.on('response', function (response) {
dns.on('response', function (response) {
DEBUG && console.debug(resolveServiceUrl.name, response)

const answer = response.answers.find(
Expand All @@ -75,56 +78,73 @@ async function resolveServiceUrl () {
}
})

mdns.on('query', function (query) {
dns.on('query', function (query) {
DEBUG && console.debug('got a query packet:', query)

const questions = query.questions.filter(
q => q.name === SERVICENAME || q.name === HOSTNAME
)

if (questions[0]) {
if (isSwitch || os.hostname === HOSTNAME) {
console.debug('answering for', HOSTNAME)
mdns.respond({
answers: [
{
name: SERVICENAME,
type: 'SRV',
data: {
port: config.port,
weight: 0,
priority: 10,
target: domain || config.host
}
},
{
name: HOSTNAME,
type: 'A',
ttl: 300,
data: getLocalAddress()[0]
if (questions[0] && (isSwitch || os.hostname === HOSTNAME)) {
console.debug('answering for', HOSTNAME, protocol, getPort(), getHost())
const answer = {
answers: [
{
name: SERVICENAME,
type: 'SRV',
data: {
port: 8080, //getPort(),
weight: 0,
priority: 10,
target: 'aegis.module-federation.org'
}
]
})
},
{
name: HOSTNAME,
type: 'A',
ttl: 300,
data: getLocalAddress()[0]
}
]
}

dns.respond(answer)
}
})

let takeoverTime
const RETRYINTERVAL = 2000

function takeover (retries) {
if (!takeoverTime) {
takeoverTime =
Date.now() + RETRYINTERVAL * retries * Math.random() * 1000
return
}
const now = Date.now()
const ready = now > takeoverTime
console.log({ ready, now, takeoverTime })
return ready
}

/**
* Query DNS for the web-switch server.
* Query DNS for the webswitch server.
* Recursively retry by incrementing a
* counter we pass to ourselves on the
* stack.
*
* @param {number} attempts number of query attempts
* @param {number} retries number of query attempts
* @returns
*/
function runQuery (attempts = 0) {
if (attempts > MAXRETRY) {
console.warn('mDNS cannot find switch after max retries')
function runQuery (retries = 0) {
if (takeover(retries)) {
console.warn('assuming switch duties')
isSwitch = true
return
}

// lets query for an A record
mdns.query({
dns.query({
questions: [
{
name: HOSTNAME,
Expand All @@ -133,7 +153,12 @@ async function resolveServiceUrl () {
]
})

setTimeout(() => (url ? resolve(url) : runQuery(attempts++)), 6000)
if (url) {
resolve(url)
return
}

setTimeout(() => runQuery(retries++), RETRYINTERVAL)
}

runQuery()
Expand Down Expand Up @@ -189,7 +214,7 @@ const handshake = {
pid: process.pid,
serviceUrl,
address: getLocalAddress()[0],
url: `${protocol}://${domain || config.host}:${config.port}`,
url: `${protocol}://${getHost()}:${getPort()}`,
serialize () {
return JSON.stringify({
...this,
Expand Down Expand Up @@ -219,8 +244,8 @@ function startHeartBeat (ws) {
ws.ping(0x9)
} else {
try {
await broker.notify(TIMEOUTEVENT, 'server unresponsive', true)
console.error(receivedPong.resolve, 'no response, trying new conn')
await broker.notify(TIMEOUTEVENT, 'server unresponsive')
console.error(receivedPong, 'no response, trying new conn')
clearInterval(intervalId)
await reconnect()
} catch (error) {
Expand Down Expand Up @@ -255,7 +280,8 @@ export async function subscribe (eventName, callback, options = {}) {
async function _connect () {
if (!ws) {
if (!serviceUrl) serviceUrl = await resolveServiceUrl()
console.info(_connect.name, 'connecting to ', serviceUrl)
console.info(_connect.name, 'switch', serviceUrl)

ws = new WebSocket(serviceUrl)

ws.on('open', function () {
Expand All @@ -264,8 +290,8 @@ async function _connect () {
})

ws.on('error', function (error) {
console.error(_connect.name, 'opening new conn after error', error)
ws = null
console.error(_connect.name, error)
ws = null // new socket next msg
})

ws.on('message', async function (message) {
Expand All @@ -275,6 +301,7 @@ async function _connect () {
if (eventData.eventName) {
if (broker) await broker.notify(eventData.eventName, eventData)
if (uplinkCallback) await uplinkCallback(message)

return
}

Expand Down
22 changes: 1 addition & 21 deletions src/services/service-mesh/web-switch/switch.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ let messagesSent = 0
*/
export function attachServer (server) {
/**
*
* @param {object} data
* @param {WebSocket} sender
*/
Expand Down Expand Up @@ -55,36 +54,17 @@ export function attachServer (server) {
messagesSent,
clientsConnected: server.clients.size,
uplink: server.uplink ? server.uplink.info : 'no uplink',
primarySwitch: isSwitch,
failoverSwitch: server.failoverSwitch,
activeSwitch: isSwitch,
clients: [...server.clients].map(c => c.info)
})
)
}

// function setFailoverSwitch (client) {
// const clients = server.clients.entries()

// function _setFailoverSwitch (client) {
// if (!server.failoverSwitch) {
// if (client.OPEN) {
// client.send({ proto: SERVICENAME, msg: 'setFailover' })
// return true
// }
// if (_setFailoverSwitch(clients.next().value)) return true
// }
// return false
// }

// _setFailoverSwitch(client)
// }

/**
* @param {WebSocket} client
*/
server.on('connection', function (client) {
client.info = { address: client._socket.address(), id: nanoid() }
//setFailoverSwitch(client)

client.addListener('ping', function () {
console.assert(!DEBUG, 'responding to client ping', client.info)
Expand Down
10 changes: 0 additions & 10 deletions wasm/assembly/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ export function fibonacci(x: number): number {
if (x === 0) {
return 0;
}

if (x === 1) {
return 1;
}

return fibonacci(x - 1) + fibonacci(x - 2);
}

Expand Down Expand Up @@ -171,14 +169,6 @@ export function validate(keys: string[], vals: string[]): void {
return;
}

// export function inboundPort(keys: string[], vals: string[]): string[][] {
// aegis.log("inbound port called: " + keys[0] + ": " + vals[0]);
// const outval = new Array<string[]>(1);
// outval[0] = ["key1", "val1"];
// aegis.invokePort("task1", "task data", "task1Event", 1, 2);
// return outval;
// }

export function test(keys: string[], values: string[]): string[][] {
const key1 = keys[0] == "key1" ? values[0] : "default";
const key2 = keys[1] == "key2" ? values[1] : "default";
Expand Down
Loading

0 comments on commit a2f8f79

Please sign in to comment.