Skip to content

Commit

Permalink
shared datasources
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonrm committed May 8, 2022
1 parent 54b0b32 commit 9970004
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ node_modules
.idea
.DS_Store
tst-rtry.js
.VSCodeCounter/
8 changes: 4 additions & 4 deletions src/adapters/datasources/datasource-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import DataSource from '../../domain/datasource'
* Temporary in-memory storage.
*/
export class DataSourceMemory extends DataSource {
constructor (map, factory, name) {
super(map, factory, name)
}

/**
* @override
*
Expand Down Expand Up @@ -107,6 +103,10 @@ export class DataSourceMemory extends DataSource {
id
})
}
this.deleteSync(id)
}

async deleteSync (id) {
this.dsMap.delete(id)
}
}
2 changes: 1 addition & 1 deletion src/adapters/datasources/datasource-mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const cacheSize = configRoot.adapters.cacheSize || 3000
* even when the database is offline.
*/
export class DataSourceMongoDb extends DataSourceMemory {
constructor (maps, factory, name) {
constructor (map, factory, name) {
super(map, factory, name)
this.url = url
this.cacheSize = cacheSize
Expand Down
31 changes: 10 additions & 21 deletions src/adapters/datasources/index.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
export * from './datasource-memory'
export * from './datasource-file'
export * from './datasource-mongodb'
export * from './datasource-ipfs'
//export * from "./datasource-solid-pod";
// export * from './datasource-ipfs'
// export * from './datasource-solid-pod'

const config = {
getBaseClass (name) {
if (name === 'DataSourceFile') {
return require('.').DataSourceFile
}
if (name === 'DataSourceMongoDb') {
return require('.').DataSourceMongoDb
}
if (name === 'DataSourceIpfs') {
return require('.').DataSourceIpfs
}
// if (name === "DataSourceSolidPod") {
// return require(".").DataSourceSolidPod;
// }
return require('.').DataSourceMemory
},
MEMORYADAPTER: 'DataSourceMemory'
}
import { DataSourceFile } from './datasource-file'
import { DataSourceMemory } from './datasource-memory'
import { DataSourceMongoDb } from './datasource-mongodb'

export default config
export const dsClasses = {
[DataSourceFile.name]: DataSourceFile,
[DataSourceMemory.name]: DataSourceMemory,
[DataSourceMongoDb.name]: DataSourceMongoDb
}
6 changes: 3 additions & 3 deletions src/domain/datasource-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import ModelFactory from '.'
import * as adapters from '../adapters/datasources'
import dsconfig from '../adapters/datasources'
import { dsClasses } from '../adapters/datasources'
import sysconf from '../config'
import DataSource from './datasource'
import { withSharedMemory } from './shared-memory'
Expand Down Expand Up @@ -67,7 +67,7 @@ const DataSourceFactory = (() => {
const { memoryOnly, ephemeral, adapterName } = options

if (memoryOnly || ephemeral) {
return dsconfig.getBaseClass(dsconfig.MEMORYADAPTER)
return dsClasses['DataSourceMemory']
}

if (adapterName) return adapters[adapterName] || DefaultDataSource
Expand All @@ -76,7 +76,7 @@ const DataSourceFactory = (() => {
const url = spec.datasource.url
const cacheSize = spec.datasource.cacheSize
const adapterFactory = spec.datasource.factory
const BaseClass = dsconfig.getBaseClass(spec.datasource.baseClass)
const BaseClass = dsClasses[spec.datasource.baseClass]
return adapterFactory(url, cacheSize, BaseClass)
}

Expand Down
4 changes: 2 additions & 2 deletions src/domain/distributed-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export default function DistributedCache ({
eventName === models.getEventName(models.EventTypes.DELETE, modelName)
) {
console.debug('deleting from cache', modelName, event.modelId)
await datasources.getSharedDataSource(modelName).delete(event.modelId)
await datasources.getDataSource(modelName).delete(event.modelId)
return true
}
return false
Expand Down Expand Up @@ -283,7 +283,7 @@ export default function DistributedCache ({
// find the requested object or objects
const relatedModels = await relationType[relation.type](
model,
datasources.getSharedDataSource(relation.modelName.toUpperCase()),
datasources.getDataSource(relation.modelName.toUpperCase()),
relation
)

Expand Down
34 changes: 34 additions & 0 deletions src/domain/federated-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict'

import { EventBrokerFactory } from '.'
import { requireRemoteObject } from './make-relations'

export const FederationMixin = superclass =>
class extends superclass {
/**
* Deserialize
* @override
* @param {*} id
* @returns {import('.').Model}
*/
async find (id) {
try {
const result = await super.find(id)
if (!result || result.length < 1) return this.findRemote(id)
return result
} catch (error) {
console.error({ fn: 'federatedFindById', error })
}
}

async findRemote (id) {
const event = await requireRemoteObject(
null,
{ type: 'findById', modelName: this.name, id },
EventBrokerFactory.getInstance()
)
if (event?.model) return event.model
console.debug('federated findById: no model found')
return {}
}
}
14 changes: 10 additions & 4 deletions src/domain/thread-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import domainEvents from './domain-events'
import ModelFactory, { DataSourceFactory } from '.'
import { performance as perf } from 'perf_hooks'
import os from 'os'
import { Perf } from 'nats/lib/nats-base-client/util'

const { poolOpen, poolClose, poolDrain, poolAbort } = domainEvents
const broker = EventBrokerFactory.getInstance()
Expand Down Expand Up @@ -53,6 +54,8 @@ async function kill (thread, reason) {
try {
return await new Promise(resolve => {
console.info({ msg: 'killing thread', id: thread.id, reason })
const TIMEOUT = 8000
const start = perf.now()

const timerId = setTimeout(async () => {
await thread.mainChannel.terminate()
Expand All @@ -62,12 +65,14 @@ async function kill (thread, reason) {
reason
})
resolve(thread.id)
}, 8000)
}, TIMEOUT)

thread.mainChannel.once('exit', () => {
clearTimeout(timerId)
thread.eventChannel.close()
console.info('clean exit of thread', thread.id, reason)
// the timeout will cause this to run if executed first
if (perf.now() - start < TIMEOUT)
console.info('clean exit of thread', thread.id, reason)
resolve(thread.id)
})

Expand Down Expand Up @@ -128,7 +133,7 @@ function newThread ({ pool, file, workerData }) {
const timerId = setTimeout(async () => {
const message = 'timedout creating thread'
console.error({ fn: newThread.name, message })
reject('timeout')
reject(message)
}, 50000)

worker.once('message', async msg => {
Expand Down Expand Up @@ -300,6 +305,7 @@ export class ThreadPool extends EventEmitter {
this.freeThreads.findIndex(t => t.id === thread.id),
1
)
return this
}

/**
Expand Down Expand Up @@ -847,8 +853,8 @@ const ThreadPoolFactory = (() => {
return Object.freeze({
getThreadPool,
broadcastEvent,
listPools,
fireEvent,
listPools,
reloadAll,
reload,
status,
Expand Down
2 changes: 1 addition & 1 deletion src/domain/use-cases/broker-events.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export default function brokerEvents (
process.on('message', ({ cmd, id, pid, data, name }) => {
if (cmd && id && data && process.pid !== pid) {
if (cmd === 'saveCommand') {
const ds = datasources.getSharedDataSource(name)
const ds = datasources.getDataSource(name)
ds.save(id, data, false)
return
}
Expand Down

0 comments on commit 9970004

Please sign in to comment.