From fd69c1ca096c130bb0a372b5186c1b3c4760abbb Mon Sep 17 00:00:00 2001 From: Dima Date: Mon, 26 Apr 2021 15:44:11 +0500 Subject: [PATCH 1/2] cp -rv thread_safe_function_counting/ thread_safe_function_counting_wrapped --- .../node-addon-api/addon.cc | 106 ++++++++++++++++++ .../node-addon-api/addon.js | 9 ++ .../node-addon-api/binding.gyp | 21 ++++ .../node-addon-api/package.json | 15 +++ 4 files changed, 151 insertions(+) create mode 100644 thread_safe_function_counting_wrapped/node-addon-api/addon.cc create mode 100644 thread_safe_function_counting_wrapped/node-addon-api/addon.js create mode 100644 thread_safe_function_counting_wrapped/node-addon-api/binding.gyp create mode 100644 thread_safe_function_counting_wrapped/node-addon-api/package.json diff --git a/thread_safe_function_counting_wrapped/node-addon-api/addon.cc b/thread_safe_function_counting_wrapped/node-addon-api/addon.cc new file mode 100644 index 00000000..30c7ad47 --- /dev/null +++ b/thread_safe_function_counting_wrapped/node-addon-api/addon.cc @@ -0,0 +1,106 @@ +#include +#include +#include "napi.h" + +constexpr size_t ARRAY_LENGTH = 10; + +// Data structure representing our thread-safe function context. +struct TsfnContext { + TsfnContext(Napi::Env env) : deferred(Napi::Promise::Deferred::New(env)) { + for (size_t i = 0; i < ARRAY_LENGTH; ++i) ints[i] = i; + }; + + // Native Promise returned to JavaScript + Napi::Promise::Deferred deferred; + + // Native thread + std::thread nativeThread; + + // Some data to pass around + int ints[ARRAY_LENGTH]; + + Napi::ThreadSafeFunction tsfn; +}; + +// The thread entry point. This takes as its arguments the specific +// threadsafe-function context created inside the main thread. +void threadEntry(TsfnContext* context); + +// The thread-safe function finalizer callback. This callback executes +// at destruction of thread-safe function, taking as arguments the finalizer +// data and threadsafe-function context. +void FinalizerCallback(Napi::Env env, void* finalizeData, TsfnContext* context); + +// Exported JavaScript function. Creates the thread-safe function and native +// thread. Promise is resolved in the thread-safe function's finalizer. +Napi::Value CreateTSFN(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + + // Construct context data + auto testData = new TsfnContext(env); + + // Create a new ThreadSafeFunction. + testData->tsfn = Napi::ThreadSafeFunction::New( + env, // Environment + info[0].As(), // JS function from caller + "TSFN", // Resource name + 0, // Max queue size (0 = unlimited). + 1, // Initial thread count + testData, // Context, + FinalizerCallback, // Finalizer + (void*)nullptr // Finalizer data + ); + testData->nativeThread = std::thread(threadEntry, testData); + + // Return the deferred's Promise. This Promise is resolved in the thread-safe + // function's finalizer callback. + return testData->deferred.Promise(); +} + +// The thread entry point. This takes as its arguments the specific +// threadsafe-function context created inside the main thread. +void threadEntry(TsfnContext* context) { + // This callback transforms the native addon data (int *data) to JavaScript + // values. It also receives the treadsafe-function's registered callback, and + // may choose to call it. + auto callback = [](Napi::Env env, Napi::Function jsCallback, int* data) { + jsCallback.Call({Napi::Number::New(env, *data)}); + }; + + for (size_t index = 0; index < ARRAY_LENGTH; ++index) { + // Perform a call into JavaScript. + napi_status status = + context->tsfn.BlockingCall(&context->ints[index], callback); + + if (status != napi_ok) { + Napi::Error::Fatal( + "ThreadEntry", + "Napi::ThreadSafeNapi::Function.BlockingCall() failed"); + } + // Sleep for some time. + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + + // Release the thread-safe function. This decrements the internal thread + // count, and will perform finalization since the count will reach 0. + context->tsfn.Release(); +} + +void FinalizerCallback(Napi::Env env, + void* finalizeData, + TsfnContext* context) { + // Join the thread + context->nativeThread.join(); + + // Resolve the Promise previously returned to JS via the CreateTSFN method. + context->deferred.Resolve(Napi::Boolean::New(env, true)); + delete context; +} + +// Addon entry point +Napi::Object Init(Napi::Env env, Napi::Object exports) { + exports["createTSFN"] = Napi::Function::New(env, CreateTSFN); + return exports; +} + +NODE_API_MODULE(addon, Init) diff --git a/thread_safe_function_counting_wrapped/node-addon-api/addon.js b/thread_safe_function_counting_wrapped/node-addon-api/addon.js new file mode 100644 index 00000000..4ee0d938 --- /dev/null +++ b/thread_safe_function_counting_wrapped/node-addon-api/addon.js @@ -0,0 +1,9 @@ +const { createTSFN } = require('bindings')('addon'); + +const callback = (...args) => { + console.log(new Date, ...args); +}; + +void async function() { + console.log(await createTSFN(callback)); +}(); diff --git a/thread_safe_function_counting_wrapped/node-addon-api/binding.gyp b/thread_safe_function_counting_wrapped/node-addon-api/binding.gyp new file mode 100644 index 00000000..492c7035 --- /dev/null +++ b/thread_safe_function_counting_wrapped/node-addon-api/binding.gyp @@ -0,0 +1,21 @@ +{ + 'targets': [{ + 'target_name': 'addon', + 'defines': ['V8_DEPRECATION_WARNINGS=1'], + 'sources': ['addon.cc'], + 'include_dirs': ["= 10.16.0" + } +} From bec0440cbaaeb6c164dcf09689b0b1b05fbc03d2 Mon Sep 17 00:00:00 2001 From: Dima Date: Mon, 26 Apr 2021 15:45:04 +0500 Subject: [PATCH 2/2] Replaced `addon.{js,cc}` by the new code. --- .../node-addon-api/addon.cc | 210 +++++++++++------- .../node-addon-api/addon.js | 16 +- 2 files changed, 139 insertions(+), 87 deletions(-) diff --git a/thread_safe_function_counting_wrapped/node-addon-api/addon.cc b/thread_safe_function_counting_wrapped/node-addon-api/addon.cc index 30c7ad47..d0674d9b 100644 --- a/thread_safe_function_counting_wrapped/node-addon-api/addon.cc +++ b/thread_safe_function_counting_wrapped/node-addon-api/addon.cc @@ -1,105 +1,157 @@ #include +#include #include + #include "napi.h" -constexpr size_t ARRAY_LENGTH = 10; +template +struct ExtractJSFunctionImpl; + +class NodeJSContext final { + private: + struct Impl final { + Napi::Env env_; + Napi::Promise::Deferred promise_; + std::vector threads_; + + explicit Impl(Napi::Env env) : env_(env), promise_(Napi::Promise::Deferred::New(env)) {} -// Data structure representing our thread-safe function context. -struct TsfnContext { - TsfnContext(Napi::Env env) : deferred(Napi::Promise::Deferred::New(env)) { - for (size_t i = 0; i < ARRAY_LENGTH; ++i) ints[i] = i; + ~Impl() { + for (std::thread& t : threads_) { + t.join(); + } + // NOTE(dkorolev): This promise can be set to something other than `true`. + promise_.Resolve(Napi::Boolean::New(env_, true)); + } }; + std::shared_ptr impl_; + + public: + explicit NodeJSContext(Napi::Env env) : impl_(std::make_shared(env)) {} + explicit NodeJSContext(const Napi::CallbackInfo& info) : NodeJSContext(info.Env()) {} - // Native Promise returned to JavaScript - Napi::Promise::Deferred deferred; + template + typename ExtractJSFunctionImpl::retval_t ExtractJSFunction(T f) { + return ExtractJSFunctionImpl::DoIt(*this, f); + } - // Native thread - std::thread nativeThread; + void RunAsync(std::function f) { impl_->threads_.emplace_back(f); } - // Some data to pass around - int ints[ARRAY_LENGTH]; + Napi::Env GetEnv() const { return impl_->env_; } + Napi::Value GetPromise() const { return impl_->promise_.Promise(); } +}; - Napi::ThreadSafeFunction tsfn; +template +struct ArgsPopulator final { + static void DoIt(Napi::Env env, const TUPLE& input, std::vector& output) { + PopulateArg(env, std::get(input), output[I]); + ArgsPopulator::DoIt(env, input, output); + } + static void PopulateArg(Napi::Env env, int input, napi_value& output) { output = Napi::Number::New(env, input); } + static void PopulateArg(Napi::Env env, const std::string& input, napi_value& output) { output = Napi::String::New(env, input); } + // NOTE(dkorolev): Add more type wrappers or find the right way to do it within Napi. }; -// The thread entry point. This takes as its arguments the specific -// threadsafe-function context created inside the main thread. -void threadEntry(TsfnContext* context); - -// The thread-safe function finalizer callback. This callback executes -// at destruction of thread-safe function, taking as arguments the finalizer -// data and threadsafe-function context. -void FinalizerCallback(Napi::Env env, void* finalizeData, TsfnContext* context); - -// Exported JavaScript function. Creates the thread-safe function and native -// thread. Promise is resolved in the thread-safe function's finalizer. -Napi::Value CreateTSFN(const Napi::CallbackInfo& info) { - Napi::Env env = info.Env(); - - // Construct context data - auto testData = new TsfnContext(env); - - // Create a new ThreadSafeFunction. - testData->tsfn = Napi::ThreadSafeFunction::New( - env, // Environment - info[0].As(), // JS function from caller - "TSFN", // Resource name - 0, // Max queue size (0 = unlimited). - 1, // Initial thread count - testData, // Context, - FinalizerCallback, // Finalizer - (void*)nullptr // Finalizer data - ); - testData->nativeThread = std::thread(threadEntry, testData); - - // Return the deferred's Promise. This Promise is resolved in the thread-safe - // function's finalizer callback. - return testData->deferred.Promise(); -} +template +struct ArgsPopulator final { + static void DoIt(Napi::Env, const TUPLE&, std::vector&) {} +}; -// The thread entry point. This takes as its arguments the specific -// threadsafe-function context created inside the main thread. -void threadEntry(TsfnContext* context) { - // This callback transforms the native addon data (int *data) to JavaScript - // values. It also receives the treadsafe-function's registered callback, and - // may choose to call it. - auto callback = [](Napi::Env env, Napi::Function jsCallback, int* data) { - jsCallback.Call({Napi::Number::New(env, *data)}); +class NodeJSFunction final { + private: + struct Impl final { + // The `NodeJSContext` is captured into `std::shared_ptr`, to ensure proper cleanup order. + NodeJSContext context_; + Napi::ThreadSafeFunction function_; + + Impl(NodeJSContext context, Napi::Function jsf) + : context_(context), + function_(Napi::ThreadSafeFunction::New( + context_.GetEnv(), + jsf, + "dkorolev_cpp_callaback", + 0, // Max queue size (0 = unlimited). + 1, // Initial thread count. + static_cast(nullptr), + [context](Napi::Env, void*, void*) { + // NOTE(dkorolev): The *IMPORTANT* part is to capture `context` by value here. + // If this is not done, the reference counter for the very `NodeJSContext` would drop to zero, + // the functions will get called, but the cleanup would fail, crashing the application. + }, + reinterpret_cast(0))) {} + ~Impl() { + // NOTE(dkorolev): This `.Release()` would eventually call the finalization lambda, which, in its turn, + // would release the captured-by-copy `context`, ensuring the cleanup is happening as it should, + // first the captured functions, then by joining the async threads, and finally by setting the promise. + function_.Release(); + } }; + std::shared_ptr impl_; + + public: + NodeJSFunction(NodeJSContext context, Napi::Function fun) : impl_(std::make_shared(context, fun)) {} + + template + void operator()(ARGS&&... args) const { + auto args_as_tuple_to_copy = std::make_tuple(std::forward(args)...); + if (impl_->function_.BlockingCall( + reinterpret_cast(0), [args_as_tuple_to_copy](Napi::Env env, Napi::Function jsf, int*) { + std::vector params; + using tuple_t = decltype(args_as_tuple_to_copy); + params.resize(std::tuple_size::value); + ArgsPopulator::value>::DoIt(env, args_as_tuple_to_copy, params); + jsf.Call(params); + // TODO(dkorolev): Process the return value as needed. + }) != napi_ok) { + Napi::Error::Fatal("NAPI", "`Napi::ThreadSafeNapi::Function.BlockingCall() != napi_ok`."); + } + } +}; - for (size_t index = 0; index < ARRAY_LENGTH; ++index) { - // Perform a call into JavaScript. - napi_status status = - context->tsfn.BlockingCall(&context->ints[index], callback); +template <> +struct ExtractJSFunctionImpl { + using retval_t = NodeJSFunction; + static NodeJSFunction DoIt(NodeJSContext self, Napi::Function js_function) { + return NodeJSFunction(self, js_function); + } +}; - if (status != napi_ok) { - Napi::Error::Fatal( - "ThreadEntry", - "Napi::ThreadSafeNapi::Function.BlockingCall() failed"); - } - // Sleep for some time. - std::this_thread::sleep_for(std::chrono::milliseconds(200)); +template <> +struct ExtractJSFunctionImpl { + using retval_t = NodeJSFunction; + static NodeJSFunction DoIt(NodeJSContext self, Napi::Value js_function) { + return NodeJSFunction(self, js_function.As()); } +}; - // Release the thread-safe function. This decrements the internal thread - // count, and will perform finalization since the count will reach 0. - context->tsfn.Release(); -} +Napi::Value RunAsyncWork(const Napi::CallbackInfo& cbinfo) { + // Create the context that would manage the lifetime of the extracted JS functions, to `.Release()` them later. + NodeJSContext ctx(cbinfo); + + // Create the captured functions before starting the async thread, as the very `cbinfo` is a const reference. + NodeJSFunction f_even = ctx.ExtractJSFunction(cbinfo[0]); + NodeJSFunction f_odd = ctx.ExtractJSFunction(cbinfo[1]); + + // Run the C++ code asynchronously. + ctx.RunAsync([f_even, f_odd]() { + // NOTE(dkorolev): It is *IMPORTANT* to capture `f_{even,odd}` by value, so that their refcounts are incremented. + struct IntString final { int i; std::string s; }; + for (IntString& value : std::vector({{1, "one"}, {2, "two"}, {3, "three"}, {4, "four"}, {5 ,"five"}})) { + ((value.i % 2 == 0) ? f_even : f_odd)(value.i, value.s); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); -void FinalizerCallback(Napi::Env env, - void* finalizeData, - TsfnContext* context) { - // Join the thread - context->nativeThread.join(); + // This `return` would return the promise immediately, as the "business logic" is run in a dedicated thread. + return ctx.GetPromise(); - // Resolve the Promise previously returned to JS via the CreateTSFN method. - context->deferred.Resolve(Napi::Boolean::New(env, true)); - delete context; + // The very `NodeJSContext ctx` would be released after the extracted functions are released, + // and the extracted functions will be released when they have no users left. + // The TL;DR is that as long as they are copied, not captured by reference, everything would work correctly. } -// Addon entry point Napi::Object Init(Napi::Env env, Napi::Object exports) { - exports["createTSFN"] = Napi::Function::New(env, CreateTSFN); + exports["runAsyncWork"] = Napi::Function::New(env, RunAsyncWork); return exports; } diff --git a/thread_safe_function_counting_wrapped/node-addon-api/addon.js b/thread_safe_function_counting_wrapped/node-addon-api/addon.js index 4ee0d938..fb725e81 100644 --- a/thread_safe_function_counting_wrapped/node-addon-api/addon.js +++ b/thread_safe_function_counting_wrapped/node-addon-api/addon.js @@ -1,9 +1,9 @@ -const { createTSFN } = require('bindings')('addon'); +const { runAsyncWork } = require('bindings')('addon'); -const callback = (...args) => { - console.log(new Date, ...args); -}; - -void async function() { - console.log(await createTSFN(callback)); -}(); +console.log('RunAsyncWork(): calling the C++ function.'); +const promise = runAsyncWork( + (i, s) => { console.log(`Callback from C++: even ${s}=${i}.`); }, + (i, s) => { console.log(`Callback from C++: odd ${s}=${i}.`); } +); +console.log('RunAsyncWork(): the promise is returned from C++.'); +promise.then((value) => { console.log(`RunAsyncWork(): the promise resolved, from C++, to ${value}.`); });