Skip to content

Commit

Permalink
Experimental support for new tdjson interface
Browse files Browse the repository at this point in the history
- Added `useNewTdjsonInterface` experimental option to `tdl.configure`.
- The Client is changed to allow managing through another entity that
  can pass receive() results to the client and gets notified when the
  client is closed.
- Added `receiveTimeout` option to `tdl.configure`.
- `receiveTimeout` in the `createClient` options is deprecated (works
  with the old interface only). The option is very rarely used.
- The deprecated pause/resume do not work with the new interface.

The old interface still seems to work fine (modulo the fact that tdl
reuses the main libuv threadpool for clients) and the implementation is
somewhat simpler since the clients are independent.
  • Loading branch information
eilvelia committed Oct 4, 2023
1 parent 87fa627 commit bfde0e6
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 124 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

<!-- Hi! -->

## tdl (unreleased)
## tdl@7.4.0 (unreleased)

- Added `tdl.setLogMessageCallback` that allows to pass a callback to the
`td_set_log_message_callback` TDLib function using Node-API's thread-safe
functions. (TDLib v1.8.0+ only)
- `tdl.configure`: Added an experimental option `useNewTdjsonInterface` that
enables the use of `td_create_client_id`/`td_send`/`td_receive`/`td_execute`
interface with a client manager and global receive loop, though the old
interface still works well.
This does not use the libuv threadpool and does not have a limitation of max
`UV_THREADPOOL_SIZE` clients.
(TDLib v1.7.0+ only)
- `tdl.configure`: Added a `receiveTimeout` advanced option.
- `receiveTimeout` in the client options is deprecated.
- Deprecated the `useMutableRename` advanced option.

## [email protected] (2023-09-26)
Expand Down
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const tdl = require('tdl')

// If libtdjson is not present in the system search paths, the path to the
// libtdjson shared library can be set manually, e.g.:
// tdl.configure({ tdjson: '/usr/local/lib/libtdjson.dylib'})
// tdl.configure({ tdjson: '/usr/local/lib/libtdjson.dylib' })
// The library prefix can be set separate from the library name,
// example to search for libtdjson in the directory of the current script:
// tdl.configure({ libdir: __dirname })
Expand Down Expand Up @@ -145,9 +145,11 @@ tdl.configure({
// 'libtdjson.dylib' on macOS, or 'libtdjson.so' otherwise.
tdjson: 'libtdjson.so',
// Path to the library directory. By default, it is empty string.
libdir: '...',
libdir: '/usr/local/lib',
// Verbosity level of TDLib. By default, it is 2.
verbosityLevel: 3
verbosityLevel: 3,
// Experimental option. Defaults to false.
useNewTdjsonInterface: false
})
```

Expand All @@ -158,8 +160,8 @@ Some examples:
- `tdl.configure({ tdjson: require('prebuilt-tdlib').getTdjson() })`

The path concatenation of `libdir` + `tdjson` is directly passed to
[`dlopen`][dlopen] (Unix) or [`LoadLibrary`][LoadLibraryW] (Windows). Check your OS documentation
to find out where the shared library will be searched for.
[`dlopen`][dlopen] (Unix) or [`LoadLibrary`][LoadLibraryW] (Windows). Check your
OS documentation to find out where the shared library will be searched for.

#### `tdl.createClient(options: ClientOptions) => Client`

Expand All @@ -185,9 +187,8 @@ type ClientOptions = {
useTestDc: boolean, // Use test telegram server (defaults to false)
tdlibParameters: Object, // Raw TDLib parameters
// Advanced options:
skipOldUpdates: boolean,
bare: boolean,
receiveTimeout: number
skipOldUpdates: boolean
}
```
Expand Down Expand Up @@ -421,6 +422,9 @@ globally or per-project as a dev dependency.
The current limitation is that the number of created clients should not exceed
[UV_THREADPOOL_SIZE][] (as for now, the default is 4, max is 1024).

When `useNewTdjsonInterface` (experimental option) is set to true in
`tdl.configure`, this limitation does not apply.

[UV_THREADPOOL_SIZE]: http://docs.libuv.org/en/v1.x/threadpool.html

