From 3fd7f82d025618c81583d293b1644027861071b1 Mon Sep 17 00:00:00 2001 From: Mickael Lecoq Date: Mon, 7 Jul 2025 16:48:09 +0200 Subject: [PATCH 1/3] feat: trigger reactive queries for lists on executeBatch --- cpp/DBHostObject.cpp | 33 ++++++++++++++------------------- cpp/bridge.cpp | 6 ++++++ cpp/libsql/bridge.cpp | 6 ++++++ cpp/types.h | 2 ++ cpp/utils.cpp | 41 +++++++++++++++++++++++++++++++++++++++++ cpp/utils.h | 2 ++ 6 files changed, 71 insertions(+), 19 deletions(-) diff --git a/cpp/DBHostObject.cpp b/cpp/DBHostObject.cpp index 0e7eed44..849b3c10 100644 --- a/cpp/DBHostObject.cpp +++ b/cpp/DBHostObject.cpp @@ -18,14 +18,14 @@ namespace react = facebook::react; #ifdef OP_SQLITE_USE_LIBSQL void DBHostObject::flush_pending_reactive_queries( - const std::shared_ptr &resolve) { + const std::shared_ptr &resolve, std::optional rowsAffected) { invoker->invokeAsync([this, resolve]() { resolve->asObject(rt).asFunction(rt).call(rt, {}); }); } #else void DBHostObject::flush_pending_reactive_queries( - const std::shared_ptr &resolve) { + const std::shared_ptr &resolve, std::optional rowsAffected) { for (const auto &query_ptr : pending_reactive_queries) { auto query = query_ptr.get(); @@ -36,14 +36,13 @@ void DBHostObject::flush_pending_reactive_queries( auto status = opsqlite_execute_prepared_statement(db, query->stmt, &results, metadata); - invoker->invokeAsync( - [this, - results = std::make_shared>(results), - callback = query->callback, metadata, status = std::move(status)] { - auto jsiResult = - create_result(rt, status, results.get(), metadata); - callback->asObject(rt).asFunction(rt).call(rt, jsiResult); - }); + invoker->invokeAsync([this, resolve, rowsAffected]() { + auto res = jsi::Object(rt); + if (rowsAffected.has_value()) { + res.setProperty(rt, "rowsAffected", jsi::Value(rowsAffected.value())); + } + resolve->asObject(rt).asFunction(rt).call(rt, std::move(res)); + }); } pending_reactive_queries.clear(); @@ -530,19 +529,15 @@ void DBHostObject::create_jsi_functions() { auto batchResult = opsqlite_execute_batch(db, &commands); #endif + for (const auto& table : batchResult.modifiedTables) { + on_update(table, "UNKNOWN", -1); + } + if (invalidated) { return; } - invoker->invokeAsync([&rt, - batchResult = std::move(batchResult), - resolve] { - auto res = jsi::Object(rt); - res.setProperty(rt, "rowsAffected", - jsi::Value(batchResult.affectedRows)); - resolve->asObject(rt).asFunction(rt).call( - rt, std::move(res)); - }); + flush_pending_reactive_queries(resolve, batchResult.affectedRows); } catch (std::runtime_error &e) { auto what = e.what(); invoker->invokeAsync([&rt, what, reject] { diff --git a/cpp/bridge.cpp b/cpp/bridge.cpp index cd869ef0..2de8ab17 100644 --- a/cpp/bridge.cpp +++ b/cpp/bridge.cpp @@ -850,12 +850,17 @@ opsqlite_execute_batch(sqlite3 *db, } int affectedRows = 0; + std::unordered_set modifiedTables; opsqlite_execute(db, "BEGIN EXCLUSIVE TRANSACTION", nullptr); for (int i = 0; i < commandCount; i++) { const auto &command = commands->at(i); // We do not provide a datastructure to receive query data because we // don't need/want to handle this results in a batch execution try { + auto maybeTable = extract_modified_table(command.sql); + if (maybeTable.has_value()) { + modifiedTables.insert(*maybeTable); + } auto result = opsqlite_execute(db, command.sql, &command.params); affectedRows += result.affectedRows; } catch (std::exception &exc) { @@ -867,6 +872,7 @@ opsqlite_execute_batch(sqlite3 *db, return BatchResult{ .affectedRows = affectedRows, .commands = static_cast(commandCount), + .modifiedTables = std::move(modifiedTables), }; } diff --git a/cpp/libsql/bridge.cpp b/cpp/libsql/bridge.cpp index 0e5fe655..c17b0245 100644 --- a/cpp/libsql/bridge.cpp +++ b/cpp/libsql/bridge.cpp @@ -716,9 +716,14 @@ opsqlite_libsql_execute_batch(DB const &db, try { int affectedRows = 0; + std::unordered_set modifiedTables; opsqlite_libsql_execute(db, "BEGIN EXCLUSIVE TRANSACTION", nullptr); for (int i = 0; i < commandCount; i++) { auto command = commands->at(i); + auto maybeTable = extract_modified_table(command.sql); + if (maybeTable.has_value()) { + modifiedTables.insert(*maybeTable); + } // We do not provide a datastructure to receive query data because // we don't need/want to handle this results in a batch execution auto result = @@ -729,6 +734,7 @@ opsqlite_libsql_execute_batch(DB const &db, return BatchResult{ .affectedRows = affectedRows, .commands = static_cast(commandCount), + .modifiedTables = std::move(modifiedTables), }; } catch (std::exception &exc) { opsqlite_libsql_execute(db, "ROLLBACK", nullptr); diff --git a/cpp/types.h b/cpp/types.h index d13509ee..54da18b2 100644 --- a/cpp/types.h +++ b/cpp/types.h @@ -4,6 +4,7 @@ #include #include #include +#include struct ArrayBuffer { std::shared_ptr data; @@ -25,6 +26,7 @@ struct BatchResult { std::string message; int affectedRows; int commands; + std::unordered_set modifiedTables; }; struct BatchArguments { diff --git a/cpp/utils.cpp b/cpp/utils.cpp index b1b0ced2..ba2e98db 100644 --- a/cpp/utils.cpp +++ b/cpp/utils.cpp @@ -5,6 +5,7 @@ #endif #include #include +#include namespace opsqlite { @@ -324,4 +325,44 @@ void log_to_console(jsi::Runtime &runtime, const std::string &message) { log.call(runtime, jsi::String::createFromUtf8(runtime, message)); } + +std::optional extract_modified_table(const std::string &sql) { + std::istringstream stream(sql); + std::string token; + std::string table; + + while (stream >> token) { + std::string upper = token; + std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); + + if (upper == "INTO" || upper == "UPDATE" || upper == "FROM") { + // Next token should be the table name + if (stream >> table) { + table.erase(std::remove_if(table.begin(), table.end(), [](char c) { + return c == '`' || c == '"' || c == '\'' || c == '('; + }), table.end()); + return table; + } + } + + // Early exit for UPDATE statements + if (upper == "UPDATE") { + if (stream >> table) { + table.erase(std::remove_if(table.begin(), table.end(), [](char c) { + return c == '`' || c == '"' || c == '\'' || c == '('; + }), table.end()); + return table; + } + } + + // When we reach a semicolon, we can stop processing + if (token.find(';') != std::string::npos) { + break; + } + } + + return std::nullopt; +} + + } // namespace opsqlite diff --git a/cpp/utils.h b/cpp/utils.h index 938216e9..655c5e67 100644 --- a/cpp/utils.h +++ b/cpp/utils.h @@ -48,4 +48,6 @@ bool file_exists(const std::string &path); void log_to_console(jsi::Runtime &rt, const std::string &message); +std::optional extract_modified_table(std::string const& sql); + } // namespace opsqlite From 86e24ff9673a7ca55312b0dd08f70770e7911268 Mon Sep 17 00:00:00 2001 From: Mickael Lecoq Date: Tue, 8 Jul 2025 11:03:24 +0200 Subject: [PATCH 2/3] fix definition --- cpp/DBHostObject.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/DBHostObject.h b/cpp/DBHostObject.h index f0947c23..d1c2fc67 100644 --- a/cpp/DBHostObject.h +++ b/cpp/DBHostObject.h @@ -79,7 +79,7 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject { void auto_register_update_hook(); void create_jsi_functions(); void - flush_pending_reactive_queries(const std::shared_ptr &resolve); + flush_pending_reactive_queries(const std::shared_ptr &resolve, std::optional rowsAffected); std::unordered_map function_map; std::string base_path; From fe35345a5658561ff7e1ae4d6eed19cf9db3a86b Mon Sep 17 00:00:00 2001 From: Mickael Lecoq Date: Tue, 8 Jul 2025 16:29:02 +0200 Subject: [PATCH 3/3] fix call --- cpp/DBHostObject.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/DBHostObject.cpp b/cpp/DBHostObject.cpp index 849b3c10..686b304a 100644 --- a/cpp/DBHostObject.cpp +++ b/cpp/DBHostObject.cpp @@ -790,7 +790,7 @@ void DBHostObject::create_jsi_functions() { auto resolve = std::make_shared(rt, args[0]); auto task = [&rt, this, resolve]() { - flush_pending_reactive_queries(resolve); + flush_pending_reactive_queries(resolve, std::nullopt); }; _thread_pool->queueWork(task);