From 71f279b77d94da5a578c14b326212acd5c6aa0b1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 10:30:30 +0200 Subject: [PATCH 1/6] Refactor update streams on web --- .../lib/src/utils/shared_utils.dart | 69 +++++++ .../sqlite_async/lib/src/web/database.dart | 8 +- .../sqlite_async/lib/src/web/protocol.dart | 2 + .../lib/src/web/update_notifications.dart | 57 ++++++ .../lib/src/web/web_sqlite_open_factory.dart | 14 +- .../web/worker/throttled_common_database.dart | 191 ------------------ .../lib/src/web/worker/worker_utils.dart | 48 ++++- packages/sqlite_async/lib/web.dart | 25 ++- 8 files changed, 205 insertions(+), 209 deletions(-) create mode 100644 packages/sqlite_async/lib/src/web/update_notifications.dart delete mode 100644 packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 9faf928..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:convert'; +import 'package:sqlite3/common.dart'; + import '../sqlite_connection.dart'; Future internalReadTransaction(SqliteReadContext ctx, @@ -75,3 +77,70 @@ Object? mapParameter(Object? parameter) { List mapParameters(List parameters) { return [for (var p in parameters) mapParameter(p)]; } + +extension ThrottledUpdates on CommonDatabase { + /// Wraps [updatesSync] to: + /// + /// - Not fire in transactions. + /// - Fire asynchronously. + /// - Only report table names, which are buffered to avoid duplicates. + Stream> get throttledUpdatedTables { + StreamController>? controller; + var pendingUpdates = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (pendingUpdates.isNotEmpty) { + controller!.add(pendingUpdates); + pendingUpdates = {}; + } + } + + void collectUpdate(SqliteUpdate event) { + pendingUpdates.add(event.tableName); + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = commits.listen((_) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 3e0797b..cfaf987 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -21,6 +21,9 @@ class WebDatabase final Mutex? _mutex; final bool profileQueries; + @override + final Stream updates; + /// For persistent databases that aren't backed by a shared worker, we use /// web broadcast channels to forward local update events to other tabs. final BroadcastUpdates? broadcastUpdates; @@ -32,6 +35,7 @@ class WebDatabase this._database, this._mutex, { required this.profileQueries, + required this.updates, this.broadcastUpdates, }); @@ -113,10 +117,6 @@ class WebDatabase } } - @override - Stream get updates => - _database.updates.map((event) => UpdateNotification({event.tableName})); - @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index cb3a5fd..d17c06b 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -13,6 +13,8 @@ enum CustomDatabaseMessageKind { getAutoCommit, executeInTransaction, executeBatchInTransaction, + updateSubscriptionManagement, + notifyUpdates, } extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart new file mode 100644 index 0000000..ecea60d --- /dev/null +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -0,0 +1,57 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:sqlite3_web/sqlite3_web.dart'; + +import '../update_notification.dart'; +import 'protocol.dart'; + +/// Utility to request a stream of update notifications from the worker. +/// +/// Because we want to debounce update notifications on the worker, we're using +/// custom requests instead of the default [Database.updates] stream. +/// +/// Clients send a message to the worker to subscribe or unsubscribe, providing +/// an id for the subscription. The worker distributes update notifications with +/// custom requests to the client, which [handleRequest] distributes to the +/// original streams. +final class UpdateNotificationStreams { + var _idCounter = 0; + final Map> _updates = {}; + + Future handleRequest(JSAny? request) async { + final customRequest = request as CustomDatabaseMessage; + if (customRequest.kind == CustomDatabaseMessageKind.notifyUpdates) { + final notification = UpdateNotification(customRequest.rawParameters.toDart + .map((e) => (e as JSString).toDart) + .toSet()); + + _updates[customRequest.rawSql.toDart]?.add(notification); + } + + return null; + } + + Stream updatesFor(Database database) { + final id = (_idCounter++).toString(); + final controller = _updates[id] = StreamController(); + + controller + ..onListen = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [true], + )); + } + ..onCancel = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [false], + )); + }; + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index a724329..205734f 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -69,15 +69,19 @@ class DefaultSqliteOpenFactory ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier - BroadcastUpdates? updates; + BroadcastUpdates? broadcastUpdates; if (connection.access != AccessMode.throughSharedWorker && connection.storage != StorageMode.inMemory) { - updates = BroadcastUpdates(path); + broadcastUpdates = BroadcastUpdates(path); } - return WebDatabase(connection.database, options.mutex ?? mutex, - broadcastUpdates: updates, - profileQueries: sqliteOptions.profileQueries); + return WebDatabase( + connection.database, + options.mutex ?? mutex, + broadcastUpdates: broadcastUpdates, + profileQueries: sqliteOptions.profileQueries, + updates: updatesFor(connection.database), + ); } @override diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart deleted file mode 100644 index 8a0e901..0000000 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ /dev/null @@ -1,191 +0,0 @@ -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_wasm.dart'; - -/// Wrap a CommonDatabase to throttle its updates stream. -/// This is so that we can throttle the updates _within_ -/// the worker process, avoiding mass notifications over -/// the MessagePort. -class ThrottledCommonDatabase extends CommonDatabase { - final CommonDatabase _db; - final StreamController _transactionController = - StreamController.broadcast(); - - ThrottledCommonDatabase(this._db); - - @override - int get userVersion => _db.userVersion; - - @override - set userVersion(int userVersion) { - _db.userVersion = userVersion; - } - - @override - bool get autocommit => _db.autocommit; - - @override - DatabaseConfig get config => _db.config; - - @override - void createAggregateFunction({ - required String functionName, - required AggregateFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createAggregateFunction(functionName: functionName, function: function); - } - - @override - void createCollation( - {required String name, required CollatingFunction function}) { - _db.createCollation(name: name, function: function); - } - - @override - void createFunction({ - required String functionName, - required ScalarFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createFunction(functionName: functionName, function: function); - } - - @override - void dispose() { - _db.dispose(); - } - - @override - void execute(String sql, [List parameters = const []]) { - _db.execute(sql, parameters); - } - - @override - int getUpdatedRows() { - // ignore: deprecated_member_use - return _db.getUpdatedRows(); - } - - @override - int get lastInsertRowId => _db.lastInsertRowId; - - @override - CommonPreparedStatement prepare(String sql, - {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { - return _db.prepare(sql, - persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); - } - - @override - List prepareMultiple(String sql, - {bool persistent = false, bool vtab = true}) { - return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); - } - - @override - ResultSet select(String sql, [List parameters = const []]) { - bool preAutocommit = _db.autocommit; - final result = _db.select(sql, parameters); - bool postAutocommit = _db.autocommit; - if (!preAutocommit && postAutocommit) { - _transactionController.add(true); - } - return result; - } - - @override - int get updatedRows => _db.updatedRows; - - @override - Stream get updates { - return throttledUpdates(_db, _transactionController.stream); - } - - @override - VoidPredicate? get commitFilter => _db.commitFilter; - - @override - set commitFilter(VoidPredicate? filter) => _db.commitFilter = filter; - - @override - Stream get commits => _db.commits; - - @override - Stream get rollbacks => _db.rollbacks; -} - -/// This throttles the database update stream to: -/// 1. Trigger max once every 1ms. -/// 2. Only trigger _after_ transactions. -Stream throttledUpdates( - CommonDatabase source, Stream transactionStream) { - StreamController? controller; - Set pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; - } - - if (!source.autocommit) { - // Inside a transaction - do not fire updates - return; - } - - if (pendingUpdates.isNotEmpty) { - for (var update in pendingUpdates) { - controller!.add(update); - } - - pendingUpdates.clear(); - } - } - - void collectUpdate(SqliteUpdate event) { - // We merge updates with the same kind and tableName. - // rowId is never used in sqlite_async. - pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); - - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - } - - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = transactionStream.listen((event) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 3ecb257..7603306 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; @@ -6,8 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; - -import 'throttled_common_database.dart'; +import 'package:sqlite_async/src/utils/database_utils.dart'; import '../protocol.dart'; @@ -22,13 +22,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - final throttled = ThrottledCommonDatabase(db); - - return AsyncSqliteDatabase(database: throttled); + return AsyncSqliteDatabase(database: db); } - /// Opens a database with the `sqlite3` package that will be wrapped in a - /// [ThrottledCommonDatabase] for [openDatabase]. @visibleForOverriding CommonDatabase openUnderlying( WasmSqlite3 sqlite3, @@ -51,6 +47,7 @@ base class AsyncSqliteController extends DatabaseController { class AsyncSqliteDatabase extends WorkerDatabase { @override final CommonDatabase database; + final Stream> _updates; // This mutex is only used for lock requests from clients. Clients only send // these requests for shared workers, so we can assume each database is only @@ -58,7 +55,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final mutex = ReadWriteMutex(); final Map _state = {}; - AsyncSqliteDatabase({required this.database}); + AsyncSqliteDatabase({required this.database}) + : _updates = database.throttledUpdatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -67,9 +65,15 @@ class AsyncSqliteDatabase extends WorkerDatabase { void _markHoldsMutex(ClientConnection connection) { final state = _findState(connection); state.holdsMutex = true; + _registerCloseListener(state, connection); + } + + void _registerCloseListener( + _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { state.hasOnCloseListener = true; connection.closed.then((_) { + state.unsubscribeUpdates(); if (state.holdsMutex) { mutex.release(); } @@ -93,6 +97,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { _findState(connection).holdsMutex = false; mutex.release(); case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; @@ -129,6 +134,25 @@ class AsyncSqliteDatabase extends WorkerDatabase { "Transaction rolled back by earlier statement. Cannot execute: $sql"); } database.execute(sql, parameters); + case CustomDatabaseMessageKind.updateSubscriptionManagement: + final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final id = message.rawSql.toDart; + final state = _findState(connection); + + if (shouldSubscribe) { + state.unsubscribeUpdates(); + _registerCloseListener(state, connection); + + state.updatesNotification = _updates.listen((tables) { + connection.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.notifyUpdates, + id, + tables.toList(), + )); + }); + } else { + state.unsubscribeUpdates(); + } } return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); @@ -148,4 +172,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { final class _ConnectionState { bool hasOnCloseListener = false; bool holdsMutex = false; + StreamSubscription>? updatesNotification; + + void unsubscribeUpdates() { + if (updatesNotification case final active?) { + updatesNotification = null; + active.cancel(); + } + } } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index 3a65115..f151b81 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -4,12 +4,15 @@ /// workers. library; +import 'dart:js_interop'; + import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:web/web.dart'; import 'sqlite3_common.dart'; import 'sqlite_async.dart'; import 'src/web/database.dart'; +import 'src/web/update_notifications.dart'; /// An endpoint that can be used, by any running JavaScript context in the same /// website, to connect to an existing [WebSqliteConnection]. @@ -33,6 +36,13 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { + final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + + /// Handles a custom request sent from the worker to the client. + Future handleCustomRequest(JSAny? request) { + return _updateStreams.handleRequest(request); + } + /// Opens a [WebSqlite] instance for the given [options]. /// /// This method can be overriden in scenarios where the way [WebSqlite] is @@ -43,6 +53,7 @@ abstract mixin class WebSqliteOpenFactory return WebSqlite.open( worker: Uri.parse(options.workerUri), wasmModule: Uri.parse(options.wasmUri), + handleCustomRequest: handleCustomRequest, ); } @@ -54,6 +65,14 @@ abstract mixin class WebSqliteOpenFactory WebSqlite sqlite, String name) { return sqlite.connectToRecommended(name); } + + /// Obtains a stream of [UpdateNotification]s from a [database]. + /// + /// The default implementation uses custom requests to allow workers to + /// debounce the stream on their side to avoid messages where possible. + Stream updatesFor(Database database) { + return _updateStreams.updatesFor(database); + } } /// A [SqliteConnection] interface implemented by opened connections when @@ -85,8 +104,11 @@ abstract class WebSqliteConnection implements SqliteConnection { /// contexts to exchange opened database connections. static Future connectToEndpoint( WebDatabaseEndpoint endpoint) async { + final updates = UpdateNotificationStreams(); final rawSqlite = await WebSqlite.connectToPort( - (endpoint.connectPort, endpoint.connectName)); + (endpoint.connectPort, endpoint.connectName), + handleCustomRequest: updates.handleRequest, + ); final database = WebDatabase( rawSqlite, @@ -95,6 +117,7 @@ abstract class WebSqliteConnection implements SqliteConnection { null => null, }, profileQueries: false, + updates: updates.updatesFor(rawSqlite), ); return database; } From 7557d82a7ebef280b63852934926f8e672cd7691 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:02:24 +0200 Subject: [PATCH 2/6] Fix tests --- .../native_sqlite_connection_impl.dart | 43 +++---------------- .../lib/src/utils/shared_utils.dart | 1 + .../lib/src/web/update_notifications.dart | 5 ++- .../lib/src/web/web_sqlite_open_factory.dart | 11 ++--- packages/sqlite_async/lib/web.dart | 4 +- scripts/sqlite3_wasm_download.dart | 2 +- 6 files changed, 19 insertions(+), 47 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index e2df0f3..301d0af 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,40 +303,13 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - Timer? updateDebouncer; - Set updatedTables = {}; + db.throttledUpdatedTables.listen((changedTables) { + client.fire(UpdateNotification(changedTables)); + }); + int? txId; Object? txError; - void maybeFireUpdates() { - // We keep buffering the set of updated tables until we are not - // in a transaction. Firing transactions inside a transaction - // has multiple issues: - // 1. Watched queries would detect changes to the underlying tables, - // but the data would not be visible to queries yet. - // 2. It would trigger many more notifications than required. - // - // This still includes updates for transactions that are rolled back. - // We could handle those better at a later stage. - - if (updatedTables.isNotEmpty && db.autocommit) { - client.fire(UpdateNotification(updatedTables)); - updatedTables.clear(); - } - updateDebouncer?.cancel(); - updateDebouncer = null; - } - - db.updates.listen((event) { - updatedTables.add(event.tableName); - - // This handles two cases: - // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). - // 2. Long-running _SqliteIsolateClosure that should fire updates while running. - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - }); - ResultSet runStatement(_SqliteIsolateStatement data) { if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { if (txId != null) { @@ -388,8 +361,6 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, throw sqlite.SqliteException( 0, 'Transaction must be closed within the read or write lock'); } - // We would likely have received updates by this point - fire now. - maybeFireUpdates(); return null; case _SqliteIsolateStatement(): return task.timeSync( @@ -399,11 +370,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, parameters: data.args, ); case _SqliteIsolateClosure(): - try { - return await data.cb(db); - } finally { - maybeFireUpdates(); - } + return await data.cb(db); case _SqliteIsolateConnectionClose(): db.dispose(); return null; diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..0e9c9d6 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,6 +123,7 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { + print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart index ecea60d..f04a785 100644 --- a/packages/sqlite_async/lib/src/web/update_notifications.dart +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -26,7 +26,8 @@ final class UpdateNotificationStreams { .map((e) => (e as JSString).toDart) .toSet()); - _updates[customRequest.rawSql.toDart]?.add(notification); + final controller = _updates[customRequest.rawSql.toDart]; + controller?.add(notification); } return null; @@ -50,6 +51,8 @@ final class UpdateNotificationStreams { id, [false], )); + + _updates.remove(id); }; return controller.stream; diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 205734f..513b1f2 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -10,7 +10,7 @@ import 'package:sqlite_async/web.dart'; import 'database.dart'; import 'worker/worker_utils.dart'; -Map> webSQLiteImplementations = {}; +Map> _webSQLiteImplementations = {}; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -20,13 +20,13 @@ class DefaultSqliteOpenFactory final cacheKey = sqliteOptions.webSqliteOptions.wasmUri + sqliteOptions.webSqliteOptions.workerUri; - if (webSQLiteImplementations.containsKey(cacheKey)) { - return webSQLiteImplementations[cacheKey]!; + if (_webSQLiteImplementations.containsKey(cacheKey)) { + return _webSQLiteImplementations[cacheKey]!; } - webSQLiteImplementations[cacheKey] = + _webSQLiteImplementations[cacheKey] = openWebSqlite(sqliteOptions.webSqliteOptions); - return webSQLiteImplementations[cacheKey]!; + return _webSQLiteImplementations[cacheKey]!; }); DefaultSqliteOpenFactory( @@ -42,6 +42,7 @@ class DefaultSqliteOpenFactory wasmModule: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), worker: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), controller: AsyncSqliteController(), + handleCustomRequest: handleCustomRequest, ); } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index f151b81..e318697 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -29,6 +29,8 @@ typedef WebDatabaseEndpoint = ({ String? lockName, }); +final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + /// An additional interface for [SqliteOpenFactory] exposing additional /// functionality that is only relevant when compiling to the web. /// @@ -36,8 +38,6 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { - final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); - /// Handles a custom request sent from the worker to the client. Future handleCustomRequest(JSAny? request) { return _updateStreams.handleRequest(request); diff --git a/scripts/sqlite3_wasm_download.dart b/scripts/sqlite3_wasm_download.dart index 62acbbe..9716eee 100644 --- a/scripts/sqlite3_wasm_download.dart +++ b/scripts/sqlite3_wasm_download.dart @@ -4,7 +4,7 @@ library; import 'dart:io'; final sqliteUrl = - 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.4.3/sqlite3.wasm'; + 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.8.0/sqlite3.wasm'; void main() async { // Create assets directory if it doesn't exist From a0bab168c218d4d533f6ed9010062d06213dde79 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:18:58 +0200 Subject: [PATCH 3/6] Remove debug print --- packages/sqlite_async/lib/src/utils/shared_utils.dart | 1 - packages/sqlite_async/pubspec.yaml | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 0e9c9d6..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,7 +123,6 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { - print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index a155f2b..7d7c1c3 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.5.0 <4.0.0" + sdk: ">=3.6.0 <4.0.0" topics: - sqlite @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.8.0 - sqlite3_web: ^0.3.0 + sqlite3: ^2.9.0 + sqlite3_web: ^0.3.1 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From b2a432800486f31ad3f829145b30a01eb99ef999 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 14:39:11 +0200 Subject: [PATCH 4/6] Update sqlite3_web as well --- packages/sqlite_async/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 7d7c1c3..586bcc6 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -13,7 +13,7 @@ topics: dependencies: sqlite3: ^2.9.0 - sqlite3_web: ^0.3.1 + sqlite3_web: ^0.3.2 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From 2f580747f4869fd2dd03badf0c621ef70569e4d9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 15:44:28 +0200 Subject: [PATCH 5/6] Fix import --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 7603306..265f4f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -7,7 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; -import 'package:sqlite_async/src/utils/database_utils.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import '../protocol.dart'; From d546c5dd39738b8504eab9e3ea26bf65617fcdd2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 10:49:36 +0200 Subject: [PATCH 6/6] Avoid updating minimum SDK constraint --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 3 ++- packages/sqlite_async/pubspec.yaml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 265f4f7..0c3e8f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -135,7 +135,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { } database.execute(sql, parameters); case CustomDatabaseMessageKind.updateSubscriptionManagement: - final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final shouldSubscribe = + (message.rawParameters.toDart[0] as JSBoolean).toDart; final id = message.rawSql.toDart; final state = _findState(connection); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 586bcc6..464bd2d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.6.0 <4.0.0" + sdk: ">=3.5.0 <4.0.0" topics: - sqlite