<a name="possible-errors"></a>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"lint": "eslint . --max-warnings 0",
"jest-tests": "jest --testPathIgnorePatterns tests/integration",
"test": "npm run flow:check && npm run ts:check && npm run lint && npm run jest-tests",
"integration-tests": "jest tests/integration && cross-env TEST_TDL_TDLIB_ADDON=1 jest tests/integration",
"integration-tests": "jest tests/integration && cross-env TEST_TDL_TDLIB_ADDON=1 jest tests/integration && cross-env TEST_NEW_TDJSON=1 jest tests/integration",
"test:all": "npm run test && npm run integration-tests",
"coverage": "jest --coverage",
"prepare": "npm run clean && npm run build",
Expand Down
200 changes: 170 additions & 30 deletions packages/tdl/addon/td.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ typedef void (*td_set_log_fatal_error_callback_t)(td_log_fatal_error_callback_pt
typedef void (*td_log_message_callback_ptr)(int verbosity_level, const char *message);
typedef void (*td_set_log_message_callback_t)(int max_verbosity_level, td_log_message_callback_ptr callback);

// New tdjson interface
typedef int (*td_create_client_id_t)();
typedef void (*td_send_t)(int client_id, const char *request);
typedef const char * (*td_receive_t)(double timeout);
typedef const char * (*td_execute_t)(const char *request);

td_json_client_create_t td_json_client_create;
td_json_client_send_t td_json_client_send;
td_json_client_receive_t td_json_client_receive;
td_json_client_execute_t td_json_client_execute;
td_json_client_destroy_t td_json_client_destroy;
td_set_log_fatal_error_callback_t td_set_log_fatal_error_callback;
td_set_log_message_callback_t td_set_log_message_callback;
td_create_client_id_t td_create_client_id;
td_send_t td_send;
td_receive_t td_receive;
td_execute_t td_execute;

#define FAIL(MSG) NAPI_THROW(Napi::Error::New(env, MSG));
#define TYPEFAIL(MSG) NAPI_THROW(Napi::TypeError::New(env, MSG));
Expand All @@ -50,13 +60,8 @@ Napi::External<void> TdClientCreate(const Napi::CallbackInfo& info) {
}

void TdClientSend(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
void *client = info[0].As<Napi::External<void>>().Data();
auto request_obj = info[1].As<Napi::Object>();
auto json = env.Global().Get("JSON").As<Napi::Object>();
auto stringify = json.Get("stringify").As<Napi::Function>();
std::string request =
stringify.Call(json, { request_obj }).As<Napi::String>().Utf8Value();
std::string request = info[1].As<Napi::String>().Utf8Value();
td_json_client_send(client, request.c_str());
}

Expand All @@ -82,14 +87,8 @@ class ReceiverAsyncWorker : public Napi::AsyncWorker

void OnOK() override {
Napi::Env env = Env();
if (response.empty()) {
deferred.Resolve(env.Null());
} else {
auto json = env.Global().Get("JSON").As<Napi::Object>();
auto parse = json.Get("parse").As<Napi::Function>();
Napi::Value obj = parse.Call(json, { Napi::String::New(env, response) });
deferred.Resolve(obj);
}
auto val = response.empty() ? env.Null() : Napi::String::New(env, response);
deferred.Resolve(val);
}

void OnError(const Napi::Error& err) override {
Expand Down Expand Up @@ -117,17 +116,12 @@ Napi::Value TdClientExecute(const Napi::CallbackInfo& info) {
void *client = info[0].IsNull() || info[0].IsUndefined()
? nullptr
: info[0].As<Napi::External<void>>().Data();
auto json = env.Global().Get("JSON").As<Napi::Object>();
auto stringify = json.Get("stringify").As<Napi::Function>();
if (!info[1].IsObject())
TYPEFAIL_VALUE("Expected second argument to be an object");
Napi::Object request_obj = info[1].As<Napi::Object>();
std::string request =
stringify.Call(json, { request_obj }).As<Napi::String>().Utf8Value();
if (!info[1].IsString())
TYPEFAIL_VALUE("Expected second argument to be a string");
std::string request = info[1].As<Napi::String>().Utf8Value();
const char *response = td_json_client_execute(client, request.c_str());
if (response == nullptr) return env.Null();
auto parse = json.Get("parse").As<Napi::Function>();
return parse.Call(json, { Napi::String::New(env, response) });
return Napi::String::New(env, response);
}

void TdClientDestroy(const Napi::CallbackInfo& info) {
Expand Down Expand Up @@ -177,11 +171,11 @@ void TdSetLogMessageCallback(const Napi::CallbackInfo& info) {
using namespace MessageCallback;
Napi::Env env = info.Env();
if (td_set_log_message_callback == nullptr)
FAIL("td_set_log_message_callback is not available")
FAIL("td_set_log_message_callback is not available");
if (info.Length() < 2)
TYPEFAIL("Expected two arguments")
TYPEFAIL("Expected two arguments");
if (!info[0].IsNumber())
TYPEFAIL("Expected first argument to be a number")
TYPEFAIL("Expected first argument to be a number");
int max_verbosity_level = info[0].As<Napi::Number>().Int32Value();
if (info[1].IsNull() || info[1].IsUndefined()) {
td_set_log_message_callback(max_verbosity_level, nullptr);
Expand All @@ -193,11 +187,11 @@ void TdSetLogMessageCallback(const Napi::CallbackInfo& info) {
return;
}
if (!info[1].IsFunction())
TYPEFAIL("Expected second argument to be one of: a function, null, undefined")
TYPEFAIL("Expected second argument to be one of: a function, null, undefined");
std::lock_guard<std::mutex> lock(tsfn_mutex);
if (tsfn != nullptr)
tsfn.Release();
tsfn = Tsfn::New(env, info[1].As<Napi::Function>(), "Callback", 0, 1);
tsfn = Tsfn::New(env, info[1].As<Napi::Function>(), "CallbackTSFN", 0, 1);
tsfn.Unref(env);
td_set_log_message_callback(max_verbosity_level, &c_message_callback);
}
Expand Down Expand Up @@ -231,15 +225,151 @@ void TdSetLogFatalErrorCallback(const Napi::CallbackInfo& info) {
return;
}
if (!info[0].IsFunction())
TYPEFAIL("Expected first argument to be one of: a function, null, undefined")
TYPEFAIL("Expected first argument to be one of: a function, null, undefined");
if (fatal_callback_tsfn != nullptr)
fatal_callback_tsfn.Release();
fatal_callback_tsfn = Napi::ThreadSafeFunction::New(
env, info[0].As<Napi::Function>(), "Callback", 0, 1);
env, info[0].As<Napi::Function>(), "FatalCallbackTSFN", 0, 1);
fatal_callback_tsfn.Unref(env);
td_set_log_fatal_error_callback(&c_fatal_callback);
}

// New tdjson interface, experimental
namespace Tdn {
class ReceiveWorker {
public:
ReceiveWorker(const Napi::Env& env, double timeout)
: timeout(timeout),
tsfn(Tsfn::New(env, "ReceiveTSFN", 0, 1, this)),
thread(&ReceiveWorker::loop, this)
{ thread.detach(); tsfn.Unref(env); }
~ReceiveWorker() {
{
std::lock_guard lock(mutex);
stop = true;
tsfn.Release();
}
cv.notify_all();
if (thread.joinable())
thread.join();
}

Napi::Promise NewTask(const Napi::Env& env) {
// A task can be added only after the previous task is finished.
auto new_deferred = std::make_unique<Napi::Promise::Deferred>(env);
auto promise = new_deferred->Promise();
if (working) {
auto error = Napi::Error::New(env, "receive is not finished yet");
new_deferred->Reject(error.Value());
return promise;
}
{
std::lock_guard lock(mutex);
tsfn.Ref(env);
deferred = std::move(new_deferred);
}
cv.notify_all();
return promise;
}
private:
using TsfnCtx = ReceiveWorker;
struct TsfnData {
std::unique_ptr<Napi::Promise::Deferred> deferred;
const char *response; // can be nullptr
};
static void CallJs(Napi::Env env, Napi::Function cb, TsfnCtx *ctx, TsfnData *data) {
if (data == nullptr) return;
if (env != nullptr) {
auto val = data->response == nullptr
? env.Null()
: Napi::String::New(env, data->response);
data->deferred->Resolve(val);
if (ctx != nullptr) ctx->tsfn.Unref(env);
}
delete data;
}
using Tsfn = Napi::TypedThreadSafeFunction<TsfnCtx, TsfnData, CallJs>;

void loop() {
while (true) {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this] { return deferred != nullptr || stop; });
if (stop) return;
working = true;
const char *response = td_receive(timeout);
// TDLib stores the response in thread-local storage that is deallocated
// on execute() and receive(). Since we never call execute() in this
// thread, it should be safe not to copy the response here.
tsfn.NonBlockingCall(new TsfnData { std::move(deferred), response });
deferred = nullptr;
working = false;
}
}

double timeout;
Tsfn tsfn;
std::atomic_bool working {false};
std::unique_ptr<Napi::Promise::Deferred> deferred;
bool stop {false};
std::mutex mutex;
std::condition_variable cv;
std::thread thread;
};

ReceiveWorker *worker = nullptr;

// Set the receive timeout explicitly.
void Init(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
if (worker != nullptr)
FAIL("The worker is already initialized");
if (!info[0].IsNumber())
TYPEFAIL("Expected first argument to be a number");
double timeout = info[0].As<Napi::Number>().DoubleValue();
worker = new ReceiveWorker(env, timeout);
}

Napi::Value CreateClientId(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
if (td_create_client_id == nullptr)
FAIL_VALUE("td_create_client_id is not available");
if (td_send == nullptr)
FAIL_VALUE("td_send is not available");
if (td_receive == nullptr)
FAIL_VALUE("td_receive is not available");
int client_id = td_create_client_id();
if (worker == nullptr)
worker = new ReceiveWorker(env, 10.0);
return Napi::Number::New(env, client_id);
}

void Send(const Napi::CallbackInfo& info) {
int client_id = info[0].As<Napi::Number>().Int32Value();
std::string request = info[1].As<Napi::String>().Utf8Value();
td_send(client_id, request.c_str());
}

// Should not be called again until promise is resolved/rejected.
Napi::Value Receive(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
return worker->NewTask(env);
}

Napi::Value Execute(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
if (td_execute == nullptr)
FAIL_VALUE("td_execute is not available");
if (!info[0].IsString())
TYPEFAIL_VALUE("Expected first argument to be a string");
std::string request = info[0].As<Napi::String>().Utf8Value();
const char *response = td_execute(request.c_str());
if (response == nullptr) return env.Null();
return Napi::String::New(env, response);
}

// void Stop(const Napi::CallbackInfo& info) { delete worker; }
}

