Skip to content

Commit

Permalink
add support for multiple models per domain
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonrm committed Oct 20, 2022
1 parent 70f8b1c commit 3006a29
Show file tree
Hide file tree
Showing 19 changed files with 330 additions and 299 deletions.
4 changes: 2 additions & 2 deletions __test__/controllers/post-model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ describe('Controllers', function () {
'ABC',
model => ({ model })
)
const addModel = await addModelFactory({
const createModel = await addModelFactory({
modelName: 'ABC',
models: ModelFactory,
repository: DataSourceFactory.getDataSource('ABC'),
broker: EventBrokerFactory.getInstance()
})
const resp = await postModelFactory(addModel)({
const resp = await postModelFactory(createModel)({
body: { a: 'a' },
headers: { 'User-Agent': 'test' },
ip: '127.0.0.1',
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
"nanoid": "3.3.2",
"nats": "2.7.1",
"path-to-regexp": "^6.2.1",
"pretty": "^2.0.0",
"pretty-cli": "^0.0.14",
"python-shell": "^3.0.1",
"query-params-mongo": "^1.1.3",
"regenerator-runtime": "0.13.9",
Expand Down Expand Up @@ -68,7 +70,7 @@
"mocha": "10.0.0",
"mocha-esm": "1.1.1",
"split": "1.0.1",
"standard": "17.0.0",
"standard": "^17.0.0",
"yarn": "1.22.19"
}
}
6 changes: 3 additions & 3 deletions src/adapters/controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { UseCases, getUserRoutes } from '../../domain/use-cases'

const {
addModels,
createModels,
editModels,
findModels,
listConfigs,
Expand Down Expand Up @@ -33,7 +33,7 @@ function make (useCases, controllerFactory) {
}))
}

export const postModels = () => make(addModels, postModelFactory)
export const postModels = () => make(createModels, postModelFactory)
export const patchModels = () => make(editModels, patchModelFactory)
export const getModels = () => make(listModels, getModelsFactory)
export const getModelsById = () => make(findModels, getModelByIdFactory)
Expand All @@ -50,7 +50,7 @@ export const initCache = () => {

async function loadModelInstances () {
console.time(label)
await Promise.allSettled(specs.map(async m => m && m.fn ? m.fn() : false))
//await Promise.allSettled(specs.map(async m => m && m.fn ? m.fn() : false))
console.timeEnd(label)
}

Expand Down
2 changes: 1 addition & 1 deletion src/adapters/controllers/live-rollout.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
*
* @param {import("../use-cases/add-model").addModel} addModel
* @param {import("../use-cases/add-model").createModel} addModel
* @param {function():string} hash
* @returns {import("./http-adapter").httpController}
*/
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/controllers/live-update.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
*
* @param {import("../use-cases/add-model").addModel} addModel
* @param {import("../use-cases/add-model").createModel} addModel
* @param {function():string} hash
* @returns {import("./http-adapter").httpController}
*/
Expand Down
8 changes: 4 additions & 4 deletions src/adapters/controllers/post-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

/**
*
* @param {import("../use-cases/add-model").addModel} addModel
* @param {import("../use-cases/add-model").createModel} createModel
* @param {function():string} hash
* @returns {import("./http-adapter").httpController}
*/
export default function postModelFactory (addModel) {
export default function postModelFactory (createModel) {
return async function postModel (httpRequest) {
try {
httpRequest.log(postModel.name)

const model = await addModel(httpRequest.body)
const model = await createModel(httpRequest.body)

console.debug({ function: addModel.name, output: model })
console.debug({ function: createModel.name, output: model })

return {
headers: {
Expand Down
109 changes: 51 additions & 58 deletions src/adapters/datasources/datasource-mongodb.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
'use strict'
"use strict"

import ModelFactory from '../../domain'
import { EventBrokerFactory } from '../../domain'
import { EventBrokerFactory } from "../../domain"

const broker = EventBrokerFactory.getInstance()

const HIGHWATERMARK = 50

const mongodb = require('mongodb')
const mongodb = require("mongodb")
const { MongoClient } = mongodb
const { DataSourceMemory } = require('./datasource-memory')
const { Transform, Writable } = require('stream')
const qpm = require('query-params-mongo')
const { DataSourceMemory } = require("./datasource-memory")
const { Transform, Writable } = require("stream")
const qpm = require("query-params-mongo")
const processQuery = qpm({
autoDetect: [{ fieldPattern: /_id$/, dataType: 'objectId' }],
converters: { objectId: mongodb.ObjectId }
autoDetect: [{ fieldPattern: /_id$/, dataType: "objectId" }],
converters: { objectId: mongodb.ObjectId },
})

const url = process.env.MONGODB_URL || 'mongodb://localhost:27017'
const configRoot = require('../../config').hostConfig
const url = process.env.MONGODB_URL || "mongodb://localhost:27017"
const configRoot = require("../../config").hostConfig
const dsOptions = configRoot.adapters.datasources.DataSourceMongoDb.options || {
runOffline: true,
numConns: 2
numConns: 2,
}
const cacheSize = configRoot.adapters.cacheSize || 3000

Expand All @@ -31,8 +30,8 @@ const cacheSize = configRoot.adapters.cacheSize || 3000
const connections = []

const mongoOpts = {
//useNewUrlParserd: true,
useUnifiedTopology: true
//useNewUrlParser: true,
useUnifiedTopology: true,
}

/**
Expand All @@ -41,15 +40,15 @@ const mongoOpts = {
* even when the database is offline.
*/
export class DataSourceMongoDb extends DataSourceMemory {
constructor (map, factory, name) {
super(map, factory, name)
constructor (map, factory, name, options) {
super(map, factory, name, options)
this.cacheSize = cacheSize
this.mongoOpts = mongoOpts
// keep running even if db is down
this.className = this.constructor.name
this.runOffline = dsOptions.runOffline
this.domain = options?.domain || name
this.url = url
this.className = this.constructor.name
//console.log(this)
console.log(this)
}

async connection () {
Expand All @@ -58,7 +57,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
const client = new MongoClient(this.url, this.mongoOpts)
await client.connect()
connections.push(client)
client.on('connectionClosed', () =>
client.on("connectionClosed", () =>
connections.splice(connections.indexOf(client), 1)
)
}
Expand All @@ -71,7 +70,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
}

async collection () {
return (await this.connection()).db(this.name).collection(this.name)
return (await this.connection()).db(this.domain).collection(this.name)
}

/**
Expand All @@ -94,7 +93,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
async loadModels () {
try {
const cursor = (await this.collection()).find().limit(this.cacheSize)
cursor.forEach(model => super.saveSync(model.id, model))
cursor.forEach((model) => super.saveSync(model.id, model))
} catch (error) {
console.error({ fn: this.loadModels.name, error })
}
Expand Down Expand Up @@ -144,11 +143,9 @@ export class DataSourceMongoDb extends DataSourceMemory {
async saveDb (id, data) {
try {
const clone = JSON.parse(this.serialize(data))
await (await this.collection()).replaceOne(
{ _id: id },
{ ...clone, _id: id },
{ upsert: true }
)
await (
await this.collection()
).replaceOne({ _id: id }, { ...clone, _id: id }, { upsert: true })
return clone
} catch (error) {
console.error({ fn: this.saveDb.name, error })
Expand All @@ -175,11 +172,11 @@ export class DataSourceMongoDb extends DataSourceMemory {
if (!this.runOffline) {
this.deleteSync(id)
// after delete mem and db are sync'd
console.error('db trans failed, rolled back')
console.error("db trans failed, rolled back")
return
}
// run while db is down - cache will be ahead
console.error('db trans failed, sync it later')
console.error("db trans failed, sync it later")
return data
}
return cache
Expand All @@ -202,14 +199,14 @@ export class DataSourceMongoDb extends DataSourceMemory {
const ctx = this

async function upsert () {
const operations = objects.map(str => {
const operations = objects.map((str) => {
const obj = JSON.parse(str)
return {
replaceOne: {
filter: { ...filter, _id: obj.id },
replacement: { ...obj, _id: obj.id },
upsert: true
}
upsert: true,
},
}
})

Expand All @@ -219,7 +216,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
const result = await col.bulkWrite(operations)
console.log(result.getRawResponse())
objects = []
} catch (error) {}
} catch (error) { }
}
}

Expand All @@ -236,13 +233,13 @@ export class DataSourceMongoDb extends DataSourceMemory {
end (chunk, _, done) {
objects.push(chunk)
done()
}
},
})

writable.on('finish', async () => await upsert())
writable.on("finish", async () => await upsert())

return writable
} catch (error) {}
} catch (error) { }
}

/**
Expand Down Expand Up @@ -283,15 +280,15 @@ export class DataSourceMongoDb extends DataSourceMemory {

// start of array
construct (callback) {
this.push('[')
this.push("[")
callback()
},

// each chunk is a record
transform (chunk, _encoding, callback) {
// comma-separate
if (first) first = false
else this.push(',')
else this.push(",")

// serialize record
this.push(JSON.stringify(chunk))
Expand All @@ -300,36 +297,31 @@ export class DataSourceMongoDb extends DataSourceMemory {

// end of array
flush (callback) {
this.push(']')
this.push("]")
callback()
}
},
})

return new Promise(async (resolve, reject) => {
const readable = (await this.mongoFind(options)).stream()

readable.on('error', reject)
readable.on('end', resolve)
readable.on("error", reject)
readable.on("end", resolve)

// optionally transform db stream then pipe to output
if (serialize && transform)
readable
.pipe(transform)
.pipe(serializer)
.pipe(writable)
readable.pipe(transform).pipe(serializer).pipe(writable)
else if (serialize) readable.pipe(serializer).pipe(writable)
else if (transform) readable.pipe(transform).pipe(writable)
else readable.pipe(writable)
})
} catch (error) {}
} catch (error) { }
}

processOptions ({ options, query }) {
if (options) {
return options
}
if (query) {
return processQuery(query)
return {
...processQuery(query),
...options,
}
}

Expand Down Expand Up @@ -361,11 +353,12 @@ export class DataSourceMongoDb extends DataSourceMemory {
transform = null,
serialize = true,
options = null,
query = null
query = null,
} = {}) {
try {
if (query?.__cached) return super.listSync(query)
if (query?.__count) return this.count()

const processedOptions = this.processOptions({ options, query })
console.log({ processedOptions })

Expand All @@ -374,7 +367,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
writable,
serialize,
transform,
options: processedOptions
options: processedOptions,
})
}

Expand All @@ -388,7 +381,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
return {
total: await this.countDb(),
cached: this.getCacheSize(),
bytes: this.getCacheSizeBytes()
bytes: this.getCacheSizeBytes(),
}
}

Expand Down Expand Up @@ -436,9 +429,9 @@ export class DataSourceMongoDb extends DataSourceMemory {
}

/**
*
* @param {*} filter
* @returns
*
* @param {*} filter
* @returns
*/
async containsMany (filter) {
return (await this.collection()).find(filter).toArray()
Expand Down
Loading

0 comments on commit 3006a29

Please sign in to comment.