Skip to content

Commit

Permalink
Merge pull request #163 from OP-Engineering/oscar/fix-reactive-queries
Browse files Browse the repository at this point in the history
Fix reactive queries by triggering them only on transactions after commit
  • Loading branch information
ospfranco authored Oct 15, 2024
2 parents 038a0e7 + 8afdec7 commit 55b3b4a
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 107 deletions.
169 changes: 94 additions & 75 deletions cpp/DBHostObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,49 @@
#include "macros.h"
#include "utils.h"
#include <iostream>
#include <utility>

namespace opsqlite {

namespace jsi = facebook::jsi;
namespace react = facebook::react;

#ifndef OP_SQLITE_USE_LIBSQL
#ifdef OP_SQLITE_USE_LIBSQL
void DBHostObject::flush_pending_reactive_queries() {
// intentionally left blank
}
#else
void DBHostObject::flush_pending_reactive_queries() {
for (const auto &query_ptr : pending_reactive_queries) {
auto query = query_ptr.get();

std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
std::make_shared<std::vector<SmartHostObject>>();

auto status = opsqlite_execute_prepared_statement(db_name, query->stmt,
&results, metadata);

if (status.type == SQLiteError) {
invoker->invokeAsync(
[this, callback = query->callback, status = std::move(status)] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromUtf8(rt, status.message));
callback->asObject(rt).asFunction(rt).call(rt, error);
});
} else {
invoker->invokeAsync(
[this,
results = std::make_shared<std::vector<DumbHostObject>>(results),
callback = query->callback, metadata, status = std::move(status)] {
auto jsiResult = createResult(rt, status, results.get(), metadata);
callback->asObject(rt).asFunction(rt).call(rt, jsiResult);
});
}
}
}

