Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async kv map #257

Merged
merged 7 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "3.0.2",
"version": "3.1.0",
"exports": {
".": "./mod.ts",
"./zod": "./src/ext/zod/mod.ts",
Expand Down
17 changes: 9 additions & 8 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ export class Collection<

// Create hsitory entries iterator
const listOptions = createListOptions(options);
const iter = this.kv.list(selector, listOptions);
const iter = await this.kv.list(selector, listOptions);

// Collect history entries
let count = 0;
Expand Down Expand Up @@ -1258,7 +1258,7 @@ export class Collection<
// Perform quick delete if all documents are to be deleted
if (selectsAll(options)) {
// Create list iterator and empty keys list, init atomic operation
const iter = this.kv.list({ prefix: this._keys.base }, options);
const iter = await this.kv.list({ prefix: this._keys.base }, options);

const keys: DenoKvStrictKey[] = [];
const atomic = new AtomicWrapper(this.kv);
Expand All @@ -1270,7 +1270,8 @@ export class Collection<

// Set history entries if keeps history
if (this._keepsHistory) {
for await (const { key } of this.kv.list({ prefix: this._keys.id })) {
const historyIter = await this.kv.list({ prefix: this._keys.id });
for await (const { key } of historyIter) {
const id = getDocumentId(key as DenoKvStrictKey);

if (!id) {
Expand Down Expand Up @@ -1836,7 +1837,7 @@ export class Collection<

// Perform efficient count if counting all document entries
if (selectsAll(options)) {
const iter = this.kv.list({ prefix: this._keys.id }, options);
const iter = await this.kv.list({ prefix: this._keys.id }, options);
for await (const _ of iter) {
result++;
}
Expand Down Expand Up @@ -2073,8 +2074,8 @@ export class Collection<
const atomic = new AtomicWrapper(this.kv);
const historyKeyPrefix = extendKey(this._keys.history, id);
const historySegmentKeyPrefix = extendKey(this._keys.historySegment, id);
const historyIter = this.kv.list({ prefix: historyKeyPrefix });
const historySegmentIter = this.kv.list({
const historyIter = await this.kv.list({ prefix: historyKeyPrefix });
const historySegmentIter = await this.kv.list({
prefix: historySegmentKeyPrefix,
});

Expand Down Expand Up @@ -2464,7 +2465,7 @@ export class Collection<
if (this._encoder) {
const atomic = new AtomicWrapper(this.kv);
const keyPrefix = extendKey(this._keys.segment, id);
const iter = this.kv.list({ prefix: keyPrefix });
const iter = await this.kv.list({ prefix: keyPrefix });

for await (const { key } of iter) {
atomic.delete(key as DenoKvStrictKey);
Expand Down Expand Up @@ -2585,7 +2586,7 @@ export class Collection<
// Create list iterator with given options
const selector = createListSelector(prefixKey, options);
const listOptions = createListOptions(options);
const iter = this.kv.list(selector, listOptions);
const iter = await this.kv.list(selector, listOptions);

// Initiate lists
const docs: Document<TOutput, ParseId<TOptions>>[] = [];
Expand Down
33 changes: 33 additions & 0 deletions src/ext/kv/async_lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
export class AsyncLock {
private queue: PromiseWithResolvers<void>[];

constructor() {
this.queue = [];
}

async run<T>(fn: () => Promise<T>): Promise<T> {
await this.lock();
const result = await fn();
this.release();
return result;
}

async close() {
for (const lock of this.queue) {
lock.resolve();
await lock.promise;
}
}

private async lock() {
const prev = this.queue.at(-1);
const next = Promise.withResolvers<void>();
this.queue.push(next);
await prev?.promise;
}

private release() {
const lock = this.queue.shift();
lock?.resolve();
}
}
83 changes: 46 additions & 37 deletions src/ext/kv/atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import type {
DenoKvSetOptions,
DenoKvStrictKey,
} from "../../types.ts";
import { allFulfilled } from "../../utils.ts";
import type { AsyncLock } from "./async_lock.ts";
import type { MapKv } from "./map_kv.ts";
import { createVersionstamp } from "./utils.ts";

export class MapKvAtomicOperation implements DenoAtomicOperation {
private kv: MapKv;
private checks: (() => boolean)[];
private ops: ((versionstamp: string) => void)[];
private lock: AsyncLock;
private checks: (() => Promise<boolean>)[];
private ops: ((versionstamp: string) => Promise<void>)[];

constructor(kv: MapKv) {
constructor(kv: MapKv, lock: AsyncLock) {
this.kv = kv;
this.lock = lock;
this.checks = [];
this.ops = [];
}
Expand All @@ -26,9 +30,9 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
value: unknown,
options?: DenoKvSetOptions,
): DenoAtomicOperation {
this.ops.push((versionstamp) =>
this.kv._set(key, value, versionstamp, options)
);
this.ops.push(async (versionstamp) => {
await this.kv._set(key, value, versionstamp, options);
});
return this;
}

Expand All @@ -38,10 +42,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

min(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -50,7 +54,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Min operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n < val ? n : val,
}, versionstamp);
});
Expand All @@ -59,10 +63,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

max(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -71,7 +75,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Max operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n > val ? n : val,
}, versionstamp);
});
Expand All @@ -80,10 +84,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

sum(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -92,7 +96,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Sum operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n + val,
}, versionstamp);
});
Expand All @@ -102,8 +106,8 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {

check(...checks: DenoAtomicCheck[]): DenoAtomicOperation {
checks.forEach(({ key, versionstamp }) => {
this.checks.push(() => {
const entry = this.kv.get(key);
this.checks.push(async () => {
const entry = await this.kv.get(key);
return entry.versionstamp === versionstamp;
});
});
Expand All @@ -112,31 +116,36 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

enqueue(value: unknown, options?: DenoKvEnqueueOptions): DenoAtomicOperation {
this.ops.push((versionstamp) => {
this.kv._enqueue(value, versionstamp, options);
this.ops.push(async (versionstamp) => {
await this.kv._enqueue(value, versionstamp, options);
});

return this;
}

commit(): DenoKvCommitError | DenoKvCommitResult {
const passedChecks = this.checks
.map((check) => check())
.every((check) => check);
async commit(): Promise<DenoKvCommitError | DenoKvCommitResult> {
return await this.lock.run(async () => {
const checks = await Promise.allSettled(
this.checks.map((check) => check()),
);

if (!passedChecks) {
return {
ok: false,
};
}
const passedChecks = checks.every((checkResult) =>
checkResult.status === "fulfilled" && checkResult.value
);

const versionstamp = createVersionstamp();
if (!passedChecks) {
return {
ok: false,
};
}

this.ops.forEach((op) => op(versionstamp));
const versionstamp = createVersionstamp();
await allFulfilled(this.ops.map((op) => op(versionstamp)));

return {
ok: true,
versionstamp,
};
return {
ok: true,
versionstamp,
};
});
}
}
Loading
Loading