Skip to content

Commit

Permalink
fix find
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonrm committed Nov 22, 2022
1 parent 8484272 commit ce933a5
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 160 deletions.
Binary file removed .start.sh.swp
Binary file not shown.
7 changes: 7 additions & 0 deletions forkrun.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ char arg[BUFSIZE];
int main (void) {
pid_t pid;




if ((pid = fork()) == 0) {
snprintf(cmd, BUFSIZE, "%s", getenv(envvar_cmd));
snprintf(arg, BUFSIZE, "%s", getenv(envvar_arg));
Expand All @@ -29,3 +32,7 @@ int main (void) {

return EXIT_FAILURE;
}

int modelFactory(int i) {
return 56;
}
2 changes: 1 addition & 1 deletion repo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const express = require('express')
const app = express()
const port = 8000
const port = 8880

app.use(express.json())
app.use(express.static('dist'))
Expand Down
10 changes: 3 additions & 7 deletions src/adapters/datasources/datasource-mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,7 @@ export class DataSourceMongoDb extends DataSource {
*/

async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) {
console.log({
ctor: this.constructor.name,
fn: this.mongoFind.name,
filter
})
console.log({ fn: this.mongoFind.name, filter })
let cursor = (await this.collection()).find(filter)
if (sort) cursor = cursor.sort(sort)
if (aggregate) cursor = cursor.aggregate(aggregate)
Expand Down Expand Up @@ -232,14 +228,14 @@ export class DataSourceMongoDb extends DataSource {
},

// each chunk is a record
transform (chunk, _encoding, callback) {
transform (chunk, _encoding, next) {
// comma-separate
if (first) first = false
else this.push(',')

// serialize record
this.push(JSON.stringify(chunk))
callback()
next()
},

// end of array
Expand Down
21 changes: 21 additions & 0 deletions src/domain/datasource-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ const DsCoreExtensions = superclass =>
})
}

/**
* @override
* @param {*} options
* @returns
*/
async list (options) {
if (options?.writable)
return isMainThread
Expand All @@ -112,6 +117,22 @@ const DsCoreExtensions = superclass =>
ModelFactory.loadModel(broker, this, model, this.name)
)
}

/**
* @override
* @param {*} id
* @returns
*/
async delete (id) {
try {
await super.delete(id)
} catch (error) {
console.error(error)
return
}
// only if super succeeds
this.deleteSync(id)
}
}

const extendClass = DsClass => class extends DsCoreExtensions(DsClass) {}
Expand Down
31 changes: 23 additions & 8 deletions src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,14 @@
*/

/**
* @typedef {object} datasource
* @typedef datasourceConfig
* @property {import('./datasource').default} datasource
* @property {string} url - physical storage location: e.g. database url, file path
* @property {function()} adapterFactory - factory function to construct datasource adapter
* @property {string} baseClass - name of base class to extend
* @property {function({DataSource}):DataSourcelw4l} adapterFactory - factory function to construct datasource adapter
* @property {string} baseClass - name owkllllllllllllllf base class to extend
* @property {number} [cacheSize] - maxium number of cached instances before purging
* @property {number} [cacheSizeKb] - maximum size in kilobytes of cached instances before cache purge
* @property {boolean} [cachedWrite] - allow cached instances of an object to write to persistent storage
* @property {()=>[]} list
* @property {(id)=>{}} find
* @property {(id,data)=>data} save
* @property {import('.').ports} factory
*/

/**
Expand Down Expand Up @@ -173,7 +170,7 @@
* URL parameter or query of the auto-generated REST API
* @property {accessControlList} [accessControlList] - configure authorization
* @property {number} [start] - create `start` instances of the model
* @property {datasource} [datasource] - define custom datasource
* @property {datasourceConfig} [datasource] - define custom datasource
* @property {Array<{ [method:string]: function(), path: string }>} [routes] - custom routes
*/

Expand Down Expand Up @@ -431,6 +428,24 @@ export async function importRemoteCache (name) {
}
}

/**
* The total number of domains deployed to a host.
* A domain consists of one or more models.
* Each model represents a domain unless it specifies
* the name of another model in `ModelSpecification.domain`,
* in which case it is a subdomain within the bounded context
* of that domain or simply a supporting entity. Such models
* run in the same threadpool and share the same storage
* namespace (e.g. collections or tables in the same database)
*
*
* @returns {number} sum of domains deployed to host
*/
export const totalDomains = () =>
ModelFactory.getModelSpecs().filter(
s => !s.isCached && (!s.domain || s.modelName === s.domain)
).length

