diff --git a/.start.sh.swp b/.start.sh.swp deleted file mode 100644 index d5a2240b..00000000 Binary files a/.start.sh.swp and /dev/null differ diff --git a/forkrun.c b/forkrun.c index 315fd62f..85081ffb 100644 --- a/forkrun.c +++ b/forkrun.c @@ -13,6 +13,9 @@ char arg[BUFSIZE]; int main (void) { pid_t pid; + + + if ((pid = fork()) == 0) { snprintf(cmd, BUFSIZE, "%s", getenv(envvar_cmd)); snprintf(arg, BUFSIZE, "%s", getenv(envvar_arg)); @@ -29,3 +32,7 @@ int main (void) { return EXIT_FAILURE; } + +int modelFactory(int i) { + return 56; +} diff --git a/repo.js b/repo.js index d54cf738..1a3e5970 100644 --- a/repo.js +++ b/repo.js @@ -2,7 +2,7 @@ const express = require('express') const app = express() -const port = 8000 +const port = 8880 app.use(express.json()) app.use(express.static('dist')) diff --git a/src/adapters/datasources/datasource-mongodb.js b/src/adapters/datasources/datasource-mongodb.js index 53937438..71b0f62f 100644 --- a/src/adapters/datasources/datasource-mongodb.js +++ b/src/adapters/datasources/datasource-mongodb.js @@ -194,11 +194,7 @@ export class DataSourceMongoDb extends DataSource { */ async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) { - console.log({ - ctor: this.constructor.name, - fn: this.mongoFind.name, - filter - }) + console.log({ fn: this.mongoFind.name, filter }) let cursor = (await this.collection()).find(filter) if (sort) cursor = cursor.sort(sort) if (aggregate) cursor = cursor.aggregate(aggregate) @@ -232,14 +228,14 @@ export class DataSourceMongoDb extends DataSource { }, // each chunk is a record - transform (chunk, _encoding, callback) { + transform (chunk, _encoding, next) { // comma-separate if (first) first = false else this.push(',') // serialize record this.push(JSON.stringify(chunk)) - callback() + next() }, // end of array diff --git a/src/domain/datasource-factory.js b/src/domain/datasource-factory.js index 5f3e7006..af6adaad 100644 --- a/src/domain/datasource-factory.js +++ b/src/domain/datasource-factory.js @@ -97,6 +97,11 @@ const DsCoreExtensions = superclass => }) } + /** + * @override + * @param {*} options + * @returns + */ async list (options) { if (options?.writable) return isMainThread @@ -112,6 +117,22 @@ const DsCoreExtensions = superclass => ModelFactory.loadModel(broker, this, model, this.name) ) } + + /** + * @override + * @param {*} id + * @returns + */ + async delete (id) { + try { + await super.delete(id) + } catch (error) { + console.error(error) + return + } + // only if super succeeds + this.deleteSync(id) + } } const extendClass = DsClass => class extends DsCoreExtensions(DsClass) {} diff --git a/src/domain/index.js b/src/domain/index.js index 93f2835b..93256dfb 100644 --- a/src/domain/index.js +++ b/src/domain/index.js @@ -126,17 +126,14 @@ */ /** - * @typedef {object} datasource + * @typedef datasourceConfig + * @property {import('./datasource').default} datasource * @property {string} url - physical storage location: e.g. database url, file path - * @property {function()} adapterFactory - factory function to construct datasource adapter - * @property {string} baseClass - name of base class to extend + * @property {function({DataSource}):DataSourcelw4l} adapterFactory - factory function to construct datasource adapter + * @property {string} baseClass - name owkllllllllllllllf base class to extend * @property {number} [cacheSize] - maxium number of cached instances before purging * @property {number} [cacheSizeKb] - maximum size in kilobytes of cached instances before cache purge * @property {boolean} [cachedWrite] - allow cached instances of an object to write to persistent storage - * @property {()=>[]} list - * @property {(id)=>{}} find - * @property {(id,data)=>data} save - * @property {import('.').ports} factory */ /** @@ -173,7 +170,7 @@ * URL parameter or query of the auto-generated REST API * @property {accessControlList} [accessControlList] - configure authorization * @property {number} [start] - create `start` instances of the model - * @property {datasource} [datasource] - define custom datasource + * @property {datasourceConfig} [datasource] - define custom datasource * @property {Array<{ [method:string]: function(), path: string }>} [routes] - custom routes */ @@ -431,6 +428,24 @@ export async function importRemoteCache (name) { } } +/** + * The total number of domains deployed to a host. + * A domain consists of one or more models. + * Each model represents a domain unless it specifies + * the name of another model in `ModelSpecification.domain`, + * in which case it is a subdomain within the bounded context + * of that domain or simply a supporting entity. Such models + * run in the same threadpool and share the same storage + * namespace (e.g. collections or tables in the same database) + * + * + * @returns {number} sum of domains deployed to host + */ +export const totalDomains = () => + ModelFactory.getModelSpecs().filter( + s => !s.isCached && (!s.domain || s.modelName === s.domain) + ).length + export { UseCaseService } from './use-cases' export { default as EventBrokerFactory } from './event-broker' diff --git a/src/domain/make-relations.js b/src/domain/make-relations.js index 0d3b7649..b3cee4c2 100644 --- a/src/domain/make-relations.js +++ b/src/domain/make-relations.js @@ -27,7 +27,7 @@ export const relationType = { const filter = { [rel.foreignKey]: model.getId() } // retrieve from memory const memory = ds.listSync(filter) - // call datasource interface to fetch from external storage + // retrieve from from external storage const externalMedia = await ds.list({ query: filter }) // return all if (memory.length > 0) @@ -50,7 +50,7 @@ export const relationType = { // return if found if (memory) return memory // if not, call ds interface to search external storage - return ds.find({ id: model[rel.foreignKey] }) + return ds.find(model[rel.foreignKey]) }, /** diff --git a/src/domain/thread-pool.js b/src/domain/thread-pool.js index 8e1499ad..850650b1 100644 --- a/src/domain/thread-pool.js +++ b/src/domain/thread-pool.js @@ -4,7 +4,7 @@ import EventBrokerFactory from './event-broker' import { EventEmitter } from 'stream' import { Worker, BroadcastChannel } from 'worker_threads' import domainEvents from './domain-events' -import ModelFactory from '.' +import ModelFactory, { totalDomains } from '.' import os from 'os' import { AsyncResource } from 'async_hooks' import { requestContext } from '.' @@ -669,19 +669,20 @@ const ThreadPoolFactory = (() => { /** * By default the system-wide thread upper limit = the total # of cores. - * The default behavior is to spread threads/cores evenly between models. + * The default behavior is to spread cores evenly between domains. In + * the ModelSpec, this includes standalone models, e.g. models that + * have no domain configured or whose domain name is the same as its modelName. + * * @param {*} options * @returns */ function calculateMaxThreads (options) { // defer to explicitly set value if (options?.maxThreads) return options.maxThreads - // get the total number of domains - const nApps = ModelFactory.getModelSpecs().filter( - s => !s.isCached && s.modelName === s.domain - ).length // divide the total cpu count by the number of domains - return Math.floor(os.cpus().length / nApps || DEFAULT_THREADPOOL_MAX || 1) + return Math.floor( + os.cpus().length / totalDomains() || DEFAULT_THREADPOOL_MAX + ) } /** @@ -754,7 +755,7 @@ const ThreadPoolFactory = (() => { jobName, jobData, modelName, - options + ) }, status () { @@ -786,8 +787,7 @@ const ThreadPoolFactory = (() => { /** * This is the hot reload. Drain the pool, - * stop the existing threads & start new - * ones, which will have the latest code + * stop the pool and then start lsskk * @param {string} poolName i.e. modelName * @returns {Promise} * @throws {ReloadError} diff --git a/src/domain/use-cases/find-model.js b/src/domain/use-cases/find-model.js index 17ac7ad7..f84f508d 100644 --- a/src/domain/use-cases/find-model.js +++ b/src/domain/use-cases/find-model.js @@ -34,25 +34,18 @@ export default function makeFindModel ({ return async function findModel ({ id, query, model }) { if (isMainThread) { // Main thread performs read operations - const model = await repository.find(id) + const modelInst = await repository.find(id) - if (!model) { + if (!modelInst) { throw new Error('Not Found') } - console.log({ fn: findModel.name, model }) + console.log({ fn: findModel.name, model: modelInst }) // Only send to app thread if data must be enriched - if (!query.relation && !query.command) return model + if (!query.relation && !query.command) return modelInst - return await threadpool.runJob( - findModel.name, - { - id, - query, - model - }, - modelName - ) + const input = { id, query, model: modelInst } + return await threadpool.runJob(findModel.name, input, modelName) } else { try { const hydrateModel = model => diff --git a/src/domain/use-cases/hot-reload.js b/src/domain/use-cases/hot-reload.js index 3dbe1ef4..b0aba855 100644 --- a/src/domain/use-cases/hot-reload.js +++ b/src/domain/use-cases/hot-reload.js @@ -60,15 +60,13 @@ export default function makeHotReload ({ models, broker } = {}) { try { if (modelName && modelName !== '*') { - const spec = models.getModelSpec(modelName.toUpperCase()) + const spec = models.getModelSpec(modelName) if (!spec) throw new Error(`model not found ${modelName}`) const poolName = spec.domain || modelName - // compile() console.log('reloading pool', poolName) await ThreadPoolFactory.reload(poolName) return ThreadPoolFactory.status(poolName) } else { - // compile() console.log('reloading all pools') await ThreadPoolFactory.reloadPools() return ThreadPoolFactory.status() diff --git a/wasm/assembly/index.ts b/wasm/assembly/index.ts index 7b605df5..e0384858 100644 --- a/wasm/assembly/index.ts +++ b/wasm/assembly/index.ts @@ -1,181 +1,182 @@ -import * as aegis from "./aegis"; +import * as aegis from './aegis' +import * as nn from './neural-net' //import * as nn from "./neural-net"; //import * as tst from "./test"; -const key: i32 = 0; -const val: i32 = 1; +const key: i32 = 0 +const val: i32 = 1 -export class ModelSpec { - modelName: string; - endpoint: string; - constructor(name: string, endpoint: string) { - this.modelName = name; - this.endpoint = endpoint; +class ModelSpec { + modelName: string + endpoint: string + constructor (name: string, endpoint: string) { + this.modelName = name + this.endpoint = endpoint } } -export function getModelSpec(): ModelSpec { - return new ModelSpec("wasm", "wasm"); +export function getModelSpec (): ModelSpec { + return new ModelSpec('wasm', 'wasm') } -export const ArrayOfStrings_ID = idof(); +export const ArrayOfStrings_ID = idof() -function findVal(key: string, keys: string[], vals: string[]): string { +function findVal (key: string, keys: string[], vals: string[]): string { for (let i: i32 = 0; i < keys.length; i++) { if (keys[i] == key) { - return vals[i].toString(); + return vals[i].toString() } } - return ""; + return '' } -export function modelFactory(keys: string[], values: string[]): string[][] { - const arr = new Array(3); - arr[0] = ["key1", findVal("key1", keys, values)]; - arr[1] = ["key2", findVal("key2", keys, values)]; - arr[2] = ["fibonacci", findVal("fibonacci", keys, values)]; - return arr; +export function modelFactory (keys: string[], values: string[]): string[][] { + const arr = new Array(3) + arr[0] = ['key1', findVal('key1', keys, values)] + arr[1] = ['key2', findVal('key2', keys, values)] + arr[2] = ['fibonacci', findVal('fibonacci', keys, values)] + return arr } -export function getPorts(keys: string[], vals: string[]): string[][] { +export function getPorts (keys: string[], vals: string[]): string[][] { //aegis.log("getPorts called " + keys[0] + ":" + vals[0]); - const ports = new Array(2); + const ports = new Array(2) //service,type,consumesEvent,producesEvent,callback,undo - ports[0] = ["port1", "dFlow,outbound,dFlow_start,port1_done,port1Cb,1"]; - ports[1] = ["port2", "dFlow,outbound,port1_done,port2_done,port2Cb,1"]; - return ports; + ports[0] = ['port1', 'dFlow,outbound,dFlow_start,port1_done,port1Cb,1'] + ports[1] = ['port2', 'dFlow,outbound,port1_done,port2_done,port2Cb,1'] + return ports } -export function port1Cb(keys: string[], vals: string[]): string[][] { +export function port1Cb (keys: string[], vals: string[]): string[][] { const cfg = [ - ["port", "port1"], - ["callback", "port1Cb"], - ["consumesEvent", "dFlow_start"], - ["producesEvent", "port1_done"], - ]; + ['port', 'port1'], + ['callback', 'port1Cb'], + ['consumesEvent', 'dFlow_start'], + ['producesEvent', 'port1_done'] + ] aegis.log( - "porf invokced" + cfg[0][0] + " " + cfg[0][1] + " " + cfg[0][2] + cfg[0][3] - ); - return cfg; + 'porf invokced' + cfg[0][0] + ' ' + cfg[0][1] + ' ' + cfg[0][2] + cfg[0][3] + ) + return cfg } -export function port2Cb(keys: string[], vals: string[]): string[][] { +export function port2Cb (keys: string[], vals: string[]): string[][] { const cfg = [ - ["port", "port2"], - ["callback", "port2Cb"], - ["consumesEvent", "port1_done"], - ["producesEvent", "port2_done"], - ]; + ['port', 'port2'], + ['callback', 'port2Cb'], + ['consumesEvent', 'port1_done'], + ['producesEvent', 'port2_done'] + ] aegis.log( - "porf invoked" + cfg[0][0] + " " + cfg[0][1] + " " + cfg[0][2] + cfg[0][3] - ); - return cfg; + 'porf invoked' + cfg[0][0] + ' ' + cfg[0][1] + ' ' + cfg[0][2] + cfg[0][3] + ) + return cfg } -export function getCommands(): string[][] { - const commands = new Array(7); - commands[0] = ["serviceMeshListen", "tell wasm module to begin listening"]; - commands[1] = ["serviceMeshNotify", "tell wasm module to send broadcast"]; - commands[2] = ["serviceMeshCallback", "subscribed event fired"]; - commands[3] = ["runFibonacci", "remote calculate fibonacci"]; - commands[4] = ["fibonacci", "calculate fibonacci for a number"]; - commands[5] = ["deployModule", "request deployment of a module"]; - commands[6] = ["commandEx", "command example"]; - return commands; +export function getCommands (): string[][] { + const commands = new Array(7) + commands[0] = ['serviceMeshListen', 'tell wasm module to begin listening'] + commands[1] = ['serviceMeshNotify', 'tell wasm module to send broadcast'] + commands[2] = ['serviceMeshCallback', 'subscribed event fired'] + commands[3] = ['runFibonacci', 'remote calculate fibonacci'] + commands[4] = ['fibonacci', 'calculate fibonacci for a number'] + commands[5] = ['deployModule', 'request deployment of a module'] + commands[6] = ['commandEx', 'command example'] + return commands } -export function commandEx(keys: string[], vals: string[]): string[][] { - aegis.log("\ncommandEx called " + keys[0] + ":" + vals[0]); - const retval = new Array(1); - retval[0] = ["key1", "commandEx_update!"]; - return retval; +export function commandEx (keys: string[], vals: string[]): string[][] { + aegis.log('\ncommandEx called ' + keys[0] + ':' + vals[0]) + const retval = new Array(1) + retval[0] = ['key1', 'commandEx_update!'] + return retval } -export function serviceMeshListen(keys: string[], vals: string[]): void { - aegis.log("serviceMeshListen: " + keys[0] + ": " + vals[0]); - const eventName = findVal("eventName", keys, vals); - aegis.addListener(eventName, "serviceMeshCallback"); +export function serviceMeshListen (keys: string[], vals: string[]): void { + aegis.log('serviceMeshListen: ' + keys[0] + ': ' + vals[0]) + const eventName = findVal('eventName', keys, vals) + aegis.addListener(eventName, 'serviceMeshCallback') } -export function serviceMeshNotify(keys: string[], vals: string[]): void { - const modelName = findVal("modelName", keys, vals); - const modelId = findVal("modelId", keys, vals); - const eventName = findVal("eventName", keys, vals); - const eventData = new Array(3); - aegis.log("wasm notify called with args: " + modelName + ": " + modelId); - eventData[0] = [keys[0], vals[0]]; - eventData[1] = ["modelName", modelName]; - eventData[2] = ["modelId", modelId]; - aegis.fireEvent(eventName || "wasmWebListen", eventData, 1); +export function serviceMeshNotify (keys: string[], vals: string[]): void { + const modelName = findVal('modelName', keys, vals) + const modelId = findVal('modelId', keys, vals) + const eventName = findVal('eventName', keys, vals) + const eventData = new Array(3) + aegis.log('wasm notify called with args: ' + modelName + ': ' + modelId) + eventData[0] = [keys[0], vals[0]] + eventData[1] = ['modelName', modelName] + eventData[2] = ['modelId', modelId] + aegis.fireEvent(eventName || 'wasmWebListen', eventData, 1) } -export function serviceMeshCallback( +export function serviceMeshCallback ( keys: string[], vals: string[] ): string[][] { - aegis.log("websocket callback fired: " + keys[0] + ": " + vals[0]); - const eventName = findVal("eventName", keys, vals); - const eventData = new Array(2); - eventData[0] = [keys[0], vals[0]]; - eventData[1] = [keys[1], vals[1]]; - aegis.fireEvent(eventName + "callback", eventData, 1); - return [["key1", "serviceMeshCallback"]]; + aegis.log('websocket callback fired: ' + keys[0] + ': ' + vals[0]) + const eventName = findVal('eventName', keys, vals) + const eventData = new Array(2) + eventData[0] = [keys[0], vals[0]] + eventData[1] = [keys[1], vals[1]] + aegis.fireEvent(eventName + 'callback', eventData, 1) + return [['key1', 'serviceMeshCallback']] } -export function fibonacci(x: number): number { +export function fibonacci (x: number): number { if (x === 0) { - return 0; + return 0 } if (x === 1) { - return 1; + return 1 } - return fibonacci(x - 1) + fibonacci(x - 2); + return fibonacci(x - 1) + fibonacci(x - 2) } -export function runFibonacci(keys: string[], vals: string[]): string[][] { - let val: number = 0; - let startTime: i64 = Date.now(); +export function runFibonacci (keys: string[], vals: string[]): string[][] { + let val: number = 0 + let startTime: i64 = Date.now() for (let i = 0; i < keys.length; i++) { - if ("fibonacci" == keys[i]) { - val = parseInt(vals[i]); - break; + if ('fibonacci' == keys[i]) { + val = parseInt(vals[i]) + break } } - const sum = fibonacci(val); - const ret = new Array(2); - ret[0] = ["result", sum.toString()]; - ret[1] = ["time", (Date.now() - startTime).toString()]; - return ret; + const sum = fibonacci(val) + const ret = new Array(2) + ret[0] = ['result', sum.toString()] + ret[1] = ['time', (Date.now() - startTime).toString()] + return ret } -export function portEx(keys: string[], vals: string[]): void { - aegis.log("portEx calling port wasmTestPort" + keys[0] + ":" + vals[0]); - return; +export function portEx (keys: string[], vals: string[]): void { + aegis.log('portEx calling port wasmTestPort' + keys[0] + ':' + vals[0]) + return } -export function onUpdate(keys: string[], vals: string[]): string[][] { - return [["updatedByWasm", new Date(Date.now()).toUTCString()]]; +export function onUpdate (keys: string[], vals: string[]): string[][] { + return [['updatedByWasm', new Date(Date.now()).toUTCString()]] } -export function onDelete(keys: string[], vals: string[]): i8 { +export function onDelete (keys: string[], vals: string[]): i8 { // return negative to stop the action - aegis.log("onDelete called"); - return -1; + aegis.log('onDelete called') + return -1 } -export function validate(keys: string[], vals: string[]): void { - aegis.log("onUpdate called"); - return; +export function validate (keys: string[], vals: string[]): void { + aegis.log('onUpdate called') + return } -export function test(keys: string[], values: string[]): string[][] { - const key1 = keys[0] == "key1" ? values[0] : "default"; - const key2 = keys[1] == "key2" ? values[1] : "default"; - const arr = new Array(3); - arr[0] = ["key1", key1]; - arr[1] = ["key2", key2]; - arr[2] = ["key3", "alwaysThisValue"]; - aegis.log("test called"); - return arr; +export function test (keys: string[], values: string[]): string[][] { + const key1 = keys[0] == 'key1' ? values[0] : 'default' + const key2 = keys[1] == 'key2' ? values[1] : 'default' + const arr = new Array(3) + arr[0] = ['key1', key1] + arr[1] = ['key2', key2] + arr[2] = ['key3', 'alwaysThisValue'] + aegis.log('test called') + return arr }