Skip to content

Commit

Permalink
Switch to a more generic user-shared-buffer based mechanism for shari…
Browse files Browse the repository at this point in the history
…ng buffer
  • Loading branch information
kriszyp committed Apr 12, 2024
1 parent fd653f0 commit 9fa48f6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
4 changes: 2 additions & 2 deletions native.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export let Env,
getSharedBuffer,
compress,
directWrite,
getIncrementer,
getUserSharedBuffer,
attemptLock,
unlock;
path = pathModule;
Expand Down Expand Up @@ -104,7 +104,7 @@ export function setNativeFunctions(externals) {
position = externals.position;
resetTxn = externals.resetTxn;
directWrite = externals.directWrite;
getIncrementer = externals.getIncrementer;
getUserSharedBuffer = externals.getUserSharedBuffer;
attemptLock = externals.attemptLock;
unlock = externals.unlock;
getCurrentValue = externals.getCurrentValue;
Expand Down
6 changes: 3 additions & 3 deletions read.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
startRead,
setReadCallback,
directWrite,
getIncrementer,
getUserSharedBuffer,
attemptLock,
unlock,
} from './native.js';
Expand Down Expand Up @@ -391,10 +391,10 @@ export function addReadMethods(
if (rc < 0) lmdbError(rc);
},

getIncrementer(id, startingValue) {
getUserSharedBuffer(id, defaultBuffer) {
keyBytes.dataView.setUint32(0, this.db.dbi);
let keySize = this.writeKey(id, keyBytes, 4);
return getIncrementer(env.address, keySize, startingValue);
return getUserSharedBuffer(env.address, keySize, defaultBuffer);
},

attemptLock(id, version, callback) {
Expand Down
29 changes: 14 additions & 15 deletions src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1059,37 +1059,36 @@ uint64_t ExtendedEnv::getNextTime() {
uint64_t ExtendedEnv::getLastTime() {
return bswap_64(lastTime);
}
NAPI_FUNCTION(getIncrementer) {
NAPI_FUNCTION(getUserSharedBuffer) {
ARGS(3)
GET_INT64_ARG(0)
EnvWrap* ew = (EnvWrap*) i64;
uint32_t size;
GET_UINT32_ARG(size, 1);
int64_t starting_value;
napi_get_value_int64(env, args[2], &starting_value);
MDB_val default_buffer;
napi_get_arraybuffer_info(env, args[2], &default_buffer.mv_data, &default_buffer.mv_size);
ExtendedEnv* extend_env = (ExtendedEnv*) mdb_env_get_userctx(ew->env);
std::string key(ew->keyBuffer, size);
// get an incrementer with the key, starting value, and convert pointer to an array buffer
int64_t* incrementer = extend_env->getIncrementer(key, starting_value, env);
MDB_val buffer = extend_env->getUserSharedBuffer(key, default_buffer, env);
if (buffer.mv_data == default_buffer.mv_data) return args[2];
napi_value return_value;
napi_create_external_arraybuffer(env, incrementer, 8, cleanupLMDB, (void*) incrementer, &return_value);
napi_create_external_arraybuffer(env, buffer.mv_data, buffer.mv_size, cleanupLMDB, buffer.mv_data, &return_value);
return return_value;
}

int64_t* ExtendedEnv::getIncrementer(std::string key, int64_t starting_value, napi_env env) {
MDB_val ExtendedEnv::getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_env env) {
pthread_mutex_lock(&locksModificationLock);
auto resolution = incrementers.find(key);
auto resolution = userSharedBuffers.find(key);
bool found;
int64_t* incrementer;
if (resolution == incrementers.end()) {
incrementer = new int64_t;
*incrementer = starting_value;
incrementers.emplace(key, incrementer);
MDB_val user_shared_buffer;
if (resolution == userSharedBuffers.end()) {
userSharedBuffers.emplace(key, user_shared_buffer = default_buffer);
} else {
incrementer = resolution->second;
user_shared_buffer = resolution->second;
}
pthread_mutex_unlock(&locksModificationLock);
return incrementer;
return user_shared_buffer;
}
bool ExtendedEnv::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) {
pthread_mutex_lock(&locksModificationLock);
Expand Down Expand Up @@ -1250,7 +1249,7 @@ void EnvWrap::setupExports(Napi::Env env, Object exports) {
EXPORT_NAPI_FUNCTION("getSharedBuffer", getSharedBuffer);
EXPORT_NAPI_FUNCTION("setTestRef", setTestRef);
EXPORT_NAPI_FUNCTION("getTestRef", getTestRef);
EXPORT_NAPI_FUNCTION("getIncrementer", getIncrementer);
EXPORT_NAPI_FUNCTION("getUserSharedBuffer", getUserSharedBuffer);
EXPORT_NAPI_FUNCTION("attemptLock", attemptLock);
EXPORT_NAPI_FUNCTION("unlock", unlock);
EXPORT_FUNCTION_ADDRESS("writePtr", writeFFI);
Expand Down
4 changes: 2 additions & 2 deletions src/lmdb-js.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ class ExtendedEnv {
static MDB_txn* prefetchTxns[20];
static pthread_mutex_t* prefetchTxnsLock;
std::unordered_map<std::string, callback_holder_t> lock_callbacks;
std::unordered_map<std::string, int64_t*> incrementers;
std::unordered_map<std::string, MDB_val> userSharedBuffers;
pthread_mutex_t locksModificationLock;
uint64_t lastTime; // actually encoded as double
uint64_t previousTime; // actually encoded as double
int64_t* getIncrementer(std::string key, int64_t starting_value, napi_env env);
MDB_val getUserSharedBuffer(std::string key, MDB_val default_buffer, napi_env env);
bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew);
bool unlock(std::string key, bool only_check);
uint64_t getNextTime();
Expand Down
15 changes: 12 additions & 3 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -883,15 +883,24 @@ describe('lmdb-js', function () {
should.equal(results[i], 'value' + i);
}
});
it('getIncrementer', function () {
it('getUserSharedBuffer', function () {
let defaultIncrementer = new BigInt64Array(1);
defaultIncrementer[0] = 4n;
let incrementer = new BigInt64Array(
db.getIncrementer('incrementer-test', 4),
db.getUserSharedBuffer('incrementer-test', defaultIncrementer.buffer),
);
should.equal(Atomics.add(incrementer, 0, 1n), 4n);
let secondDefaultIncrementer = new BigInt64Array(1); //should not get used
incrementer = new BigInt64Array( // should return same incrementer
db.getIncrementer('incrementer-test', 1),
db.getUserSharedBuffer(
'incrementer-test',
secondDefaultIncrementer.buffer,
),
);
should.equal(defaultIncrementer[0], 5n);
should.equal(Atomics.add(incrementer, 0, 1n), 5n);
should.equal(defaultIncrementer[0], 6n);
should.equal(secondDefaultIncrementer[0], 0n);
});
it('prefetch', async function () {
await new Promise((resolve) => db.prefetch(['key1', 'key2'], resolve));
Expand Down

0 comments on commit 9fa48f6

Please sign in to comment.