export { UseCaseService } from './use-cases'

export { default as EventBrokerFactory } from './event-broker'
Expand Down
4 changes: 2 additions & 2 deletions src/domain/make-relations.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export const relationType = {
const filter = { [rel.foreignKey]: model.getId() }
// retrieve from memory
const memory = ds.listSync(filter)
// call datasource interface to fetch from external storage
// retrieve from from external storage
const externalMedia = await ds.list({ query: filter })
// return all
if (memory.length > 0)
Expand All @@ -50,7 +50,7 @@ export const relationType = {
// return if found
if (memory) return memory
// if not, call ds interface to search external storage
return ds.find({ id: model[rel.foreignKey] })
return ds.find(model[rel.foreignKey])
},

/**
Expand Down
20 changes: 10 additions & 10 deletions src/domain/thread-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import EventBrokerFactory from './event-broker'
import { EventEmitter } from 'stream'
import { Worker, BroadcastChannel } from 'worker_threads'
import domainEvents from './domain-events'
import ModelFactory from '.'
import ModelFactory, { totalDomains } from '.'
import os from 'os'
import { AsyncResource } from 'async_hooks'
import { requestContext } from '.'
Expand Down Expand Up @@ -669,19 +669,20 @@ const ThreadPoolFactory = (() => {

/**
* By default the system-wide thread upper limit = the total # of cores.
* The default behavior is to spread threads/cores evenly between models.
* The default behavior is to spread cores evenly between domains. In
* the ModelSpec, this includes standalone models, e.g. models that
* have no domain configured or whose domain name is the same as its modelName.
*
* @param {*} options
* @returns
*/
function calculateMaxThreads (options) {
// defer to explicitly set value
if (options?.maxThreads) return options.maxThreads
// get the total number of domains
const nApps = ModelFactory.getModelSpecs().filter(
s => !s.isCached && s.modelName === s.domain
).length
// divide the total cpu count by the number of domains
return Math.floor(os.cpus().length / nApps || DEFAULT_THREADPOOL_MAX || 1)
return Math.floor(
os.cpus().length / totalDomains() || DEFAULT_THREADPOOL_MAX
)
}

/**
Expand Down Expand Up @@ -754,7 +755,7 @@ const ThreadPoolFactory = (() => {
jobName,
jobData,
modelName,
options
)
},
status () {
Expand Down Expand Up @@ -786,8 +787,7 @@ const ThreadPoolFactory = (() => {

/**
* This is the hot reload. Drain the pool,
* stop the existing threads & start new
* ones, which will have the latest code
* stop the pool and then start lsskk
* @param {string} poolName i.e. modelName
* @returns {Promise<ThreadPool>}
* @throws {ReloadError}
Expand Down
19 changes: 6 additions & 13 deletions src/domain/use-cases/find-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,18 @@ export default function makeFindModel ({
return async function findModel ({ id, query, model }) {
if (isMainThread) {
// Main thread performs read operations
const model = await repository.find(id)
const modelInst = await repository.find(id)

if (!model) {
if (!modelInst) {
throw new Error('Not Found')
}

console.log({ fn: findModel.name, model })
console.log({ fn: findModel.name, model: modelInst })
// Only send to app thread if data must be enriched
if (!query.relation && !query.command) return model
if (!query.relation && !query.command) return modelInst

return await threadpool.runJob(
findModel.name,
{
id,
query,
model
},
modelName
)
const input = { id, query, model: modelInst }
return await threadpool.runJob(findModel.name, input, modelName)
} else {
try {
const hydrateModel = model =>
Expand Down
4 changes: 1 addition & 3 deletions src/domain/use-cases/hot-reload.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,13 @@ export default function makeHotReload ({ models, broker } = {}) {

try {
if (modelName && modelName !== '*') {
const spec = models.getModelSpec(modelName.toUpperCase())
const spec = models.getModelSpec(modelName)
if (!spec) throw new Error(`model not found ${modelName}`)
const poolName = spec.domain || modelName
// compile()
console.log('reloading pool', poolName)
await ThreadPoolFactory.reload(poolName)
return ThreadPoolFactory.status(poolName)
} else {
// compile()
console.log('reloading all pools')
await ThreadPoolFactory.reloadPools()
return ThreadPoolFactory.status()
Expand Down
Loading

0 comments on commit ce933a5

Please sign in to comment.