diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..aa1c94e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "deno.enable": true, + "deno.unstable": true +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..533d64e --- /dev/null +++ b/README.md @@ -0,0 +1,168 @@ +# Workerpool + +An unopinionated small scale worker pool abstraction which serves as a base interface for more advanced worker managers. + +**Note:** This is NOT a drop-in replacement of the NPM `workerpool`, but it's easy to create the same functionality (see _Web Workers_ section below). + +**Note:** This is not yet a good candidate for large scale distributed queues, cluster awareness such as mutex and event ack have to be in place before that. If you think solutions in such a scale is viable in Deno, consider [buy me a coffee](https://buymeacoffee.com/vicary) and I'll make it happen. + +## Terminology + +1. **Workerpool** + + A manager that creates workers on the fly, executing tasks up to defined + concurrency. + +2. **Workers** + + Workers\* are internal wrappers for user provided runner classes, they maintain internal states such as active/busy and the retry counter. + + _\* Not to be confused with Web Workers._ + +3. **Runners** + + User implementation of task executors, where they all implements the `Runner` + interface. + +4. **Tasks** + + Tasks are named payloads enqueued into a workerpool. + +## Basic Usage + +```ts +class RunnerA implements Runner {...} +class RunnerB implements Runner {...} + +const pool = new Workerpool({ + concurrency: 2, + runners: [RunnerA, RunnerB] +}); + +pool + .enqueue({ name: "RunnerA", payload: {...} }) + .enqueue({ name: "RunnerB", payload: {...} }) + .start(); +``` + +## Runner Examples + +### In-memory Queue + +As a proof of concept, this is the most basic implementation of an in-memory queue. 
+ +```ts +type Payload = any; + +type MemoryMutexTask = RunnerTask<Payload> & { active?: boolean }; + +const tasks: MemoryMutexTask[] = []; + +const pool = new Workerpool({ + concurrency: 1, + runners: [runnerA, runnerB], + enqueue: (task: MemoryMutexTask) => { + if (tasks.includes(task)) { + task.active = false; + } else { + tasks.push(task); + } + }, + dequeue: () => { + // Uncomment the following line for FIFO queues + // if (tasks.find(({ active }) => active)) return; + + const task = tasks.find(({ active }) => !active); + if (task) { + task.active = true; + return task; + } + }, + success: (task) => { + const index = tasks.indexOf(task); + if (index > -1) { + tasks.splice(index, 1); + } + }, +}); +``` + +### Web Workers + +Deno has built-in support for workers; I'll use `comlink` to keep the example code small. + +You'll need a separate script file for the worker. + +```ts +// myworker.ts +import { expose } from "https://cdn.skypack.dev/comlink?dts"; + +expose({ + execute: async (payload: string) => { + // Simulate async actions + await new Promise((resolve) => setTimeout(resolve, 1000)); + + return `Worker echo: ${payload}`; + }, +}); +``` + +And a proxy class in your main thread. 
+ +```ts +// myrunner.ts +import { + Remote, + UnproxyOrClone, + wrap, +} from "https://cdn.skypack.dev/comlink?dts"; +import type { Runner } from "https://deno.land/x/workerpool/mod.ts"; + +export class MyRunner + implements Runner +{ + #worker: Remote>; + + constructor() { + const worker = new Worker(new URL("./myworker.ts", import.meta.url).href, { + type: "module", + }); + + this.#worker = wrap>(worker); + } + + execute(payload: TPayload) { + const result = this.#worker.execute(payload as UnproxyOrClone); + + return result as Promisable; + } + + async onSuccess(result: TResult) { + const onSuccess = await this.#worker.onSuccess; + return onSuccess?.(result as UnproxyOrClone); + } + + async onFailure(error: Error) { + const onFailure = await this.#worker.onFailure; + return onFailure?.(error); + } + + dispose() { + return this.#worker.terminate(); + } +} +``` + +Now register the runners into the workerpool: + +```ts +const pool = new Workerpool({ + concurrency: 1, + runners: [MyRunner], +}); + +pool + .enqueue({ name: "MyRunner", payload: "Hello World!" }) + .enqueue({ name: "MyRunner", payload: "Hello Again!" }) + .start(); +``` diff --git a/Runner.ts b/Runner.ts new file mode 100644 index 0000000..7e309c5 --- /dev/null +++ b/Runner.ts @@ -0,0 +1,28 @@ +import { Promisable } from "type-fest"; + +/** + * The Runner interface. + */ +export interface Runner< + TPayload = unknown, + TResult = unknown, + TError extends Error = Error +> { + execute: (payload: TPayload) => Promisable; + + onSuccess?: (result: TResult) => Promisable; + + /** + * Called when execute throws an error. + * + * This function may return a boolean to indicate if the task can be retried, + * defaults to true and always retries. + */ + onFailure?: (error: TError) => Promisable; + + /** + * Optional cleanup method to be called when the runner is about to be dropped + * due to concurrency overload, e.g. Workers#terminate() or DataSource#destroy(). 
+ */ + dispose?: () => Promisable; +} diff --git a/RunnerTask.ts b/RunnerTask.ts new file mode 100644 index 0000000..f0c74db --- /dev/null +++ b/RunnerTask.ts @@ -0,0 +1,24 @@ +import { JsonValue } from "type-fest"; + +export interface RunnerTask { + /** + * Optional task id for easier mutex in database. + */ + id?: string; + + /** + * name of the runners executing this task. + */ + name: string; + + /** + * The task payload + */ + payload: TPayload; + + /** + * How many times this task has been executed, including the current run, the + * first run and retries. + */ + executionCount: number; +} diff --git a/Worker.ts b/Worker.ts new file mode 100644 index 0000000..d645e3e --- /dev/null +++ b/Worker.ts @@ -0,0 +1,82 @@ +import { Runner } from "./Runner.ts"; + +export class WorkerExecutionError extends Error { + constructor( + message: string, + readonly name: string, + readonly retryable = false + ) { + super(message); + } +} + +/** + * A wrapper class for task runners. + */ +export class Worker { + #executionnCount = 0; + #successCount = 0; + #failureCount = 0; + + #busy = false; + + constructor( + readonly runner: Runner, + readonly name: string + ) {} + + get busy() { + return this.#busy; + } + + get executionCount() { + return this.#executionnCount; + } + + get successCount() { + return this.#successCount; + } + + get failureCount() { + return this.#failureCount; + } + + async execute(payload: TPayload): Promise { + if (this.#busy) { + throw new Error(`Worker ${this.name} is busy.`); + } + + this.#busy = true; + + try { + const result = await this.runner.execute(payload); + + this.#successCount++; + + await this.runner.onSuccess?.(result); + + return result; + } catch (error) { + if (!(error instanceof Error)) { + throw error; + } + + this.#failureCount++; + + const retryable = await this.runner.onFailure?.(error); + + throw new WorkerExecutionError( + error.message, + error.name, + retryable ?? 
true + ); + } finally { + this.#executionnCount++; + this.#busy = false; + } + } + + dispose() { + return this.runner.dispose?.(); + } +} diff --git a/Workerpool.test.ts b/Workerpool.test.ts new file mode 100644 index 0000000..1b32e32 --- /dev/null +++ b/Workerpool.test.ts @@ -0,0 +1,117 @@ +import { assertEquals } from "https://deno.land/std@0.155.0/testing/asserts.ts"; +import { describe, it } from "https://deno.land/std@0.155.0/testing/bdd.ts"; +import { + assertSpyCalls, + stub, +} from "https://deno.land/std@0.155.0/testing/mock.ts"; +import { Runner } from "./Runner.ts"; +import { RunnerTask } from "./RunnerTask.ts"; +import { Workerpool } from "./Workerpool.ts"; + +type ArrowFunction = (...args: unknown[]) => unknown; +type MemoryMutexTask = RunnerTask & { active?: boolean }; + +describe("Workerpool", () => { + class runnerA implements Runner { + async execute(cb?: ArrowFunction) { + await new Promise((resolve) => setTimeout(resolve, Math.random() * 200)); + cb?.(); + } + } + + class runnerB extends runnerA {} + + it("should process all tasks", async () => { + const { callback, tasks } = await new Promise((resolve) => { + const tasks: MemoryMutexTask[] = []; + const callback = stub({ callback: () => {} }, "callback"); + const pool = new Workerpool({ + concurrency: 2, + runners: [runnerA], + enqueue: (task: MemoryMutexTask) => { + if (tasks.includes(task)) { + task.active = false; + } else { + tasks.push(task); + } + }, + dequeue: () => { + const task = tasks.find(({ active }) => !active); + if (task) { + task.active = true; + return task; + } + }, + success: (task) => { + const index = tasks.indexOf(task); + if (index > -1) { + tasks.splice(index, 1); + } + + if (tasks.length === 0) { + resolve({ callback, tasks }); + } + }, + }); + + pool + .enqueue({ name: "runnerA", payload: callback }) + .enqueue({ name: "runnerA", payload: callback }) + .enqueue({ name: "runnerA", payload: callback }) + .enqueue({ name: "runnerA", payload: callback }) + .start(); + }); 
+ + assertSpyCalls(callback, 4); + assertEquals(tasks.length, 0); + }); + + it("should swap workers when concurrency is reached", async () => { + const { callback, tasks } = await new Promise((resolve) => { + const tasks: MemoryMutexTask[] = []; + const callback = stub({ callback: () => {} }, "callback"); + const pool = new Workerpool({ + concurrency: 1, + runners: [runnerA, runnerB], + enqueue: (task: MemoryMutexTask) => { + if (tasks.includes(task)) { + task.active = false; + } else { + tasks.push(task); + } + }, + dequeue: () => { + const task = tasks.find(({ active }) => !active); + if (task) { + task.active = true; + return task; + } + }, + success: (task) => { + const index = tasks.indexOf(task); + if (index > -1) { + tasks.splice(index, 1); + } + + if (tasks.length === 0) { + resolve({ callback, tasks }); + } + }, + }); + + pool + .enqueue({ name: "runnerA", payload: callback }) + .enqueue({ name: "runnerB", payload: callback }) + .enqueue({ name: "runnerA", payload: callback }) + .enqueue({ name: "runnerB", payload: callback }) + .start(); + }); + + assertSpyCalls(callback, 4); + assertEquals(tasks.length, 0); + }); + + it("should work with workers", async () => { + // TODO: + }); +}); diff --git a/Workerpool.ts b/Workerpool.ts new file mode 100644 index 0000000..0bc065b --- /dev/null +++ b/Workerpool.ts @@ -0,0 +1,225 @@ +import { Class, JsonValue, Promisable, SetOptional } from "type-fest"; +import { Runner } from "./Runner.ts"; +import { RunnerTask } from "./RunnerTask.ts"; +import { Worker, WorkerExecutionError } from "./Worker.ts"; + +export type WorkerpoolOptions = { + /** + * Classes which implements the Runner interface. + */ + runners?: Class>[]; + + /** + * Size of the worker pool, A.K.A. poolSize. + * + * @default 10 + */ + concurrency?: number; + + /** + * Retries before treating a task as failed. 
+ * + * @default 0, + */ + maximumRetries?: number; + + /** + * If specified, workers will be discarded after this many successful or + * failure tasks. + * + * @default Infinity + */ + maximumTaskPerWorker?: number; + + /** + * Implementation of task enqueuing. + * + * Retries will also call this method with the task object, this function + * should reset the mutex lock if available. + */ + enqueue: (task: RunnerTask) => Promisable; + + /** + * Retrieves the next pending task, this function should acquire mutex lock + * for the task. + */ + dequeue: () => Promisable | undefined>; + + /** + * Called when a dequeued task is successful, use this function to remove + * finished tasks (mutex). + */ + success?: (task: RunnerTask, result: TResult) => Promisable; + + /** + * Called when a failing task has exceeded maximum retries. + */ + failure?: (task: RunnerTask, error: Error) => Promisable; +}; + +export class Workerpool { + #active = false; + #concurrency = 10; + #maximumRetries = 0; + #maximumTaskPerWorker = Infinity; + #runnerFactories = new Map>>(); + #workers = new Set>(); + + constructor(readonly options: WorkerpoolOptions) { + options.runners?.forEach((runner) => { + this.#runnerFactories.set(runner.name, runner); + }); + + if (options.concurrency) { + this.#concurrency = options.concurrency; + } + + if (options.maximumRetries) { + this.#maximumRetries = options.maximumRetries; + } + + if (options.maximumTaskPerWorker) { + this.#maximumTaskPerWorker = options.maximumTaskPerWorker; + } + } + + get workerCount() { + return this.#workers.size; + } + + start() { + if (this.#active) { + return; + } + + this.#active = true; + this.#dequeue(); + + return this; + } + + pause() { + this.#active = false; + + // Trigger runner disposal if the queue is already empty. + this.#dequeue(); + } + + /** + * @deprecated Use `pause` for elegance. 
+ */ + stop() { + return this.pause(); + } + + enqueue({ + executionCount = 0, + ...task + }: SetOptional, "executionCount">) { + const doEnqueue = async () => { + await this.options.enqueue({ executionCount, ...task }); + + // Don't await for task executions here. + this.#dequeue(); + }; + + doEnqueue(); + + return this; + } + + async #dequeue() { + if (!this.#active) { + // Release idle runners + await Promise.all( + [...this.#workers] + .filter((worker) => !worker.busy) + .map((worker) => worker.dispose()) + ); + + return; + } + + const task = await this.options.dequeue(); + // No tasks available yet, wait for the next dequeue. + if (!task) { + return; + } + + const worker = await this.#getWorker(task.name); + // No workers available yet, wait for the next dequeue. + if (!worker) { + await this.options.enqueue(task); + return; + } + + task.executionCount++; + worker + .execute(task.payload) + .then( + (result) => this.options.success?.(task, result), + (error) => { + if ( + error instanceof WorkerExecutionError && + error.retryable && + task.executionCount < this.#maximumRetries + ) { + this.enqueue(task); + } else { + this.options.failure?.(task, error); + } + } + ) + .finally(() => { + if (worker.executionCount >= this.#maximumTaskPerWorker) { + this.#workers.delete(worker); + } + + this.#dequeue(); + }); + + this.#dequeue(); + } + + async #getWorker( + name: string + ): Promise | undefined> { + const idleWorkers = [...this.#workers].filter((worker) => !worker.busy); + const worker = idleWorkers.find( + ({ name: workerName }) => workerName === name + ); + if (worker) { + return worker; + } + + if (this.#workers.size < this.#concurrency) { + const runnerClass = this.#runnerFactories.get(name); + if (!runnerClass) { + throw new Error(`No runner is named ${name}.`); + } + + const workerInstance = new Worker( + new runnerClass(), + runnerClass.name + ); + + this.#workers.add(workerInstance); + + return workerInstance; + } else { + // Discard idle workers of other 
types, if available. + const idleWorker = idleWorkers.find( + ({ name: workerName }) => workerName !== name + ); + + if (idleWorker) { + this.#workers.delete(idleWorker); + + // Hint: To increase concurrency, do not await in runners. + await idleWorker.dispose(); + + return this.#getWorker(name); + } + } + } +} diff --git a/deno.json b/deno.json new file mode 100644 index 0000000..167ccfb --- /dev/null +++ b/deno.json @@ -0,0 +1,6 @@ +{ + "importMap": "./deno.json", + "imports": { + "type-fest": "https://raw.githubusercontent.com/vicary/type-fest/main/mod.ts" + } +}