From b72334c67af8773764723afe33c0e4cf3724edd9 Mon Sep 17 00:00:00 2001 From: tysonrm Date: Sat, 12 Nov 2022 17:46:01 -0600 Subject: [PATCH] secure acccess to datasource from hosted code --- .prettierrc | 5 +- forkrun | Bin 0 -> 50436 bytes forkrun.c | 31 ++++ package.json | 4 +- src/adapters/controllers/get-model-by-id.js | 7 +- src/adapters/controllers/get-models.js | 1 + src/adapters/controllers/index.js | 2 +- src/adapters/datasources/datasource-file.js | 4 +- src/adapters/datasources/datasource-memory.js | 4 +- .../datasources/datasource-mongodb.js | 108 +++++++------ .../datasources/datasource-solidpod.js | 4 +- src/adapters/session.js | 9 -- src/aegis.js | 23 ++- src/domain/datasource-factory.js | 123 +++++++++------ src/domain/datasource.js | 88 ++++------- src/domain/distributed-cache.js | 53 ++++--- src/domain/domain-events.js | 1 + src/domain/event-router.js | 47 +++--- src/domain/index.js | 22 ++- src/domain/make-ports.js | 21 ++- src/domain/make-relations.js | 145 ++++++++++-------- src/domain/model.js | 83 +++++----- src/domain/shared-memory.js | 69 ++++++--- src/domain/thread-pool.js | 110 +++++++------ src/domain/use-cases/create-model.js | 6 +- src/domain/use-cases/edit-model.js | 4 +- src/domain/use-cases/index.js | 123 ++++++++------- src/domain/use-cases/invoke-port.js | 8 +- src/domain/use-cases/list-models.js | 2 +- src/domain/use-cases/load-models.js | 50 ++---- src/domain/use-cases/remove-model.js | 5 +- src/domain/util/change-data-capture.js | 62 ++++++-- start.sh | 9 +- 33 files changed, 674 insertions(+), 559 deletions(-) create mode 100755 forkrun create mode 100644 forkrun.c delete mode 100644 src/adapters/session.js diff --git a/.prettierrc b/.prettierrc index 222861c3..b51b0a9f 100644 --- a/.prettierrc +++ b/.prettierrc @@ -1,4 +1,7 @@ { "tabWidth": 2, - "useTabs": false + "useTabs": false, + "arrowParens": "avoid", + "singleQuote": true, + "semi": false } diff --git a/forkrun b/forkrun new file mode 100755 index 0000000000000000000000000000000000000000..84ecb5c0d6cbf62cab006bd1992be1fd7b640a46 GIT binary patch literal 50436 zcmeI5Yitx%6vxj#S{7UA3dNvey3t@z5Cp|kkup501(iZ6SP6Prw%c}XyNmlkuvG_2 z6)~D_f(`K%BpQoitmua}qPCD2;}=s+Pz*6ejVUo26iHZP#dBt6+u2bef%wV)NzdH# zzmK!${_bqQ-Rx(-|MX`ek&D>e>=W2)@`&0gz!A~q>=o=vtyp+x`P%ZTO6kotV{vZR z-<0Ip!4s9LF0ZN19X8f$xqW15E``Ls*055^@aCi?xN+t?Hs9RLdaEeWvab>Il0Fi1 zpj0B6+K?FOa^|~zftgPjh>5o6^I7?rpmvb00@+?_cy-+Q#T)xUY4b6H9xHZhtdbm?ua{Jkp zs*l-@f%QMjlBMQp67@Ou$y&@SUnVxmW6yOnk#6jvkLW8NyUEgDv2I{3iDdu zz4e;o^Hlr3-JOH+z3<5PDZp;-iC23Hjo5rg{%~K$)8O`ZU zOP=hSnNr6@u1#t@PHA%W&3~nh2KIPfzFlhW@|2vJ&U1MDp-+jbrB*)B4r(n+%UaQV zZ?kx4y4LG+Ip*vyOHbuFd=Io(_RN<#i_N|2Rb8wl?mB#b&rkvN8lP`5+SjJe_*`Ru z?6*E|ut4h_ZYkf#Ow)ZHb%gi6i|=!G``ucG+$BehINuhJx5X1|@nyF73NyZ=vg)p? z)yvi5dzMf(URZS}O-)dEb2zjq5RS)VajJ{O8?Dg1=Q_oDtL?2^EtNa1e4p3w`PTh% z(Q+47ZpHeoTe2*98)AN)IT)>ZHPx@1_h$XNIl-@+*M|MNd1u?N(>7ziabMY3iU$aQ z00@8p2!H?xfB*=900@8p2!H?xfB*=900@8p2!H?xfB*=900@8p2!H?xfB*=900@8p z2!H?xfB*=900@8p2!H?xfB*=900@8p2!H?xfB*=900@8p2!Ox^6EJ^&-cKEl-=V+Y zgjfgyAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4ea zAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JNY0w4eaAOHd&00JOzK?DN4s)2nw z`xtf?d)D%@0k@OcZ?v?;q~)_)9o)#ek$ye5-{t1|dhH$Vp9H+U>!AO*Dm1Xf%R8d(lUQcU6&9$_SYa|aHYaw!rUM3?eyrk=H z^3o3Dsc(~5-!488HH(q!q|MDxO9 z^UZOMHA_il*b08y^#+GdIdrQ-a~V17cRDnmtnn_4<@OTx0`@}oY`<*HImTTS;G_CG zb0s8|Fg1=#xEnp$Loh~+bNMe`1*t1*zkgHM$EQ`yI?J+{`}p|e&npcFw4S|&U2GiD zp_2i(v3;}WoKGz~>auo6|gDF<&yO)GdKSF z*5msR)WkkMQP=1Fa$um*U0IWNw)MT%N7}#s;OUB|X1spB@!4xikKX(CvZ3M7qrWUW z@l4CE+3Bu>uMH%64jg@Q(#{#TH0&Mj?>n`+X8&MCe}35~Kka<+g(;^Oy#4W>;A3m| zc29cXeEBbS{QVEx{h{k#3U +#include +#include +#include + +#define BUFSIZE 256 + +char* envvar_cmd = "FORKRUN_CMD"; +char* envvar_arg = "FORKRUN_ARG"; +char cmd[BUFSIZE]; +char arg[BUFSIZE]; + +int main (void) { + pid_t pid; + + if ((pid = fork()) == 0) { + snprintf(cmd, BUFSIZE, "%s", getenv(envvar_cmd)); + snprintf(arg, BUFSIZE, "%s", getenv(envvar_arg)); + char* const argv[] = { cmd, arg, NULL }; + + if (execv(cmd, argv) < 0) { + perror("execv error"); + } + } else if (pid < 0) { + perror("fork error"); + } else { + return EXIT_SUCCESS; + } + + return EXIT_FAILURE; +} diff --git a/package.json b/package.json index 6d537c3e..90fb45b2 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "format": "yarn pretty -- --write", "prepublish": "yarn build", "stats": "git ls-files | xargs wc -l", - "test": "jest" + "test": "jest", + "buildc": "gcc forkrun.c -o forkrun" }, "peerDependencies": { "core-js": "^3" @@ -69,6 +70,7 @@ "jsdoc-to-markdown": "7.1.1", "mocha": "10.0.0", "mocha-esm": "1.1.1", + "prettier": "2.7.1", "split": "1.0.1", "standard": "^17.0.0", "yarn": "1.22.19" diff --git a/src/adapters/controllers/get-model-by-id.js b/src/adapters/controllers/get-model-by-id.js index 3dc1bf0e..3748120c 100644 --- a/src/adapters/controllers/get-model-by-id.js +++ b/src/adapters/controllers/get-model-by-id.js @@ -9,9 +9,10 @@ export default function getModelByIdFactory (findModel) { try { httpRequest.log(getModelById.name) - const id = httpRequest.params.id - const query = httpRequest.query - const model = await findModel({ id, query }) + const model = await findModel({ + id: httpRequest.params.id, + query: httpRequest.query + }) const { content, contentType } = getContent( httpRequest, diff --git a/src/adapters/controllers/get-models.js b/src/adapters/controllers/get-models.js index 5bf4679e..919a1090 100644 --- a/src/adapters/controllers/get-models.js +++ b/src/adapters/controllers/get-models.js @@ -20,6 +20,7 @@ export default function getModelsFactory (listModels) { httpRequest.stream = true return } + const { content, contentType } = getContent(httpRequest, models) return { diff --git a/src/adapters/controllers/index.js b/src/adapters/controllers/index.js index 3422c421..74bf06d5 100644 --- a/src/adapters/controllers/index.js +++ b/src/adapters/controllers/index.js @@ -50,7 +50,7 @@ export const initCache = () => { async function loadModelInstances () { console.time(label) - //await Promise.allSettled(specs.map(async m => m && m.fn ? m.fn() : false)) + await Promise.allSettled(specs.map(async m => (m && m.fn ? m.fn() : false))) console.timeEnd(label) } diff --git a/src/adapters/datasources/datasource-file.js b/src/adapters/datasources/datasource-file.js index 94815e6e..ef977130 100644 --- a/src/adapters/datasources/datasource-file.js +++ b/src/adapters/datasources/datasource-file.js @@ -9,8 +9,8 @@ export class DataSourceFile extends DataSourceMemory { /** * @param {Set} map */ - constructor(map, factory, name) { - super(map, factory, name) + constructor(map, name, options) { + super(map, name, options) this.file = this.getFilePath() this.className = DataSourceFile.name } diff --git a/src/adapters/datasources/datasource-memory.js b/src/adapters/datasources/datasource-memory.js index fd1e1f21..7cd37a04 100644 --- a/src/adapters/datasources/datasource-memory.js +++ b/src/adapters/datasources/datasource-memory.js @@ -6,8 +6,8 @@ import DataSource from '../../domain/datasource' * Temporary in-memory storage. */ export class DataSourceMemory extends DataSource { - constructor (map, factory, name, options) { - super(map, factory, name, options) + constructor (map, name, options) { + super(map, name, options) this.className = DataSourceMemory.name } diff --git a/src/adapters/datasources/datasource-mongodb.js b/src/adapters/datasources/datasource-mongodb.js index 9bc376b9..1df28d32 100644 --- a/src/adapters/datasources/datasource-mongodb.js +++ b/src/adapters/datasources/datasource-mongodb.js @@ -1,26 +1,22 @@ -"use strict" - -import { EventBrokerFactory } from "../../domain" - -const broker = EventBrokerFactory.getInstance() +'use strict' const HIGHWATERMARK = 50 -const mongodb = require("mongodb") +const mongodb = require('mongodb') const { MongoClient } = mongodb -const { DataSourceMemory } = require("./datasource-memory") -const { Transform, Writable } = require("stream") -const qpm = require("query-params-mongo") +const { DataSourceMemory } = require('./datasource-memory') +const { Transform, Writable } = require('stream') +const qpm = require('query-params-mongo') const processQuery = qpm({ - autoDetect: [{ fieldPattern: /_id$/, dataType: "objectId" }], - converters: { objectId: mongodb.ObjectId }, + autoDetect: [{ fieldPattern: /_id$/, dataType: 'objectId' }], + converters: { objectId: mongodb.ObjectId } }) -const url = process.env.MONGODB_URL || "mongodb://localhost:27017" -const configRoot = require("../../config").hostConfig +const url = process.env.MONGODB_URL || 'mongodb://localhost:27017' +const configRoot = require('../../config').hostConfig const dsOptions = configRoot.adapters.datasources.DataSourceMongoDb.options || { runOffline: true, - numConns: 2, + numConns: 2 } const cacheSize = configRoot.adapters.cacheSize || 3000 @@ -31,7 +27,7 @@ const connections = [] const mongoOpts = { //useNewUrlParser: true, - useUnifiedTopology: true, + useUnifiedTopology: true } /** @@ -40,8 +36,8 @@ const mongoOpts = { * even when the database is offline. */ export class DataSourceMongoDb extends DataSourceMemory { - constructor (map, factory, name, options = {}) { - super(map, factory, name, options) + constructor (map, name, options = {}) { + super(map, name, options) this.cacheSize = cacheSize this.mongoOpts = mongoOpts this.className = this.constructor.name @@ -57,7 +53,7 @@ export class DataSourceMongoDb extends DataSourceMemory { const client = new MongoClient(this.url, this.mongoOpts) await client.connect() connections.push(client) - client.on("connectionClosed", () => + client.on('connectionClosed', () => connections.splice(connections.indexOf(client), 1) ) } @@ -93,7 +89,7 @@ export class DataSourceMongoDb extends DataSourceMemory { async loadModels () { try { const cursor = (await this.collection()).find().limit(this.cacheSize) - cursor.forEach((model) => super.saveSync(model.id, model)) + cursor.forEach(model => super.saveSync(model.id, model)) } catch (error) { console.error({ fn: this.loadModels.name, error }) } @@ -101,9 +97,7 @@ export class DataSourceMongoDb extends DataSourceMemory { async findDb (id) { try { - const model = await (await this.collection()).findOne({ _id: id }) - // save it to the cache - return super.saveSync(id, model) || model // saveSync fails on fresh start + return (await this.collection()).findOne({ _id: id }) } catch (error) { console.error({ fn: this.findDb.name, error }) } @@ -124,7 +118,7 @@ export class DataSourceMongoDb extends DataSourceMemory { ) { // cached can be empty object, save after finding in db const data = await this.findDb(id) - super.saveSync(id, data) + if (data) super.saveSync(id, data) return data } return cached @@ -143,9 +137,11 @@ export class DataSourceMongoDb extends DataSourceMemory { async saveDb (id, data) { try { const clone = JSON.parse(this.serialize(data)) - await ( - await this.collection() - ).replaceOne({ _id: id }, { ...clone, _id: id }, { upsert: true }) + await (await this.collection()).replaceOne( + { _id: id }, + { ...clone, _id: id }, + { upsert: true } + ) return data } catch (error) { console.error({ fn: this.saveDb.name, error }) @@ -164,22 +160,21 @@ export class DataSourceMongoDb extends DataSourceMemory { */ async save (id, data) { try { - const cache = super.saveSync(id, data) + super.saveSync(id, data) try { await this.saveDb(id, data) } catch (error) { // default is true if (!this.runOffline) { - this.deleteSync(id) + super.deleteSync(id) // after delete mem and db are sync'd - console.error("db trans failed, rolled back") + console.error('db trans failed, rolled back') return } // run while db is down - cache will be ahead - console.error("db trans failed, sync it later") - return data + console.error('db trans failed, sync it later') } - return cache + return data } catch (e) { console.error(e) } @@ -199,14 +194,13 @@ export class DataSourceMongoDb extends DataSourceMemory { const ctx = this async function upsert () { - const operations = objects.map((str) => { - const obj = JSON.parse(str) + const operations = objects.map(obj => { return { replaceOne: { filter: { ...filter, _id: obj.id }, replacement: { ...obj, _id: obj.id }, - upsert: true, - }, + upsert: true + } } }) @@ -216,7 +210,7 @@ export class DataSourceMongoDb extends DataSourceMemory { const result = await col.bulkWrite(operations) console.log(result.getRawResponse()) objects = [] - } catch (error) { } + } catch (error) {} } } @@ -233,13 +227,13 @@ export class DataSourceMongoDb extends DataSourceMemory { end (chunk, _, done) { objects.push(chunk) done() - }, + } }) - writable.on("finish", async () => await upsert()) + writable.on('finish', async () => await upsert()) return writable - } catch (error) { } + } catch (error) {} } /** @@ -253,7 +247,6 @@ export class DataSourceMongoDb extends DataSourceMemory { * @returns */ async mongoFind ({ filter, sort, limit, aggregate, skip, offset } = {}) { - console.log({ filter }) let cursor = (await this.collection()).find(filter) if (sort) cursor = cursor.sort(sort) if (limit) cursor = cursor.limit(limit) @@ -275,12 +268,13 @@ export class DataSourceMongoDb extends DataSourceMemory { streamList ({ writable, serialize, transform, options }) { try { let first = true + const serializer = new Transform({ writableObjectMode: true, // start of array construct (callback) { - this.push("[") + this.push('[') callback() }, @@ -288,7 +282,7 @@ export class DataSourceMongoDb extends DataSourceMemory { transform (chunk, _encoding, callback) { // comma-separate if (first) first = false - else this.push(",") + else this.push(',') // serialize record this.push(JSON.stringify(chunk)) @@ -297,25 +291,28 @@ export class DataSourceMongoDb extends DataSourceMemory { // end of array flush (callback) { - this.push("]") + this.push(']') callback() - }, + } }) return new Promise(async (resolve, reject) => { const readable = (await this.mongoFind(options)).stream() - readable.on("error", reject) - readable.on("end", resolve) + readable.on('error', reject) + readable.on('end', resolve) // optionally transform db stream then pipe to output - if (serialize && transform) - readable.pipe(transform).pipe(serializer).pipe(writable) - else if (serialize) readable.pipe(serializer).pipe(writable) + if (transform && serialize) + readable + .pipe(transform) + .pipe(serializer) + .pipe(writable) else if (transform) readable.pipe(transform).pipe(writable) + else if (serialize) readable.pipe(serializer).pipe(writable) else readable.pipe(writable) }) - } catch (error) { } + } catch (error) {} } processOptions (param) { @@ -351,8 +348,9 @@ export class DataSourceMongoDb extends DataSourceMemory { const { writable = null, transform = null, - serialize = true, - query = null, + serialize = false, + options = null, + query = null } = param try { @@ -367,7 +365,7 @@ export class DataSourceMongoDb extends DataSourceMemory { writable, serialize, transform, - options: processedOptions, + options: processedOptions }) } @@ -381,7 +379,7 @@ export class DataSourceMongoDb extends DataSourceMemory { return { total: await this.countDb(), cached: this.getCacheSize(), - bytes: this.getCacheSizeBytes(), + bytes: this.getCacheSizeBytes() } } diff --git a/src/adapters/datasources/datasource-solidpod.js b/src/adapters/datasources/datasource-solidpod.js index 2bc35c41..f822c0e8 100644 --- a/src/adapters/datasources/datasource-solidpod.js +++ b/src/adapters/datasources/datasource-solidpod.js @@ -25,8 +25,8 @@ async function login () { */ export class DataSourceSolidPod extends DataSourceFile { // ... import statement for authentication, which includes the fetch function, is omitted for brevity. - constructor (map, factory, name) { - super(map, factory, name) + constructor (map, name, options) { + super(map, name) // this.file = PODURL + name + ".json"; // console.log(this.file); // login().then(() => console.log(`logged into pod ${PODURL}`)); diff --git a/src/adapters/session.js b/src/adapters/session.js deleted file mode 100644 index 17399ec4..00000000 --- a/src/adapters/session.js +++ /dev/null @@ -1,9 +0,0 @@ -'use strict' - -import { AsyncLocalStorage } from "async_hooks" - - - -export function session(req, res, next) { - next() -} \ No newline at end of file diff --git a/src/aegis.js b/src/aegis.js index 5cc17814..178c8775 100644 --- a/src/aegis.js +++ b/src/aegis.js @@ -12,7 +12,6 @@ const { badUserRoute, reload } = DomainEvents const { pathToRegexp, match } = require('path-to-regexp') const { nanoid } = require('nanoid') const { EventEmitter } = require('stream') -const { isMainThread, parentPort } = require('worker_threads') const broker = EventBrokerFactory.getInstance() const { @@ -43,13 +42,13 @@ const endpointPortId = e => `${modelPath}/${e}/:id/service/ports/:port` * @extends {Map} */ class RouteMap extends Map { - find(path) { + find (path) { const routeInfo = [...super.values()].find(v => v.regex.test(path)) if (routeInfo) return { ...routeInfo, params: routeInfo.matcher(path).params } } - set(path, method) { + set (path, method) { if (super.has(path)) { super.set(path, { ...super.get(path), ...method }) return @@ -62,13 +61,13 @@ class RouteMap extends Map { }) } - has(path) { + has (path) { this.hasPath = path this.routeInfo = this.find(path) return this.routeInfo ? true : false } - get(path) { + get (path) { // if equal we already know the answer return path === this.hasPath ? this.routeInfo : this.find(path) } @@ -76,19 +75,19 @@ class RouteMap extends Map { const routes = new RouteMap() -function buildPath(ctrl, path) { +function buildPath (ctrl, path) { return ctrl.path && ctrl.path[path.name] ? ctrl.path[path.name] : path(ctrl.endpoint) } -function checkAllowedMethods(ctrl, method) { +function checkAllowedMethods (ctrl, method) { if (!ctrl.ports.methods) return true return ctrl.ports.methods.includes(method) } const router = { - autoRoutes(path, method, controllers, adapter, ports = false) { + autoRoutes (path, method, controllers, adapter, ports = false) { controllers() .filter(ctrl => !ctrl.internal) .forEach(ctrl => { @@ -107,7 +106,7 @@ const router = { }) }, - userRoutes(controllers) { + userRoutes (controllers) { try { controllers().forEach(ctlr => routes.set(ctlr.path, ctlr)) } catch (error) { @@ -116,13 +115,13 @@ const router = { } }, - adminRoute(controller, adapter) { + adminRoute (controller, adapter) { const adminPath = `${apiRoot}/config` routes.set(adminPath, { get: adapter(controller()) }) } } -function makeRoutes() { +function makeRoutes () { router.autoRoutes(endpoint, 'get', liveUpdate, http) router.autoRoutes(endpoint, 'get', getModels, http) router.autoRoutes(endpoint, 'post', postModels, http) @@ -153,7 +152,7 @@ function makeRoutes() { * @param {Response} res * @returns */ -async function handle(path, method, req, res) { +async function handle (path, method, req, res) { const routeInfo = routes.get(path) if (!routeInfo) { diff --git a/src/domain/datasource-factory.js b/src/domain/datasource-factory.js index a1635dfd..cf08feec 100644 --- a/src/domain/datasource-factory.js +++ b/src/domain/datasource-factory.js @@ -12,28 +12,41 @@ import ModelFactory from '.' import * as adapters from '../adapters/datasources' import { dsClasses } from '../adapters/datasources' -import sysconf from '../config' +import configRoot from '../config' import DataSource from './datasource' import { withSharedMemory } from './shared-memory' import compose from './util/compose' -const defaultAdapter = sysconf.hostConfig.adapters.defaultDatasource +const debug = /\*|datasource/i.test(process.env.DEBUG) +const defaultAdapter = configRoot.hostConfig.adapters.defaultDatasource const DefaultDataSource = adapters[defaultAdapter] if (!DefaultDataSource) throw new Error('no default datasource') -const RestrictedDsMixin = superclass => class extends superclass { - getFactory () { - throw new Error('unauthorized') +const FACTORY = Symbol() + +const DsFactoryAccessors = superclass => + class extends superclass { + set factory (value) { + this[FACTORY] = value + } + get factory () { + return this[FACTORY] + } + } + +const accessFactory = DsClass => class extends DsFactoryAccessors(DsClass) {} + +class SomeClass {} + +const NewFeature = superclass => + class extends superclass { + newMethod () {} } -} -const restrictDs = dsClass => class RestrictedDs extends RestrictedDsMixin(dsClass) { } +const newFeature = ExistingClass => class extends NewFeature(ExistingClass) {} -function createRestrictedDs (DsClass, map, factory, name, options) { - const RestrictedDs = restrictDs(DsClass) - return new RestrictedDs(map, factory, name, options) -} +newFeature(SomeClass) /** * Manages each domain model's dedicated datasource. @@ -47,7 +60,6 @@ function createRestrictedDs (DsClass, map, factory, name, options) { const DataSourceFactory = (() => { // References all DSes let dataSources - let restrictedDatasources /** * @param {*} name @@ -73,6 +85,10 @@ const DataSourceFactory = (() => { */ /** + * Create the class based on the options passed in, + * customizations defined in the ModelSpec, or, if no + * configuration is provided, use the default adapter + * for the host instance. * * @param {import('.').ModelSpecification} spec * @param {dsOpts} options @@ -81,9 +97,7 @@ const DataSourceFactory = (() => { function createDataSourceClass (spec, options) { const { memoryOnly, ephemeral, adapterName } = options - if (memoryOnly || ephemeral) { - return dsClasses['DataSourceMemory'] - } + if (memoryOnly || ephemeral) return dsClasses['DataSourceMemory'] if (adapterName) return adapters[adapterName] || DefaultDataSource @@ -98,6 +112,21 @@ const DataSourceFactory = (() => { return DefaultDataSource } + /** + * Apply any compositional mixins specifed in {@link options}. + * Compostion allows us to observe the open/closed principle + * and add new feature/functions arbitrarily to any datasource + * class in the hierarchy without having to modify it. + * + * @param {typeof DataSource} DsClass + * @param {dsOpts} options + * @returns {typeof DataSource} + */ + function extendDataSourceClass (DsClass, options = {}) { + const mixins = [accessFactory].concat(options.mixins || []) + return compose(...mixins)(DsClass) + } + /** * * @param {string} name @@ -105,51 +134,42 @@ const DataSourceFactory = (() => { * @returns {DataSource} */ function createDataSource (name, options) { - if (!name) throw new Error('missing name', { fn: createDataSource.name, options }) - const spec = ModelFactory.getModelSpec(name) - if (!spec) return - const dsMap = options.dsMap || new Map() const DsClass = createDataSourceClass(spec, options) + const DsExtendedClass = extendDataSourceClass(DsClass, options) - const DsMixinsClass = options.mixins?.length > 0 - ? compose(...options.mixins)(DsClass) - : DsClass + const newDs = new DsExtendedClass(dsMap, name, options) + newDs.factory = this // setter to avoid exposing in ctor - const newDs = new DsMixinsClass(dsMap, this, name, options) - const restrictedDs = createRestrictedDs(DsMixinsClass, dsMap, null, name, options) + if (!options.ephemeral) dataSources.set(name, newDs) + + debug && console.debug({ newDs }) - if (!options.ephemeral) { - dataSources.set(name, newDs) - restrictedDatasources.set(name, restrictedDs) - } return newDs } /** * Get the datasource for each model. + * * @param {string} name - model name * @param {dsOpts} options * @returns {import('./datasource').default} */ - function getDataSource (name, options = {}) { + function getDataSource (name, options) { const upperName = name.toUpperCase() - if (!dataSources) { - dataSources = new Map() - restrictedDatasources = new Map() - } + if (!dataSources) dataSources = new Map() - if (dataSources.has(upperName)) { - return dataSources.get(upperName) - } + if (dataSources.has(upperName)) return dataSources.get(upperName) return createDataSource(upperName, options) } /** + * Create a datasource based on {@link SharedArrayBuffer} that will live in + * shared memory to be accessed in parallel by multiple coordinating threads. * * @param {string} name * @param {dsOpts} [options] @@ -157,24 +177,33 @@ const DataSourceFactory = (() => { */ function getSharedDataSource (name, options) { const upperName = name.toUpperCase() - let opts = options - if (!opts) opts = {} - if (!opts.mixins) opts.mixins = [] - if (!dataSources) { - dataSources = new Map() - restrictedDatasources = new Map() - } + if (!dataSources) dataSources = new Map() - if (dataSources.has(upperName)) { - return dataSources.get(upperName) - } + if (dataSources.has(upperName)) return dataSources.get(upperName) return withSharedMemory(createDataSource, this, upperName, options) } + /** + * Return a {@link Proxy} of the ds that traps calls to any functions + * that could have security or privacy implications if accessed by + * hosted code. + * + * @param {string} name + * @returns {ProxyHandler} + */ function getRestrictedDataSource (name) { - if (restrictedDatasources) return restrictedDatasources.get(name) + return new Proxy(getDataSource(name), { + get (target, key) { + if (key === 'factory') { + throw new Error('unauthorized') + } + }, + ownKeys (target) { + return [] + } + }) } function close () { diff --git a/src/domain/datasource.js b/src/domain/datasource.js index b5b99833..d000ee5f 100644 --- a/src/domain/datasource.js +++ b/src/domain/datasource.js @@ -3,7 +3,6 @@ import { changeDataCapture } from './util/change-data-capture' /** change data capture */ -let CDC = {} const cdcEnabled = false // /true/i.test('CHANGE_DATA_CAPTURE') function roughSizeOfObject (...objects) { @@ -37,53 +36,35 @@ function roughSizeOfObject (...objects) { return bytes } +const FACTORY = Symbol() + /** * Data source base class */ export default class DataSource { - constructor (map, factory, name, options = {}) { + constructor (map, name, options) { this.className = this.constructor.name this.dsMap = map - this.factory = factory this.name = name this.options = options } - /** - * - * @param {*} id - * @param {*} data - * @returns - */ - changeDataCapture (id, data) { - const cdc = CDC[this.name] && CDC[this.name][id] ? CDC[this.name][id] : null - const deserialized = JSON.parse(JSON.stringify(data)) - if (cdc) { - const indeces = [] - indeces[0] = cdc.changes.length - Object.keys(data).forEach(key => (cdc.proxy[key] = deserialized[key])) - cdc.indeces[1] = cdc.changes.length - cdc.metadata.push({ time: Date.now(), indeces, user: null }) - return cdc.changes.slice(cdc.indeces[0], cdc.indeces[1] - cdc.indeces[0]) - } - let changes - const writeEvent = { time: Date.now(), indeces: [], user: null } - CDC[this.name] = {} - CDC[this.name][id] = {} - CDC[this.name][id].changes = changes = [] - writeEvent.indeces[0] = 0 - CDC[this.name][id].proxy = changeDataCapture(deserialized, changes) - Object.keys(data).forEach( - key => (CDC[this.name][id].proxy[key] = data[key]) - ) - writeEvent.indeces[1] = changes.length - CDC[this.name][id].metadata = [] - CDC[this.name][id].metadata.push(writeEvent) + changeHandlers () {} + + handleChanges (id, data) { + if (!cdcEnabled) return data + + const prev = this.findSync(id) + if (!prev) return data + + const proxyClone = changeDataCapture({ ...prev }, this.changeHandlers()) + return Object.freeze(Object.assign(proxyClone, data)) } /** - * Upsert model instance asynchronomously - * to handle I/0 latency and concurrency + * Upsert model instance asynchronously + * to handle I/0 latency & concurrency. + * * @param {*} id * @param {*} data * @returns {Promise} @@ -93,16 +74,16 @@ export default class DataSource { } /** - * Synchronous cache write. Dont use - * this method to call a remote datasource. - * Use async {@link save} instead. + * Synchronous cache write. Don't use this + * method to save to a remote data source; + * use asychronous {@link save} method. + * * @param {string} id * @param {import(".").Model} data * @returns */ saveSync (id, data) { - if (cdcEnabled) this.changeDataCapture(id, data) - return this.mapSet(id, data) + return this.mapSet(id, this.handleChanges(id, data)) } /** @@ -225,7 +206,7 @@ export default class DataSource { ) ) - if (query.__count === 'stats') + if (query.__count) return { list: loo.length, total: this.getCacheSize(), @@ -258,15 +239,7 @@ export default class DataSource { * * @param {*} options */ - async load (options) { } - - /** - * - * @returns {import("./datasource-factory").DataSourceFactory} - */ - getFactory () { - return this.factory - } + async load (options) {} /** * @@ -303,7 +276,6 @@ export default class DataSource { return this.countSync() * roughSizeOfObject(this.listSync({ __count: 1 })) } - /** * Subclasses must override this method to run * the query against the datastore it accesses. @@ -316,8 +288,8 @@ export default class DataSource { /** * Subclasses must override this method to run * the query against the datastore it accesses. - * @param {foreignKey} filter - * @returns + * @param {foreignKey} filter + * @returns */ manyToOne (filter) { return this.find(filter) @@ -326,8 +298,8 @@ export default class DataSource { /** * Subclasses must override this method to run * the query against the datastore it accesses. - * @param {{foreignKey:id}} filter - * @returns + * @param {{foreignKey:id}} filter + * @returns */ containsMany (filter) { return this.listSync(filter) @@ -360,9 +332,9 @@ export default class DataSource { } /** - * - */ - close () { } + * + */ + close () {} getClassName () { return this.className diff --git a/src/domain/distributed-cache.js b/src/domain/distributed-cache.js index 719be0fa..dc1c9fb4 100644 --- a/src/domain/distributed-cache.js +++ b/src/domain/distributed-cache.js @@ -5,7 +5,7 @@ import { importRemoteCache } from '.' import domainEvents from '../domain/domain-events' import asyncPipe from './util/async-pipe' import { workerData } from 'worker_threads' -import { UseCaseService } from './use-cases' +import { modelsInDomain, UseCaseService } from './use-cases' const { internalCacheRequest, internalCacheResponse, @@ -366,7 +366,7 @@ export default function DistributedCache ({ publish({ ...event, eventName: externalCrudEvent(eventName) }) ) - function handlecrudeEvent (modelSpecs) { } + function handlecrudeEvent (modelSpecs) {} /** * Subcribe to external CRUD events for related models. * Also listen for request and response events for locally @@ -374,7 +374,21 @@ export default function DistributedCache ({ */ function listen () { const modelSpecs = models.getModelSpecs() - const localModels = [workerData.poolName.toUpperCase()] + const localModels = modelsInDomain(workerData.poolName) + const relatedModels = [ + ...modelSpecs + .filter(spec => spec.relations) + .map(spec => Object.values(spec.relations)) + .flat(2) + .map(relation => relation.modelName) + .reduce((unique, modelName) => unique.add(modelName), new Set()) + ] + + console.debug({ relatedModels }) + const remoteRelated = relatedModels.filter( + related => !localModels.includes(related) + ) + const remoteModels = [ ...new Set( // deduplicate modelSpecs @@ -406,12 +420,13 @@ export default function DistributedCache ({ externalCacheResponse(modelName), internalCacheResponse(modelName) ) + // + ;[ // listen for CRUD events from related, external models - ;[ - models.getEventName(models.EventTypes.UPDATE, modelName), - models.getEventName(models.EventTypes.CREATE, modelName), - models.getEventName(models.EventTypes.DELETE, modelName) - ].forEach(receiveCrudBroadcast) + models.getEventName(models.EventTypes.UPDATE, modelName), + models.getEventName(models.EventTypes.CREATE, modelName), + models.getEventName(models.EventTypes.DELETE, modelName) + ].forEach(receiveCrudBroadcast) }) // Respond to external search requests and broadcast local CRUD events @@ -422,17 +437,17 @@ export default function DistributedCache ({ externalCacheResponse(modelName) ) - // fulfillDeploymentRequest( - // externalDeploymentRequest(modelName), - // externalDeploymentResponse(modelName) - // ) - - // Listen for local CRUD events and forward externally - ;[ - models.getEventName(models.EventTypes.UPDATE, modelName), - models.getEventName(models.EventTypes.CREATE, modelName), - models.getEventName(models.EventTypes.DELETE, modelName) - ].forEach(broadcastCrudEvent) + // fulfillDeploymentRequest( + // externalDeploymentRequest(modelName), + // externalDeploymentResponse(modelName) + // ) + + // Listen for local CRUD events and forward externally + ;[ + models.getEventName(models.EventTypes.UPDATE, modelName), + models.getEventName(models.EventTypes.CREATE, modelName), + models.getEventName(models.EventTypes.DELETE, modelName) + ].forEach(broadcastCrudEvent) }) } diff --git a/src/domain/domain-events.js b/src/domain/domain-events.js index 239d9cf4..fea39579 100644 --- a/src/domain/domain-events.js +++ b/src/domain/domain-events.js @@ -7,6 +7,7 @@ export function registerEvents (broker) { // broker.on('shutdown', signal => process.exit(signal || 0)) broker.on('emitEvent', event => broker.notify(event.eventName, event)) + broker.on('ping', event => broker.notify('to_main', event.data.jobData)) } const domainEvents = { diff --git a/src/domain/event-router.js b/src/domain/event-router.js index ea935366..399b679c 100644 --- a/src/domain/event-router.js +++ b/src/domain/event-router.js @@ -1,42 +1,44 @@ 'use strict' import { workerData, BroadcastChannel, isMainThread } from 'worker_threads' +import { modelsInDomain } from './use-cases' const modelName = isMainThread ? null : workerData.poolName export class PortEventRouter { - constructor(models, broker) { + constructor (models, broker) { this.models = models this.broker = broker } - getThreadLocalPorts() { - return Object.values(this.models.getModelSpec(modelName)) - .filter(port => port) - .filter(port => port.consumesEvent || port.producesEvent) - .map(port => ({ - ...port, - modelName - })) + getThreadLocalPorts () { + return this.models + .getModelSpecs() + .filter( + spec => + (spec.domain && + modelsInDomain(spec.domain).includes(spec.modelName)) || + spec.modelName === modelName + ) + .flatMap(spec => + Object.values(spec.ports) + .filter(port => port.consumesEvent || port.producesEvent) + .map(port => ({ ...port, modelName: spec.modelName })) + ) } - getThreadRemotePorts() { + getThreadRemotePorts () { return this.models .getModelSpecs() .filter(spec => spec.ports && spec.modelName !== modelName) - .map(spec => + .flatMap(spec => Object.values(spec.ports) .filter(port => port.consumesEvent || port.producesEvent) - .map(port => ({ - ...port, - modelName: spec.modelName - })) + .map(port => ({ ...port, modelName: spec.modelName })) ) - .flat() } - handleChannelEvent(msg) { - if (msg.data.eventName) - this.broker.notify(msg.data.eventName, msg.data) + handleChannelEvent (msg) { + if (msg.data.eventName) this.broker.notify(msg.data.eventName, msg.data) else { console.log('missing eventName', msg.data) this.broker.notify('missingEventName', msg.data) @@ -49,7 +51,7 @@ export class PortEventRouter { * and forward to pools that consume them. If a producer event is * not consumed by any local thread, foward to service mesh. */ - listen() { + listen () { const localPorts = this.getThreadLocalPorts() const remotePorts = this.getThreadRemotePorts() @@ -59,8 +61,9 @@ export class PortEventRouter { const subscribePorts = remotePorts.filter(remote => localPorts.find(local => local.consumesEvent === remote.producesEvent) ) - const unhandledPorts = localPorts.filter(remote => - !remotePorts.find(local => local.producesEvent === remote.consumesEvent) + const unhandledPorts = localPorts.filter( + remote => + !remotePorts.find(local => local.producesEvent === remote.consumesEvent) ) const services = new Set() diff --git a/src/domain/index.js b/src/domain/index.js index 6b60eb00..cc17b191 100644 --- a/src/domain/index.js +++ b/src/domain/index.js @@ -136,6 +136,7 @@ * @property {()=>[]} list * @property {(id)=>{}} find * @property {(id,data)=>data} save + * @property {import('.').ports} factory */ /** @@ -202,16 +203,14 @@ import { importAdapterCache, importServiceCache, importWorkerCache, - importPortCache, + importPortCache } from './import-remotes' /** * * @param {Model} model */ -const createEvent = model => ({ - model: model -}) +const createEvent = model => ({ model }) /** * @param {{updated:Model,changes:Object}} param0 @@ -225,12 +224,12 @@ const updateEvent = ({ updated, changes }) => ({ const deleteEvent = model => ({ modelId: ModelFactory.getModelId(model), - model: model + model }) const onloadEvent = model => ({ modelId: ModelFactory.getModelId(model), - model: model + model }) function getUniqueId () { @@ -270,17 +269,16 @@ function register ({ }) const dependencies = { - getUniqueId, ...model.dependencies, - ...bindings + ...bindings, + getUniqueId } ModelFactory.registerModel({ ...model, - modelName: modelName, - domain: typeof model.domain === 'string' - ? model.domain.toUpperCase() - : modelName, + modelName, + domain: + typeof model.domain === 'string' ? model.domain.toUpperCase() : modelName, dependencies, factory: model.factory(dependencies), worker: workers[modelName], diff --git a/src/domain/make-ports.js b/src/domain/make-ports.js index faf794c4..a40593ab 100644 --- a/src/domain/make-ports.js +++ b/src/domain/make-ports.js @@ -93,11 +93,9 @@ function getPortCallback (cb) { /** * Are we compensating for a canceled transaction? * @param {import(".").Model} model - * @returns {Promise} */ -async function isUndoRunning (model) { - const latest = await model.find(model.getId()) - return latest.compensate +function isUndoRunning (model) { + return model.findSync(model.getId()).compensate } function hydrate (broker, datasource, eventInfo) { @@ -125,13 +123,14 @@ function addPortListener (portName, portConf, broker, datasource) { portConf.consumesEvent, async function (eventInfo) { const model = hydrate(broker, datasource, eventInfo) - // Don't call any more ports if undoing. - - console.info( - // if (await isUndoRunning(model)) { - // console.log('undo running, canceling port operation') - // return - // } `event ${eventInfo.eventName} fired: calling port ${portName}`, + + if (isUndoRunning(model)) { + console.log('undo running, canceling port operation') + return + } + + console.log( + `event ${eventInfo.eventName} fired: calling port ${portName}`, eventInfo ) // invoke this port diff --git a/src/domain/make-relations.js b/src/domain/make-relations.js index 778bef5f..bba689b9 100644 --- a/src/domain/make-relations.js +++ b/src/domain/make-relations.js @@ -1,6 +1,9 @@ 'use strict' -import DataSource from './datasource' +/** + * @typedef {import('./datasource').default} DataSource + */ + import domainEvents from './domain-events' const { internalCacheRequest, @@ -72,16 +75,44 @@ export const relationType = { return this.manyToOne(model, ds, rel) }, + /** + * retrieve embedded documents from array + * + * @param {*} model + * @param {*} ds + * @param {*} rel + * @returns + */ containsMany: (model, ds, rel) => Promise.all( model[rel.arrayKey].map(arrayItem => ds.find(arrayItem[rel.foreignKey])) ), - custom: x => x + /** + * + * @param {import('.').Model} model + * @param {import('.').datasource} ds + * @param {*} rel + * @param {*} args + * @returns + */ + custom: (model, ds, rel, args) => { + // use restricted datasources, i.e. no access to DataSourceFactory + const rds = ds.factory.getRestrictedDataSource(model.modelName) + const relRds = ds.factory.getRestrictedDataSource(rel.modelName, { + isCached: true, + ephemeral: true + }) + + // if relRds is in the same domain but is remote, this fails + if (rds.domain !== relRds.domain) return null + + return rds[rel.name]({ args, model, ds: relRds, relation: rel }) + } } /** - * If we create a new related object, foreign keys need to reference it + * If we create a new related object, foreign keys need to reference it. */ const updateForeignKeys = { /** @@ -91,32 +122,38 @@ const updateForeignKeys = { * @param {import('./index').relations[x]} relation * @param {import('./model-factory').Datasource} ds */ - [relationType.manyToOne.name] (fromModel, toModels, relation, ds) { - return fromModel.updateSync( + async [relationType.manyToOne.name] (fromModel, toModels, relation, ds) { + return fromModel.update( { [relation.foreignKey]: toModels[0].getId() }, false ) }, - [relationType.oneToOne.name] (fromModel, toModels, relation, ds) { + async [relationType.oneToOne.name] (fromModel, toModels, relation, ds) { return this[relationType.manyToOne.name](fromModel, toModels, relation, ds) }, - [relationType.oneToMany.name] (fromModel, toModels, relation, ds) { - return toModels.map(m => { - const model = ds.findSync(m.id) - ds.saveSync({ - ...model, - [relation.foreignKey]: fromModel.getId() + async [relationType.oneToMany.name] (fromModel, toModels, relation, ds) { + return Promise.all( + toModels.map(async m => { + const model = await ds.find(m.id || m.getId()) + return ds.save(m.id, { + ...model, + [relation.foreignKey]: fromModel.getId() + }) }) - }) + ) + }, + + async [relationType.containsMany.name] (fromModel, toModels, relation, ds) { + toModels.map(model => + model.update({ [relation.foreignKey]: fromModel.getId() }) + ) }, - [relationType.containsMany.name] (fromModel, toModels, relation, ds) { - // console(relation.arrayKey) - // fromModel[relation.arrayKey].concat([ - // ...toModels.map(m => m[relation.foreignKey]) - // ]) + async [relationType.custom.name] (fromModel, toModels, relation, ds) { + const customFn = fromModel[`${relation}UpdateForeignKeys`] + if (customFn === 'function') customFn(toModels, relation, ds) } } @@ -132,7 +169,9 @@ async function createNewModels (args, fromModel, relation, ds) { if (args.length > 0) { const { UseCaseService } = require('.') const service = UseCaseService(relation.modelName.toUpperCase()) - const newModels = await Promise.all(args.map(arg => service.createModel(arg))) + const newModels = await Promise.all( + args.map(arg => service.createModel(arg)) + ) return updateForeignKeys[relation.type](fromModel, newModels, relation, ds) } } @@ -178,6 +217,12 @@ export function requireRemoteObject (model, relation, broker, ...args) { }) } +/** + * Find out if a related model runs locally. + * + * @param {import('.').relations[x]} relation + * @returns {boolean} + */ function isRelatedModelLocal (relation) { return require('.') .default.getModelSpecs() @@ -186,18 +231,6 @@ function isRelatedModelLocal (relation) { .includes(relation.modelName.toUpperCase()) } -function checkDomain (modelName1, modelName2) { - const spec1 = require('.').default.getModelSpec(modelName1) - if (!spec1.domain) throw new Error(`model not in domain ${modelName1}`) - const spec2 = require('.').default.getModelSpec(modelName1) - if (!spec2.domain) throw new Error(`model not in domain ${modelName2}`) - if (spec1.domain !== spec2.domain) throw new Error(`models not in same domain`) -} - -/** - * @typedef {import('./datasource').default} DataSource - */ - /** * Generate functions to retrieve related domain objects. * @param {import("./index").relations} relations @@ -222,37 +255,23 @@ export default function makeRelations (relations, datasource, broker) { return { // the relation function async [relation] (...args) { - // Get or create datasource of related object - const ds = datasource.getFactory().getDataSource(relatedModelName) - - if (rel.type === 'custom') { - checkDomain(relatedModelName, this.getName()) - const rds = datasource - .getFactory() - .getRestrictedDataSource(this.getName()) - const relRds = datasource - .getFactory() - .getRestrictedDataSource(relatedModelName) - - return datasource[relation].call( - rds, - { - args, - relation, - model: this, - ds: relRds - } - ) - } + // Get existing (or create temp) datasource of related object + const local = isRelatedModelLocal(rel) + const createNew = args?.length > 0 - if (args?.length > 0 && isRelatedModelLocal(rel)) - // args mean create new instance(s) of related model + let ds = datasource.factory.getDataSource( + relatedModelName, + local ? {} : { isCached: true, ephemeral: true } + ) + + // args mean create related model instances + if (createNew && local && rel.type !== 'custom') return await createNewModels(args, this, rel, ds) - const models = await relationType[rel.type](this, ds, rel) + const models = await relationType[rel.type](this, ds, rel, args) if (!models || models.length < 1) { - // couldn't find the object locally - try remote instances + // find remote instance(s) const event = await requireRemoteObject( this, rel, @@ -260,18 +279,20 @@ export default function makeRelations (relations, datasource, broker) { ...args ) - // new models: update foreign keys - if (event?.args?.length > 0) + // we now have the code, so get the real ds + ds = ds.factory.getDataSource(rel.modelName) + + if (createNew) updateForeignKeys[rel.type](this, event.model, rel, ds) - return await relationType[rel.type](this, ds, rel) + return await relationType[rel.type](this, ds, rel, args) } return models } } - } catch (e) { - console.error(e) + } catch (error) { + console.error({ fn: makeRelations.name, error }) } }) .reduce((c, p) => ({ ...p, ...c })) diff --git a/src/domain/model.js b/src/domain/model.js index 8c1f5ef9..3760a8b9 100644 --- a/src/domain/model.js +++ b/src/domain/model.js @@ -118,7 +118,7 @@ const Model = (() => { } } - function queueNotice(model) { + function queueNotice (model) { console.debug(queueNotice.name, 'disabled') } @@ -134,7 +134,7 @@ const Model = (() => { * @param {} clonedModel * @returns */ - function rehydrate(clonedModel, model) { + function rehydrate (clonedModel, model) { return model.prototype ? Object.setPrototypeOf(clonedModel, model.prototype) : clonedModel @@ -149,7 +149,7 @@ const Model = (() => { * }} modelInfo * @returns {Model} */ - function make(modelInfo) { + function make (modelInfo) { const { model = {}, spec: { @@ -186,12 +186,12 @@ const Model = (() => { [ID]: dependencies.getUniqueId(), // Called before update is committed - [ONUPDATE](changes) { + [ONUPDATE] (changes) { return onUpdate(this, changes) }, // Called before edelte is committed - [ONDELETE]() { + [ONDELETE] () { return onDelete(this) }, @@ -201,7 +201,7 @@ const Model = (() => { * @param {eventMask} event - event type, see {@link eventMask}. * @returns {Model} - updated model */ - [VALIDATE](changes, event) { + [VALIDATE] (changes, event) { return validate(this, changes, event) }, @@ -211,7 +211,7 @@ const Model = (() => { * @param {number} event * @returns {string} key name/s: create, update, onload, delete */ - getEventMaskName(event) { + getEventMaskName (event) { if (typeof event !== 'number') return const key = Object.keys(eventMask).find(k => eventMask[k] & event) return key @@ -221,7 +221,7 @@ const Model = (() => { * Compensate for downstream transaction failures. * Back out all previous port transactions */ - async undo() { + async undo () { return compensate(this) }, @@ -233,7 +233,7 @@ const Model = (() => { * @param {boolean} [multi] - allow multiple listeners for event, * defaults to `true` */ - addListener(eventName, callback, options) { + addListener (eventName, callback, options) { broker.on(eventName, callback, options) }, @@ -245,7 +245,7 @@ const Model = (() => { * @param {boolean} [forward] - forward event to service mesh, * defaults to `false` */ - emit(eventName, eventData, options) { + emit (eventName, eventData, options) { broker.notify( eventName, { @@ -259,7 +259,7 @@ const Model = (() => { /** @typedef {import('./serializer.js').Serializer} Serializer */ - getDataSourceType() { + getDataSourceType () { return datasource.getClassName() }, @@ -277,9 +277,9 @@ const Model = (() => { * @param {boolean} validate - run validation by default * @returns {Promise} */ - async update(changes, validate = true) { + async update (changes, validate = true) { // get the last saved version - const saved = await datasource.find(this[ID]) || {} + const saved = (await datasource.find(this[ID])) || {} // merge changes with last saved and optionally validate const valid = validateUpdates({ ...this, ...saved }, changes, validate) @@ -314,7 +314,7 @@ const Model = (() => { * @param {boolean} validate * @returns {Model} */ - updateSync(changes, validate = true) { + updateSync (changes, validate = true) { // merge changes with lastest copy and optionally validate const valid = validateUpdates(this, changes, validate) @@ -335,12 +335,12 @@ const Model = (() => { * @param {*} data * @returns */ - async save(id = null, data = null) { + async save (id = null, data = null) { if (id && data) return datasource.save(id, data) return datasource.save(this[ID], this) }, - async find(id) { + async find (id) { if (!id) throw new Error('missing id') return datasource.find(id) }, @@ -352,7 +352,7 @@ const Model = (() => { * @param {{key1, keyN}} filter - list of required matching key-values * @returns {Model[]} */ - listSync(filter) { + listSync (filter) { return datasource.listSync(filter) }, @@ -364,11 +364,11 @@ const Model = (() => { * @param {listOptions} options * @returns {Model[]} */ - async list(options) { + async list (options) { return datasource.list(options) }, - createWriteStream() { + createWriteStream () { return datasource.createWriteStream() }, @@ -376,11 +376,11 @@ const Model = (() => { * Original request passed in by caller * @returns arguments passed by caller */ - getArgs() { + getArgs () { return modelInfo.args ? modelInfo.args : [] }, - getDependencies() { + getDependencies () { return dependencies }, @@ -388,7 +388,7 @@ const Model = (() => { * Identify events types. * @returns {eventMask} */ - getEventMask() { + getEventMask () { return eventMask }, @@ -397,11 +397,11 @@ const Model = (() => { * * @returns {import(".").ModelSpecification} */ - getSpec() { + getSpec () { return modelInfo.spec }, - isCached() { + isCached () { return modelInfo.spec.isCached }, @@ -410,7 +410,7 @@ const Model = (() => { * * @returns {import(".").ports} */ - getPorts() { + getPorts () { return modelInfo.spec.ports }, @@ -419,7 +419,7 @@ const Model = (() => { * * @returns */ - getName() { + getName () { return this[MODELNAME] }, @@ -428,7 +428,7 @@ const Model = (() => { * * @returns {string} */ - getId() { + getId () { return this[ID] }, @@ -437,7 +437,7 @@ const Model = (() => { * * @returns {string[]} history of ports called by this model instance */ - getPortFlow() { + getPortFlow () { return this[PORTFLOW] }, @@ -447,11 +447,11 @@ const Model = (() => { * @param {string} key - string representation of Symbol * @returns {Symbol} */ - getKey(key) { + getKey (key) { return keyMap[key] }, - equals(model) { + equals (model) { return ( model && (model.id || model.getId) && @@ -459,34 +459,33 @@ const Model = (() => { ) }, - getContext(name) { + getContext (name) { return asyncContext[name] }, /** * Returns service of related domain provided * this domain is related to it via modelspec. - * If not, we won't be able to access the + * If not, we won't be able to access the * domain's memory. Every domain model employs * this capability-based security measure. - * @returns + * @returns */ - fetchRelatedModel(modelName) { + fetchRelatedModel (modelName) { const rel = Object.values(relations).find( v => v.modelName.toUpperCase() === modelName.toUpperCase() ) if (!rel) throw new Error('no relation found') - if (!datasource - .getFactory() - .listDataSources() - .includes(modelName.toUpperCase())) + if ( + !datasource.factory + .listDataSources() + .includes(modelName.toUpperCase()) + ) throw new Error('no datasource found') - const ds = datasource - .getFactory() - .getDataSource(modelName.toUpperCase()) + const ds = datasource.factory.getDataSource(modelName.toUpperCase()) if (!ds) throw new Error('no datasoure found') @@ -589,7 +588,7 @@ const Model = (() => { * @returns {Model} updated model * */ - async update(model, changes) { + async update (model, changes) { return model.update(changes) }, diff --git a/src/domain/shared-memory.js b/src/domain/shared-memory.js index 1001cf0d..8dd72bbf 100644 --- a/src/domain/shared-memory.js +++ b/src/domain/shared-memory.js @@ -7,21 +7,29 @@ import { EventBrokerFactory } from '.' const broker = EventBrokerFactory.getInstance() +// size is in UTF-16 codepoints const MAPSIZE = 2048 * 56 -// Size is in UTF-16 codepointse const KEYSIZE = 64 const OBJSIZE = 4056 +function logError (x) { + console.error({ msg: 'unexpected datatype', type: typeof x, value: x }) +} + const dataType = { write: { - string: x => x, + string: x => logError(x), object: x => JSON.stringify(x), - number: x => x + number: x => logError(x), + symbol: x => logError(x), + undefined: x => logError(x) }, read: { string: x => JSON.parse(x), - object: x => x, - number: x => x + object: x => logError(x), + number: x => logError(x), + symbol: x => logError(x), + undefined: x => logError(x) } } @@ -36,8 +44,8 @@ const dataType = { */ const SharedMemoryMixin = superclass => class extends superclass { - constructor (map, factory, name, options) { - super(map, factory, name, options) + constructor (map, name, options) { + super(map, name, options) // Indicate which class we extend this.className = super.className @@ -59,15 +67,30 @@ const SharedMemoryMixin = superclass => * @returns {import('.').Model} */ mapGet (id) { + if (!id) { + return console.warn({ + fn: `${__filename}:${this.mapGet.name}`, + message: 'no id provided' + }) + } + try { - if (!id) return console.log('no id provided') - const raw = this.dsMap.get(id) - if (!raw) return console.log('no data') - const data = dataType.read[typeof raw](raw) + const jsonStr = this.dsMap.get(id) + if (!jsonStr) { + return console.warn({ + fn: `${__filename}:${this.mapGet.name}`, + message: 'no data found' + }) + } + + const jsonObj = dataType.read[typeof jsonStr](jsonStr) + if (!jsonObj || typeof jsonObj !== 'object') { + throw new Error('problem reading data from shared mem') + } return isMainThread - ? data - : ModelFactory.loadModel(broker, this, data, this.name) + ? jsonObj + : ModelFactory.loadModel(broker, this, jsonObj, this.name) } catch (error) { console.error({ fn: this.mapGet.name, error }) } @@ -80,11 +103,15 @@ const SharedMemoryMixin = superclass => mapToArray () { return this.dsMap.map(v => isMainThread - ? JSON.parse(v) + ? JSON.parse(dv) : ModelFactory.loadModel(broker, this, JSON.parse(v), this.name) ) } + /** + * @override + * @returns + */ mapCount () { return this.dsMap.length } @@ -100,13 +127,12 @@ const SharedMemoryMixin = superclass => * @returns {SharedMap} */ function findSharedMap (name) { - if (name === workerData.poolName && workerData.sharedMap) workerData.sharedMap + if (name === workerData.poolName) return workerData.sharedMap if (workerData.dsRelated?.length > 0) { const dsRel = workerData.dsRelated.find(ds => ds.modelName === name) if (dsRel) return dsRel.dsMap } - return null } function rehydrateSharedMap (name) { @@ -144,16 +170,15 @@ export function withSharedMemory ( ? createSharedMap(mapsize, keysize, objsize, name) : rehydrateSharedMap(name) - if (sharedMap instanceof SharedMap) + if (sharedMap instanceof SharedMap) { return createDataSource.call(factory, name, { ...options, dsMap: sharedMap, - mixins: [ - DsClass => - class - extends SharedMemoryMixin(DsClass) { } - ].concat(options.mixins) + mixins: [DsClass => class extends SharedMemoryMixin(DsClass) {}].concat( + options.mixins || [] + ) }) + } return createDataSource.call(factory, name, options) } catch (error) { diff --git a/src/domain/thread-pool.js b/src/domain/thread-pool.js index 65785c83..2b10c7dd 100644 --- a/src/domain/thread-pool.js +++ b/src/domain/thread-pool.js @@ -65,15 +65,16 @@ const DEFAULT_TIME_TO_LIVE = 180000 */ /** - * Queues break context so we need some help + * + * Queues break async context so make this an async resource */ class Job extends AsyncResource { constructor ({ jobName, jobData, modelName, resolve, reject, options }) { super('Job') const store = new Map([...requestContext.getStore()]) - this.requestId = store.get('id') + store.set('asyncId', this.asyncId()) store.delete('res') // can't pass socket - store.delete('req') // can't pass socket + this.requestId = store.get('id') this.options = options this.jobName = jobName this.jobData = { jobData, modelName, context: store } @@ -137,7 +138,7 @@ export class ThreadPool extends EventEmitter { this.threads = [] /** @type {Array} */ this.freeThreads = [] - /** @type {Array<(Thread)=>postJob>}*/ + /** @type {ArrayThread.run(Job)>}*/ this.waitingJobs = waitingJobs this.file = file this.name = name @@ -189,7 +190,7 @@ export class ThreadPool extends EventEmitter { * file:string * workerData:WorkerOptions.workerData * }} param0 - * @returns {Promise} + * @returns {Thread} */ newThread ({ pool = this, file, workerData }) { EventEmitter.captureRejections = true @@ -216,7 +217,7 @@ export class ThreadPool extends EventEmitter { }, /** - * Post this job to a worker. + * Run job on this thread. * * @param {Job} job */ @@ -247,9 +248,10 @@ export class ThreadPool extends EventEmitter { const errorFn = AsyncResource.bind(error => { pool.jobTime(job.stopTimer()) - console.error({ fn: 'thread.run', error }) + console.error({ fn: this.run.name, error }) unsubscribe('exit', exitFn) unsubscribe('message', messageFn) + pool.incrementErrorCount() pool.threads.splice(pool.threads.indexOf(this), 1) pool.emit('unhandledThreadError', error) job.reject(error) @@ -272,6 +274,7 @@ export class ThreadPool extends EventEmitter { this[channel].once('message', messageFn) this[channel].once('error', errorFn) this[channel].once('exit', exitFn) + job.startTimer() this[channel].postMessage({ name, data }, transfer) @@ -314,7 +317,7 @@ export class ThreadPool extends EventEmitter { /** * - * @param {Thread} thread + * @param {Thread} threa * @param {*} reason * @returns */ @@ -338,6 +341,7 @@ export class ThreadPool extends EventEmitter { * * @param {string} jobName name of a use case function in {@link UseCaseService} * @param {*} jobData anything that can be cloned + * @param {string} modelName its possible to have multiple models per domain * @returns {Promise<*>} anything that can be cloned */ runJob (jobName, jobData, modelName, options = {}) { @@ -348,6 +352,7 @@ export class ThreadPool extends EventEmitter { console.warn('pool is closed') return reject('pool is closed') } + const job = new Job({ jobName, jobData, @@ -359,16 +364,14 @@ export class ThreadPool extends EventEmitter { let thread = this.freeThreads.shift() - if (!thread) { - thread = this.allocate() - } + if (!thread) thread = this.allocate() if (thread) { thread.run(job) return } - console.warn('no threads: queue job', jobName) + console.warn('no threads: queuing job', jobName) this.waitingJobs.push(thread => thread.run(job)) this.jobsQueued++ }) @@ -383,13 +386,11 @@ export class ThreadPool extends EventEmitter { * @param {Thread} thread */ reallocate (thread) { - if (this.waitingJobs.length > 0) { - // call `postJob`: the caller has provided - // a callback to run when the job is done + if (this.waitingJobs.length > 0) + // call thread.run this.waitingJobs.shift()(thread) - } else { - this.freeThreads.push(thread) - } + // return to pool + else this.freeThreads.push(thread) } /** @@ -559,7 +560,9 @@ export class ThreadPool extends EventEmitter { } async fireEvent (event) { - return this.runJob(event.eventName, event, this.name, { channel: EVENTCHANNEL }) + return this.runJob(event.eventName, event, this.name, { + channel: EVENTCHANNEL + }) } /** @@ -581,9 +584,8 @@ export class ThreadPool extends EventEmitter { } return new Promise((resolve, reject) => { - if (this.noJobsRunning()) { - resolve(this) - } else { + if (this.noJobsRunning()) resolve(this) + else { const timerId = setTimeout( () => reject(new Error('drain timeout')), 4000 @@ -617,13 +619,10 @@ const ThreadPoolFactory = (() => { const broadcastChannels = new Map() function getBroadcastChannel (poolName) { - if (broadcastChannels.has(poolName)) { - return broadcastChannels.get(poolName) - } - - const broadcast = new BroadcastChannel(poolName) - broadcastChannels.set(poolName, broadcast) - return broadcast + if (broadcastChannels.has(poolName)) return broadcastChannels.get(poolName) + const channel = new BroadcastChannel(poolName) + broadcastChannels.set(poolName, channel) + return channel } /** @@ -642,9 +641,14 @@ const ThreadPoolFactory = (() => { * @returns */ function calculateMaxThreads (options) { + // defer to explicitly set value if (options?.maxThreads) return options.maxThreads - const nApps = ModelFactory.getModelSpecs().filter(s => !s.isCached).length - return Math.floor(os.cpus().length / nApps || 1) || DEFAULT_THREADPOOL_MAX + // get the total number of domains + const nApps = ModelFactory.getModelSpecs().filter( + s => !s.isCached && s.modelName === s.domain + ).length + // divide the total cpu count by the number of domains + return Math.floor(os.cpus().length / nApps || DEFAULT_THREADPOOL_MAX || 1) } /** @@ -654,8 +658,8 @@ const ThreadPoolFactory = (() => { */ /** - * Creates a pool for use by a domain {@link Model}. - * Provides thread-safe {@link Map}. + * Creates a pool for use by one or more {@link Model}s of a domain. + * Provides shared memory {@link Map}s accessible from other authorized pools. * @param {string} poolName use {@link Model.getName()} * @param {threadOptions} options * @returns @@ -706,15 +710,18 @@ const ThreadPoolFactory = (() => { */ function getThreadPool (poolName, options) { function getPool (poolName, options) { - if (threadPools.has(poolName)) { - return threadPools.get(poolName) - } + if (threadPools.has(poolName)) return threadPools.get(poolName) return createThreadPool(poolName, options) } const facade = { async runJob (jobName, jobData, modelName) { - return getPool(poolName, options).runJob(jobName, jobData, modelName, options) + return getPool(poolName, options).runJob( + jobName, + jobData, + modelName, + options + ) }, status () { return getPool(poolName, options).status() @@ -756,6 +763,7 @@ const ThreadPoolFactory = (() => { return new Promise((resolve, reject) => { const pool = threadPools.get(poolName.toUpperCase()) if (!pool) reject(`no such pool ${pool}`) + pool .close() .notify(poolClose) @@ -784,11 +792,9 @@ const ThreadPoolFactory = (() => { } async function removeUndeployedPools () { - const pools = ThreadPoolFactory.listPools().map(pool => pool) const allModels = ModelFactory.getModelSpecs().map(spec => spec.modelName) - await Promise.all( - pools + listPools() .filter(poolName => !allModels.includes(poolName.toUpperCase())) .map(poolName => destroy(threadPools.get(poolName))) ) @@ -814,9 +820,7 @@ const ThreadPoolFactory = (() => { } function status (poolName = null) { - if (poolName) { - return threadPools.get(poolName.toUpperCase()).status() - } + if (poolName) return threadPools.get(poolName.toUpperCase()).status() return [...threadPools].map(([, v]) => v.status()) } @@ -849,7 +853,7 @@ const ThreadPoolFactory = (() => { await pool.abort(reason) // get jobs going again - if (pool.waitingJobs.length > 1) { + if (pool.jobQueueDepth() > 1) { try { const runJob = pool.waitingJobs.shift() const thread = pool.allocate() @@ -892,17 +896,21 @@ const ThreadPoolFactory = (() => { pool.totalTransactions() - pool.jobQueueDepth() === workCompleted ) { const timerId = setTimeout(() => abort(pool), 1000) - const done = false - for await (const thread of pool.threads) { - if (pool.aborting || done) return + for (const thread of pool.threads) { + if (pool.aborting || !timerId) return thread.run( new Job({ - name: 'ping', - data: timerId, - resolve: id => clearTimeout(id) && (done = true), - reject: console.error + jobName: 'ping', + jobData: timerId, + options: { channel: EVENTCHANNEL }, + reject: console.error, + resolve: result => + result === timerId && + clearTimeout(result) && + (timerId = null) && + pool.freeThreads.push(thread) }) ) } diff --git a/src/domain/use-cases/create-model.js b/src/domain/use-cases/create-model.js index 3b9dc4a3..d2016933 100644 --- a/src/domain/use-cases/create-model.js +++ b/src/domain/use-cases/create-model.js @@ -29,11 +29,11 @@ export default function makeCreateModel ({ threadpool, idempotent, broker, - handlers = [], + handlers = [] } = {}) { const eventType = models.EventTypes.CREATE const eventName = models.getEventName(eventType, modelName) - handlers.forEach((handler) => broker.on(eventName, handler)) + handlers.forEach(handler => broker.on(eventName, handler)) // Add an event whose callback invokes this factory. broker.on(domainEvents.createModel(modelName), createModel) @@ -46,10 +46,8 @@ export default function makeCreateModel ({ return threadpool.runJob(createModel.name, input, modelName) } else { try { - const model = models.createModel(broker, repository, modelName, input) await repository.save(model.getId(), model) - console.debug({ fn: createModel.name, model }) try { const event = models.createEvent(eventType, modelName, model) broker.notify(eventName, event) diff --git a/src/domain/use-cases/edit-model.js b/src/domain/use-cases/edit-model.js index b8316ae0..d259e22a 100644 --- a/src/domain/use-cases/edit-model.js +++ b/src/domain/use-cases/edit-model.js @@ -44,9 +44,7 @@ export default function makeEditModel ({ if (isMainThread) { const model = await repository.find(input.id) - if (!model) { - throw new Error('no such id') - } + if (!model) throw new Error('no such id') return threadpool.runJob(editModel.name, input, modelName) } else { diff --git a/src/domain/use-cases/index.js b/src/domain/use-cases/index.js index eb57cbf4..d01dd1f2 100644 --- a/src/domain/use-cases/index.js +++ b/src/domain/use-cases/index.js @@ -1,33 +1,33 @@ -"use strict" +'use strict' -import ModelFactory from "../model-factory" -import DataSourceFactory from "../datasource-factory" -import ThreadPoolFactory from "../thread-pool.js" -import EventBrokerFactory from "../event-broker" +import ModelFactory from '../model-factory' +import DataSourceFactory from '../datasource-factory' +import ThreadPoolFactory from '../thread-pool.js' +import EventBrokerFactory from '../event-broker' -import makeCreateModel from "./create-model" -import makeEditModel from "./edit-model" -import makeListModels from "./list-models" -import makeFindModel from "./find-model" -import makeRemoveModel from "./remove-model" -import makeLoadModels from "./load-models" -import makeDeployModel from "./deploy-model" -import makeListConfig from "./list-configs" -import makeEmitEvent from "./emit-event" -import makeInvokePort from "./invoke-port" -import makeHotReload from "./hot-reload" -import brokerEvents from "./broker-events" -import DistributedCache from "../distributed-cache" -import makeServiceMesh from "./create-service-mesh.js" -import { PortEventRouter } from "../event-router" -import { isMainThread } from "worker_threads" -import { hostConfig } from "../../config" -import { requestContext } from "../util/async-context" +import makeCreateModel from './create-model' +import makeEditModel from './edit-model' +import makeListModels from './list-models' +import makeFindModel from './find-model' +import makeRemoveModel from './remove-model' +import makeLoadModels from './load-models' +import makeDeployModel from './deploy-model' +import makeListConfig from './list-configs' +import makeEmitEvent from './emit-event' +import makeInvokePort from './invoke-port' +import makeHotReload from './hot-reload' +import brokerEvents from './broker-events' +import DistributedCache from '../distributed-cache' +import makeServiceMesh from './create-service-mesh.js' +import { PortEventRouter } from '../event-router' +import { isMainThread } from 'worker_threads' +import { hostConfig } from '../../config' +import { requestContext } from '../util/async-context' export const serviceMeshPlugin = hostConfig.services.activeServiceMesh || process.env.SERVICE_MESH_PLUGIN || - "webswitch" + 'webswitch' export function registerEvents () { // main thread handles event dispatch @@ -39,14 +39,13 @@ export function registerEvents () { PortEventRouter, DistributedCache, createServiceMesh: makeOne(serviceMeshPlugin, makeServiceMesh, { - internal: true, - }), + internal: true + }) }) } -function modelsInDomain (domain) { - return ModelFactory - .getModelSpecs() +export function modelsInDomain (domain) { + return ModelFactory.getModelSpecs() .filter(s => s.domain && s.domain.toUpperCase() === domain.toUpperCase()) .map(s => s.modelName.toUpperCase()) } @@ -57,15 +56,15 @@ function modelsInDomain (domain) { * @returns */ function findLocalRelatedModels (modelName) { - const localModels = ModelFactory.getModelSpecs().map((spec) => + const localModels = ModelFactory.getModelSpecs().map(spec => spec.modelName.toUpperCase() ) const spec = ModelFactory.getModelSpec(modelName) const result = !spec?.relations ? [] : Object.keys(spec.relations) - .map((k) => spec.relations[k].modelName.toUpperCase()) - .filter((modelName) => localModels.includes(modelName)) + .map(k => spec.relations[k].modelName.toUpperCase()) + .filter(modelName => localModels.includes(modelName)) if (!spec.domain) return result const models = modelsInDomain(spec.domain) @@ -79,11 +78,11 @@ function findLocalRelatedModels (modelName) { * @returns */ function findLocalRelatedDatasources (spec) { - return findLocalRelatedModels(spec.modelName).map((modelName) => ({ + return findLocalRelatedModels(spec.modelName).map(modelName => ({ modelName, dsMap: DataSourceFactory.getSharedDataSource(modelName, { - domain: spec.domain, - }).dsMap, + domain: spec.domain + }).dsMap })) } @@ -92,8 +91,8 @@ function getDataSource (modelName, options) { return shared ? DataSourceFactory.getSharedDataSource(modelName, options) : isMainThread - ? DataSourceFactory.getDataSource(modelName, options) - : null + ? DataSourceFactory.getDataSource(modelName, options) + : null } function getThreadPool (spec, ds, options) { @@ -102,7 +101,7 @@ function getThreadPool (spec, ds, options) { ...options, preload: false, sharedMap: ds.dsMap, - dsRelated: findLocalRelatedDatasources(spec), + dsRelated: findLocalRelatedDatasources(spec) }) } @@ -129,13 +128,13 @@ function buildOptions (spec, options) { // if caller provides id, use it as key for idempotency async idempotent () { return ds.find(requestContext.getStore().get('id')) - }, + } } } else { return { ...invariant, // only worker threads can write to persistent storage - repository: getDataSource(spec.modelName, options), + repository: getDataSource(spec.modelName, options) } } } @@ -148,11 +147,11 @@ function buildOptions (spec, options) { */ function make (factory) { const specs = ModelFactory.getModelSpecs() - return specs.map((spec) => ({ + return specs.map(spec => ({ endpoint: spec.endpoint, path: spec.path, ports: spec.ports, - fn: factory(buildOptions(spec, { domain: spec.domain })), + fn: factory(buildOptions(spec, { domain: spec.domain })) })) } @@ -178,19 +177,19 @@ const deployModels = () => make(makeDeployModel) const invokePorts = () => make(makeInvokePort) const hotReload = () => [ { - endpoint: "reload", + endpoint: 'reload', fn: makeHotReload({ models: ModelFactory, - broker: EventBrokerFactory.getInstance(), - }), - }, + broker: EventBrokerFactory.getInstance() + }) + } ] const listConfigs = () => makeListConfig({ models: ModelFactory, broker: EventBrokerFactory.getInstance(), datasources: DataSourceFactory, - threadpools: ThreadPoolFactory, + threadpools: ThreadPoolFactory }) /** @@ -198,11 +197,11 @@ const listConfigs = () => * * @param {*} modelName */ -const domainPorts = (modelName) => ({ +const domainPorts = modelName => ({ ...UseCaseService(modelName), eventBroker: EventBrokerFactory.getInstance(), modelSpec: ModelFactory.getModelSpec(modelName), - dataSource: DataSourceFactory.getDataSource(modelName), + dataSource: DataSourceFactory.getDataSource(modelName) }) /** @@ -216,7 +215,7 @@ const userController = (fn, ports) => async (req, res) => { return await fn(req, res, ports) } catch (error) { console.error({ fn: userController.name, error }) - res.status(500).json({ msg: "error occurred", error }) + res.status(500).json({ msg: 'error occurred', error }) } } @@ -231,18 +230,18 @@ const userController = (fn, ports) => async (req, res) => { export function getUserRoutes () { try { return ModelFactory.getModelSpecs() - .filter((spec) => spec.routes) - .map((spec) => + .filter(spec => spec.routes) + .map(spec => spec.routes - .filter((route) => route) - .map((route) => { + .filter(route => route) + .map(route => { const api = domainPorts(spec.modelName) return Object.keys(route) - .map((key) => - typeof route[key] === "function" + .map(key => + typeof route[key] === 'function' ? { - [key]: userController(route[key], api), - } + [key]: userController(route[key], api) + } : { [key]: route[key] } ) .reduce((a, b) => ({ ...a, ...b })) @@ -266,7 +265,7 @@ export const UseCases = { registerEvents, emitEvents, deployModels, - invokePorts, + invokePorts } /** @@ -278,7 +277,7 @@ export const UseCases = { * @returns */ export function UseCaseService (modelName = null) { - if (typeof modelName === "string") { + if (typeof modelName === 'string') { const modelNameUpper = modelName.toUpperCase() return { createModel: makeOne(modelNameUpper, makeCreateModel), @@ -290,7 +289,7 @@ export function UseCaseService (modelName = null) { emitEvent: makeOne(modelNameUpper, makeEmitEvent), deployModel: makeOne(modelNameUpper, makeDeployModel), invokePort: makeOne(modelNameUpper, makeInvokePort), - listConfigs: listConfigs(), + listConfigs: listConfigs() } } return { @@ -304,7 +303,7 @@ export function UseCaseService (modelName = null) { hotReload: hotReload(), emitEvent: emitEvents(), invokePort: invokePorts(), - listConfigs: listConfigs(), + listConfigs: listConfigs() } } diff --git a/src/domain/use-cases/invoke-port.js b/src/domain/use-cases/invoke-port.js index ca6951e2..04453c8f 100644 --- a/src/domain/use-cases/invoke-port.js +++ b/src/domain/use-cases/invoke-port.js @@ -9,6 +9,7 @@ import { AppError } from '../util/app-error' * @property {import('../model-factory').ModelFactory models * @property {import('../datasources/datasource').default} repository * @property {import('../domain/event-broker').EventBroker} broker + * @property {import('../thread-pool').ThreadPool} threadpool * @property {Function[]} handlers */ @@ -41,7 +42,12 @@ export default function makeInvokePort ({ try { const { id = null, port } = input const service = await findModelService(id) - if (!service) throw new Error('could not find service') + + if (!service) throw new Error('service not found') + + if (typeof service[port] !== 'function') + throw new Error(`${port} is not a function`) + return await service[port](input) } catch (error) { return AppError(error) diff --git a/src/domain/use-cases/list-models.js b/src/domain/use-cases/list-models.js index 19cee86f..66de5258 100644 --- a/src/domain/use-cases/list-models.js +++ b/src/domain/use-cases/list-models.js @@ -10,6 +10,6 @@ */ export default function makeListModels ({ repository }) { return async function listModels ({ query, writable }) { - return repository.list({ query, writable }) + return repository.list({ query, writable, serialize: true }) } } diff --git a/src/domain/use-cases/load-models.js b/src/domain/use-cases/load-models.js index 3108fa0e..721e0287 100644 --- a/src/domain/use-cases/load-models.js +++ b/src/domain/use-cases/load-models.js @@ -3,6 +3,7 @@ import Serializer from '../serializer' import { resumeWorkflow } from '../orchestrator' import { isMainThread } from 'worker_threads' +import { modelsInDomain } from '.' /** * @param {function(import("..").Model)} loadModel @@ -36,6 +37,7 @@ function hydrateModels (loadModel, broker, repository) { function handleError (e) { console.error(e) } + /** * * @param {{ @@ -44,17 +46,17 @@ function handleError (e) { * }} */ function handleRestart (repository, eventName) { - // console.log("resuming workflow", repository.name); - if (process.env.RESUME_WORKFLOW_DISABLED) return + const writable = {} + repository - .list() + .list({ writable }) .then(resumeWorkflow) .catch(handleError) repository - .list() + .listSync() .then(list => list.forEach(model => model.emit(eventName, model))) .catch(handleError) } @@ -70,40 +72,22 @@ function handleRestart (repository, eventName) { * }} options * @returns {function():Promise} */ -export default function ({ - modelName, - repository, - broker, - models, - handlers = [] -}) { +export default function ({ modelName, repository, broker, models }) { // main thread only - if (isMainThread) { - const eventType = models.EventTypes.ONLOAD - const eventName = models.getEventName(eventType, modelName) - handlers.forEach(handler => broker.on(eventName, handler)) - + if (!isMainThread) { /** * Loads persited data from datd cc x */ return async function loadModels () { - const spec = models.getModelSpec(modelName) - - // if (isMainThread) { - const eventType = models.EventTypes.ONLOAD - const eventName = models.getEventName(eventType, modelName) - handlers.forEach(handler => broker.on(eventName, handler)) - - return async function loadModels () { - const spec = models.getModelSpec(modelName) - - setTimeout(handleRestart, 30000, repository, eventName) - - return repository.load({ - hydrate: hydrateModels(models.loadModel, broker, repository), - serializer: Serializer.addSerializer(spec.serializers) - }) - } + // const domainModels = modelsInDomain(modelName) + // for await (const model of domainModels) { + // const spec = models.getModelSpec(model) + // setTimeout(handleRestart, 30000, repository, eventName) + // return repository.load({ + // hydrate: hydrateModels(models.loadModel, broker, repository), + // serializer: Serializer.addSerializer(spec.serializers), + // }) + // } } } } diff --git a/src/domain/use-cases/remove-model.js b/src/domain/use-cases/remove-model.js index 64513ce4..ed9e603d 100644 --- a/src/domain/use-cases/remove-model.js +++ b/src/domain/use-cases/remove-model.js @@ -37,10 +37,7 @@ export default function removeModelFactory ({ return async function removeModel ({ id }) { if (isMainThread) { const model = await repository.find(id) - - if (!model) { - throw new Error('no such id') - } + if (!model) throw new Error('no such id') return threadpool.runJob(removeModel.name, { id }, modelName) } else { diff --git a/src/domain/util/change-data-capture.js b/src/domain/util/change-data-capture.js index 7c47f1b9..418fd668 100644 --- a/src/domain/util/change-data-capture.js +++ b/src/domain/util/change-data-capture.js @@ -1,22 +1,54 @@ 'use strict' +const { requestContext } = require('./async-context') + +class CdcCapture extends Map { + get (target) { + const targetName = + target.constructor?.name || + target.prototype?.constructor.name || + target.modelName || + target.getName() + + if (!targetName) return + + if (!super.has(targetName)) super.set(targetName, []) + return super.get(targetName) + } +} + +const cdcCapture = new CdcCapture() + +function capture (target, prop, value) { + if (target[prop] === value) return false + + cdcCapture.get(target).push({ + object: target, + prop, + from: target[prop], + to: value, + time: Date.now(), + user: requestContext.getStore().get('user') + }) + + console.log(`changed ${prop} from ${target[prop]} to ${value}`) + return true +} + /** - * @param {object} target - * @param {Array<{prop:string,from:*,to:*}>} captureArray + * @param {object} target + * @param {Array<{prop:string,from:*,to:*}>} captureArray */ -function changeDataCapture (target, captureArray) { - const cdc = { - set (target, prop, value) { - console.log(`changed ${prop} from ${target[prop]} to ${value}`) +function changeDataCapture (target, handlers = []) { + handlers.push(capture) - if (target[prop] !== value) { - target[prop] = value - captureArray.push({ prop, from: target[prop], to: value }) - } - return true + const handler = { + set (target, prop, value) { + return handlers.reduce(h => h(target, prop, value)) } } - return new Proxy(target, cdc) + + return new Proxy(target, handler) } /** @@ -26,14 +58,14 @@ function changeDataCapture (target, captureArray) { * @returns */ function findChanges (from, to) { - const captureArray = [] - const fromProxy = changeDataCapture(from, captureArray) + const fromProxy = changeDataCapture(from) + const captureArray = cdcCapture.get(from) Object.keys(to).forEach(key => { if (to[key]) fromProxy[key] = to[key] }) - // new object with changes (and only changes), + // new object with changes (and only changes), // excluding props that changed to undefined or null return { toObject: () => diff --git a/start.sh b/start.sh index ac26981e..f8974e50 100755 --- a/start.sh +++ b/start.sh @@ -1,3 +1,8 @@ -# create symlink between aegis-host and aegis vs using npm +# create symlink between aegis-host and aegis vs using npm -yarn link && cd ../aegis-host && yarn link @module-federation/aegis && yarn start +#export FORKRUN_CMD=/Users/tysonmidboe/.nvm/versions/node/v18.12.0/bin/node +#export FORKRUN_ARG=/Users/tysonmidboe/aegis-app/repo.js +export FORKRUN_CMD=/usr/bin/open +export FORKRUN_ARG=http://localhost + +./forkrun && yarn link && cd ../aegis-host && yarn link @module-federation/aegis && yarn start