void DBHostObject::auto_register_update_hook() {
if (update_hook_callback == nullptr && reactive_queries.empty() &&
is_update_hook_registered) {
Expand All @@ -31,19 +67,17 @@ void DBHostObject::auto_register_update_hook() {
auto hook = [this](std::string name, std::string table_name,
std::string operation, int rowid) {
if (update_hook_callback != nullptr) {
jsCallInvoker->invokeAsync(
[this,
callback = update_hook_callback, table_name,
operation = std::move(operation), rowid] {
auto res = jsi::Object(rt);
res.setProperty(rt, "table",
jsi::String::createFromUtf8(rt, table_name));
res.setProperty(rt, "operation",
jsi::String::createFromUtf8(rt, operation));
res.setProperty(rt, "rowId", jsi::Value(rowid));

callback->asObject(rt).asFunction(rt).call(rt, res);
});
invoker->invokeAsync([this, callback = update_hook_callback, table_name,
operation = std::move(operation), rowid] {
auto res = jsi::Object(rt);
res.setProperty(rt, "table",
jsi::String::createFromUtf8(rt, table_name));
res.setProperty(rt, "operation",
jsi::String::createFromUtf8(rt, operation));
res.setProperty(rt, "rowId", jsi::Value(rowid));

callback->asObject(rt).asFunction(rt).call(rt, res);
});
}

for (const auto &query_ptr : reactive_queries) {
Expand All @@ -55,54 +89,28 @@ void DBHostObject::auto_register_update_hook() {
bool shouldFire = false;

for (const auto &discriminator : query->discriminators) {
// Tables don't match then skip
if (discriminator.table != table_name) {
continue;
}
// Table has matched

// If no ids are specified, then we should fire
if (discriminator.ids.size() == 0) {
shouldFire = true;
break;
} else { // If ids are specified, then we should check if the rowId
// matches
for (const auto &discrimator_id : discriminator.ids) {
if (rowid == discrimator_id) {
shouldFire = true;
break;
}
}
}
}

if (!shouldFire) {
continue;
// If ids are specified, then we should check if the rowId matches
for (const auto &discrimator_id : discriminator.ids) {
if (rowid == discrimator_id) {
shouldFire = true;
break;
}
}
}

std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
std::make_shared<std::vector<SmartHostObject>>();

auto status = opsqlite_execute_prepared_statement(db_name, query->stmt,
&results, metadata);

if (status.type == SQLiteError) {
jsCallInvoker->invokeAsync(
[this, callback = query->callback, status = std::move(status)] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromUtf8(rt, status.message));
callback->asObject(rt).asFunction(rt).call(rt, error);
});
} else {
jsCallInvoker->invokeAsync(
[this,
results = std::make_shared<std::vector<DumbHostObject>>(results),
callback = query->callback, metadata, status = std::move(status)] {
auto jsiResult =
createResult(rt, status, results.get(), metadata);
callback->asObject(rt).asFunction(rt).call(rt, jsiResult);
});
if (shouldFire) {
pending_reactive_queries.insert(query_ptr);
}
}
};
Expand All @@ -115,10 +123,10 @@ void DBHostObject::auto_register_update_hook() {
#ifdef OP_SQLITE_USE_LIBSQL
DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &url,
std::string &auth_token,
std::shared_ptr<react::CallInvoker> js_call_invoker,
std::shared_ptr<react::CallInvoker> invoker,
std::shared_ptr<ThreadPool> thread_pool)
: db_name(url), jsCallInvoker(js_call_invoker), thread_pool(thread_pool),
rt(rt) {
: db_name(url), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), rt(rt) {
BridgeResult result = opsqlite_libsql_open_remote(url, auth_token);

if (result.type == SQLiteError) {
Expand All @@ -134,8 +142,8 @@ DBHostObject::DBHostObject(jsi::Runtime &rt,
std::string &db_name, std::string &path,
std::string &url, std::string &auth_token,
int sync_interval)
: db_name(db_name), jsCallInvoker(invoker), thread_pool(thread_pool),
rt(rt) {
: db_name(db_name), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), rt(rt) {
BridgeResult result =
opsqlite_libsql_open_sync(db_name, path, url, auth_token, sync_interval);

Expand All @@ -149,14 +157,14 @@ DBHostObject::DBHostObject(jsi::Runtime &rt,
#endif

DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &base_path,
std::shared_ptr<react::CallInvoker> jsCallInvoker,
std::shared_ptr<react::CallInvoker> invoker,
std::shared_ptr<ThreadPool> thread_pool,
std::string &db_name, std::string &path,
std::string &crsqlite_path,
std::string &sqlite_vec_path,
std::string &encryption_key)
: base_path(base_path), jsCallInvoker(jsCallInvoker),
thread_pool(thread_pool), db_name(db_name), rt(rt) {
: base_path(base_path), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), db_name(db_name), rt(rt) {

#ifdef OP_SQLITE_USE_SQLCIPHER
BridgeResult result = opsqlite_open(db_name, path, crsqlite_path,
Expand Down Expand Up @@ -268,7 +276,7 @@ void DBHostObject::create_jsi_functions() {
if (!location.empty()) {
if (location == ":memory:") {
path = ":memory:";
} else if (location.rfind("/", 0) == 0) {
} else if (location.rfind('/', 0) == 0) {
path = location;
} else {
path = path + "/" + location;
Expand Down Expand Up @@ -304,7 +312,7 @@ void DBHostObject::create_jsi_functions() {
auto reject = std::make_shared<jsi::Value>(rt, args[1]);

auto task = [&rt, this, query, params = std::move(params), resolve,
reject, invoker = this->jsCallInvoker]() {
reject, invoker = this->invoker]() {
try {
std::vector<std::vector<JSVariant>> results;

Expand Down Expand Up @@ -367,7 +375,7 @@ void DBHostObject::create_jsi_functions() {

auto task = [&rt, this, query = std::move(query),
params = std::move(params), resolve, reject,
invoker = this->jsCallInvoker]() {
invoker = this->invoker]() {
try {

#ifdef OP_SQLITE_USE_LIBSQL
Expand Down Expand Up @@ -429,7 +437,7 @@ void DBHostObject::create_jsi_functions() {
auto reject = std::make_shared<jsi::Value>(rt, args[1]);

auto task = [&rt, this, query, params = std::move(params), resolve,
reject, invoker = this->jsCallInvoker]() {
reject, invoker = this->invoker]() {
try {
std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
Expand Down Expand Up @@ -525,8 +533,8 @@ void DBHostObject::create_jsi_functions() {
return;
}

jsCallInvoker->invokeAsync([&rt, batchResult = std::move(batchResult),
resolve, reject] {
invoker->invokeAsync([&rt, batchResult = std::move(batchResult),
resolve, reject] {
if (batchResult.type == SQLiteOk) {
auto res = jsi::Object(rt);
res.setProperty(rt, "rowsAffected",
Expand All @@ -541,7 +549,7 @@ void DBHostObject::create_jsi_functions() {
});
} catch (std::exception &exc) {
auto what = exc.what();
jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] {
invoker->invokeAsync([&rt, what = std::move(what), reject] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromAscii(rt, what));
Expand Down Expand Up @@ -585,8 +593,8 @@ void DBHostObject::create_jsi_functions() {
try {
const auto result = importSQLFile(db_name, sqlFileName);

jsCallInvoker->invokeAsync([&rt, result = std::move(result), resolve,
reject] {
invoker->invokeAsync([&rt, result = std::move(result), resolve,
reject] {
if (result.type == SQLiteOk) {
auto res = jsi::Object(rt);
res.setProperty(rt, "rowsAffected",
Expand All @@ -602,7 +610,7 @@ void DBHostObject::create_jsi_functions() {
});
} catch (std::exception &exc) {
auto what = exc.what();
jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] {
invoker->invokeAsync([&rt, what = std::move(what), reject] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromAscii(rt, what));
Expand Down Expand Up @@ -643,7 +651,7 @@ void DBHostObject::create_jsi_functions() {
commit_hook_callback = callback;

auto hook = [&rt, this, callback](std::string dbName) {
jsCallInvoker->invokeAsync(
invoker->invokeAsync(
[&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); });
};

Expand All @@ -667,7 +675,7 @@ void DBHostObject::create_jsi_functions() {
rollback_hook_callback = callback;

auto hook = [&rt, this, callback](std::string db_name) {
jsCallInvoker->invokeAsync(
invoker->invokeAsync(
[&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); });
};

Expand Down Expand Up @@ -772,8 +780,8 @@ void DBHostObject::create_jsi_functions() {
sqlite3_stmt *statement = opsqlite_prepare_statement(db_name, query);
#endif
auto preparedStatementHostObject =
std::make_shared<PreparedStatementHostObject>(
db_name, statement, jsCallInvoker, thread_pool);
std::make_shared<PreparedStatementHostObject>(db_name, statement,
invoker, thread_pool);

return jsi::Object::createFromHostObject(rt, preparedStatementHostObject);
});
Expand Down Expand Up @@ -801,6 +809,12 @@ void DBHostObject::create_jsi_functions() {
return jsi::String::createFromUtf8(rt, result);
});

auto flush_pending_reactive_queries_js =
HOSTFN("flushPendingReactiveQueries") {
flush_pending_reactive_queries();
return {};
});

function_map["attach"] = std::move(attach);
function_map["detach"] = std::move(detach);
function_map["close"] = std::move(close);
Expand All @@ -811,6 +825,8 @@ void DBHostObject::create_jsi_functions() {
function_map["executeBatch"] = std::move(execute_batch);
function_map["prepareStatement"] = std::move(prepare_statement);
function_map["getDbPath"] = std::move(get_db_path);
function_map["flushPendingReactiveQueries"] =
std::move(flush_pending_reactive_queries_js);
#ifdef OP_SQLITE_USE_LIBSQL
function_map["sync"] = std::move(sync);
#else
Expand All @@ -833,6 +849,12 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt,
const jsi::PropNameID &propNameID) {

auto name = propNameID.utf8(rt);
if (name == "execute") {
return jsi::Value(rt, function_map["execute"]);
}
if (name == "flushPendingReactiveQueries") {
return jsi::Value(rt, function_map["flushPendingReactiveQueries"]);
}
if (name == "attach") {
return jsi::Value(rt, function_map["attach"]);
}
Expand All @@ -845,9 +867,6 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt,
if (name == "executeRaw") {
return jsi::Value(rt, function_map["executeRaw"]);
}
if (name == "execute") {
return jsi::Value(rt, function_map["execute"]);
}
if (name == "executeWithHostObjects") {
return jsi::Value(rt, function_map["executeWithHostObjects"]);
}
Expand Down
Loading

0 comments on commit 55b3b4a

Please sign in to comment.