#define FINDFUNC(F) \
F = (F##_t) dlsym(handle, #F); \
if ((dlsym_err_cstr = dlerror()) != nullptr) { \
Expand Down Expand Up @@ -272,6 +402,10 @@ void LoadTdjson(const Napi::CallbackInfo& info) {
FINDFUNC(td_json_client_destroy);
FINDFUNC(td_set_log_fatal_error_callback);
FINDFUNC_OPT(td_set_log_message_callback);
FINDFUNC_OPT(td_create_client_id);
FINDFUNC_OPT(td_send);
FINDFUNC_OPT(td_receive);
FINDFUNC_OPT(td_execute);
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
Expand All @@ -284,6 +418,12 @@ Napi::Object Init(Napi::Env env, Napi::Object exports) {
Napi::Function::New(env, TdSetLogFatalErrorCallback, "setLogFatalErrorCallback");
exports["setLogMessageCallback"] =
Napi::Function::New(env, TdSetLogMessageCallback, "setLogMessageCallback");
exports["tdnInit"] = Napi::Function::New(env, Tdn::Init, "init");
exports["tdnCreateClientId"] =
Napi::Function::New(env, Tdn::CreateClientId, "createClientId");
exports["tdnSend"] = Napi::Function::New(env, Tdn::Send, "send");
exports["tdnReceive"] = Napi::Function::New(env, Tdn::Receive, "receive");
exports["tdnExecute"] = Napi::Function::New(env, Tdn::Execute, "execute");
exports["loadTdjson"] = Napi::Function::New(env, LoadTdjson, "loadTdjson");
return exports;
}
Expand Down
13 changes: 8 additions & 5 deletions packages/tdl/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
'include_dirs': [
"<!@(node -p \"require('node-addon-api').include\")"
],
# 'libraries': [],
'dependencies': [
"<!(node -p \"require('node-addon-api').gyp\")"
],
Expand All @@ -22,10 +21,14 @@
'sources': [
'addon/win32-dlfcn.cpp'
]
}],
['OS=="mac"', {
'cflags+': ['-fvisibility=hidden'],
'xcode_settings': {
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
'MACOSX_DEPLOYMENT_TARGET': '10.14'
}
}]
],
'xcode_settings': {
'MACOSX_DEPLOYMENT_TARGET': '10.14'
}
]
}]
}
Loading

0 comments on commit bfde0e6

Please sign in to comment.