diff --git a/demos/supabase-todolist/lib/widgets/todo_list_page.dart b/demos/supabase-todolist/lib/widgets/todo_list_page.dart index 7e28238e..83c5c286 100644 --- a/demos/supabase-todolist/lib/widgets/todo_list_page.dart +++ b/demos/supabase-todolist/lib/widgets/todo_list_page.dart @@ -1,4 +1,5 @@ import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart'; import '../powersync.dart'; import './status_app_bar.dart'; @@ -38,30 +39,82 @@ class TodoListPage extends StatelessWidget { } } -class TodoListWidget extends StatelessWidget { +class TodoListWidget extends StatefulWidget { final TodoList list; const TodoListWidget({super.key, required this.list}); + @override + State createState() => _TodoListWidgetState(); +} + +class _TodoListWidgetState extends State { + SyncStreamSubscription? _listSubscription; + + void _subscribe(String listId) { + db + .syncStream('todos', {'list': listId}) + .subscribe(ttl: const Duration(hours: 1)) + .then((sub) { + if (mounted && widget.list.id == listId) { + setState(() { + _listSubscription = sub; + }); + } else { + sub.unsubscribe(); + } + }); + } + + @override + void initState() { + super.initState(); + _subscribe(widget.list.id); + } + + @override + void didUpdateWidget(covariant TodoListWidget oldWidget) { + super.didUpdateWidget(oldWidget); + _subscribe(widget.list.id); + } + + @override + void dispose() { + super.dispose(); + _listSubscription?.unsubscribe(); + } + @override Widget build(BuildContext context) { return StreamBuilder( - stream: TodoList.watchSyncStatus().map((e) => e.hasSynced), - initialData: db.currentStatus.hasSynced, + stream: db.statusStream, + initialData: db.currentStatus, builder: (context, snapshot) { - return StreamBuilder( - stream: list.watchItems(), - builder: (context, snapshot) { - final items = snapshot.data ?? const []; - - return ListView( - padding: const EdgeInsets.symmetric(vertical: 8.0), - children: items.map((todo) { - return TodoItemWidget(todo: todo); - }).toList(), - ); - }, - ); + final hasSynced = switch (_listSubscription) { + null => null, + final sub => snapshot.requireData.statusFor(sub), + } + ?.subscription + .hasSynced ?? + false; + + if (!hasSynced) { + return const CircularProgressIndicator(); + } else { + return StreamBuilder( + stream: widget.list.watchItems(), + builder: (context, snapshot) { + final items = snapshot.data ?? const []; + + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: items.map((todo) { + return TodoItemWidget(todo: todo); + }).toList(), + ); + }, + ); + } }, ); } diff --git a/demos/supabase-todolist/pubspec.lock b/demos/supabase-todolist/pubspec.lock index cea2023c..10127ba2 100644 --- a/demos/supabase-todolist/pubspec.lock +++ b/demos/supabase-todolist/pubspec.lock @@ -605,10 +605,10 @@ packages: dependency: transitive description: name: sqlite3 - sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e" + sha256: f393d92c71bdcc118d6203d07c991b9be0f84b1a6f89dd4f7eed348131329924 url: "https://pub.dev" source: hosted - version: "2.7.5" + version: "2.9.0" sqlite3_flutter_libs: dependency: transitive description: @@ -621,18 +621,17 @@ packages: dependency: transitive description: name: sqlite3_web - sha256: "967e076442f7e1233bd7241ca61f3efe4c7fc168dac0f38411bdb3bdf471eb3c" + sha256: "0f6ebcb4992d1892ac5c8b5ecd22a458ab9c5eb6428b11ae5ecb5d63545844da" url: "https://pub.dev" source: hosted - version: "0.3.1" + version: "0.3.2" sqlite_async: dependency: "direct main" description: - name: sqlite_async - sha256: "9332aedd311a19dd215dcb55729bc68dc587dc7655b569ab8819b68ee0be0082" - url: "https://pub.dev" - source: hosted - version: "0.11.7" + path: "/Users/simon/src/sqlite_async.dart/packages/sqlite_async" + relative: false + source: path + version: "0.12.1" stack_trace: dependency: transitive description: diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index b4dfe35d..ef2e97c7 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -11,6 +11,7 @@ export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; export 'src/sync/options.dart' hide ResolvedSyncOptions; +export 'src/sync/stream.dart' hide CoreActiveStreamSubscription; export 'src/sync/sync_status.dart' - hide BucketProgress, InternalSyncDownloadProgress; + hide BucketProgress, InternalSyncDownloadProgress, InternalSyncStatusAccess; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index d55b69db..03cdbc01 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -133,6 +133,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, required AbortController abort, required Zone asyncWorkZone, }) async { @@ -140,6 +142,7 @@ class PowerSyncDatabaseImpl bool triedSpawningIsolate = false; StreamSubscription? crudUpdateSubscription; + StreamSubscription? activeStreamsSubscription; final receiveMessages = ReceivePort(); final receiveUnhandledErrors = ReceivePort(); final receiveExit = ReceivePort(); @@ -157,6 +160,7 @@ class PowerSyncDatabaseImpl // Cleanup crudUpdateSubscription?.cancel(); + activeStreamsSubscription?.cancel(); receiveMessages.close(); receiveUnhandledErrors.close(); receiveExit.close(); @@ -198,6 +202,10 @@ class PowerSyncDatabaseImpl crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); + + activeStreamsSubscription = activeStreams.listen((streams) { + port.send(['changed_subscriptions', streams]); + }); } else if (action == 'uploadCrud') { await (data[1] as PortCompleter).handle(() async { await connector.uploadData(this); @@ -366,6 +374,9 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { } } else if (action == 'close') { await shutdown(); + } else if (action == 'changed_subscriptions') { + openedStreamingSync + ?.updateSubscriptions(message[1] as List); } } }); diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index a4f0b419..ae891cb7 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -7,6 +7,7 @@ import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import '../sync/options.dart'; +import '../sync/streaming_sync.dart'; import 'powersync_database.dart'; import '../connector.dart'; @@ -115,6 +116,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required AbortController abort, + required List initiallyActiveStreams, + required Stream> activeStreams, required Zone asyncWorkZone, required ResolvedSyncOptions options, }) { diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index dc4b2ddb..329b367d 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -13,9 +13,13 @@ import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; import 'package:powersync_core/src/schema_logic.dart' as schema_logic; +import 'package:powersync_core/src/sync/connection_manager.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/sync_status.dart'; +import '../sync/stream.dart'; +import '../sync/streaming_sync.dart'; + mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Schema used for the local database. Schema get schema; @@ -41,16 +45,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @Deprecated("This field is unused, pass params to connect() instead") Map? clientParams; + late final ConnectionManager _connections; + /// Current connection status. - SyncStatus currentStatus = - const SyncStatus(connected: false, lastSyncedAt: null); + SyncStatus get currentStatus => _connections.currentStatus; /// Use this stream to subscribe to connection status updates. - late final Stream statusStream; - - @protected - StreamController statusStreamController = - StreamController.broadcast(); + Stream get statusStream => _connections.statusStream; late final ActiveDatabaseGroup _activeGroup; @@ -80,15 +81,6 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @protected Future get isInitialized; - /// The abort controller for the current sync iteration. - /// - /// null when disconnected, present when connecting or connected. - /// - /// The controller must only be accessed from within a critical section of the - /// sync mutex. - @protected - AbortController? _abortActiveSync; - @protected Future baseInit() async { String identifier = 'memory'; @@ -106,15 +98,14 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { 'instantiation logic if this is not intentional', ); } - - statusStream = statusStreamController.stream; + _connections = ConnectionManager(this); updates = powerSyncUpdateNotifications(database.updates); await database.initialize(); await _checkVersion(); await database.execute('SELECT powersync_init()'); await updateSchema(schema); - await _updateHasSynced(); + await _connections.resolveOfflineSyncStatus(); } /// Check that a supported version of the powersync extension is loaded. @@ -140,55 +131,15 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { return isInitialized; } - Future _updateHasSynced() async { - // Query the database to see if any data has been synced. - final result = await database.getAll( - 'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority;', - ); - const prioritySentinel = 2147483647; - var hasSynced = false; - DateTime? lastCompleteSync; - final priorityStatusEntries = []; - - DateTime parseDateTime(String sql) { - return DateTime.parse('${sql}Z').toLocal(); - } - - for (final row in result) { - final priority = row.columnAt(0) as int; - final lastSyncedAt = parseDateTime(row.columnAt(1) as String); - - if (priority == prioritySentinel) { - hasSynced = true; - lastCompleteSync = lastSyncedAt; - } else { - priorityStatusEntries.add(( - hasSynced: true, - lastSyncedAt: lastSyncedAt, - priority: BucketPriority(priority) - )); - } - } - - if (hasSynced != currentStatus.hasSynced) { - final status = SyncStatus( - hasSynced: hasSynced, - lastSyncedAt: lastCompleteSync, - priorityStatusEntries: priorityStatusEntries, - ); - setStatus(status); - } - } - /// Returns a [Future] which will resolve once at least one full sync cycle /// has completed (meaninng that the first consistent checkpoint has been /// reached across all buckets). /// /// When [priority] is null (the default), this method waits for the first - /// full sync checkpoint to complete. When set to a [BucketPriority] however, + /// full sync checkpoint to complete. When set to a [StreamPriority] however, /// it completes once all buckets within that priority (as well as those in /// higher priorities) have been synchronized at least once. - Future waitForFirstSync({BucketPriority? priority}) async { + Future waitForFirstSync({StreamPriority? priority}) async { bool matches(SyncStatus status) { if (priority == null) { return status.hasSynced == true; @@ -197,46 +148,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { } } - if (matches(currentStatus)) { - return; - } - await for (final result in statusStream) { - if (matches(result)) { - break; - } - } + return _connections.firstStatusMatching(matches); } @protected @visibleForTesting void setStatus(SyncStatus status) { - if (status != currentStatus) { - final newStatus = SyncStatus( - connected: status.connected, - downloading: status.downloading, - uploading: status.uploading, - connecting: status.connecting, - uploadError: status.uploadError, - downloadError: status.downloadError, - priorityStatusEntries: status.priorityStatusEntries, - downloadProgress: status.downloadProgress, - // Note that currently the streaming sync implementation will never set - // hasSynced. lastSyncedAt implies that syncing has completed at some - // point (hasSynced = true). - // The previous values of hasSynced should be preserved here. - lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, - hasSynced: status.lastSyncedAt != null - ? true - : status.hasSynced ?? currentStatus.hasSynced, - ); - - // If the absence of hasSynced was the only difference, the new states - // would be equal and don't require an event. So, check again. - if (newStatus != currentStatus) { - currentStatus = newStatus; - statusStreamController.add(currentStatus); - } - } + _connections.manuallyChangeSyncStatus(status); } @override @@ -263,7 +181,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // If there are paused subscriptionso n the status stream, don't delay // closing the database because of that. - unawaited(statusStreamController.close()); + _connections.close(); await _activeGroup.close(); } } @@ -297,67 +215,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { params: params, ); - if (schema.rawTables.isNotEmpty && - resolvedOptions.source.syncImplementation != - SyncClientImplementation.rust) { - throw UnsupportedError( - 'Raw tables are only supported by the Rust client.'); - } - - // ignore: deprecated_member_use_from_same_package - clientParams = params; - var thisConnectAborter = AbortController(); - final zone = Zone.current; - - late void Function() retryHandler; - - Future connectWithSyncLock() async { - // Ensure there has not been a subsequent connect() call installing a new - // sync client. - assert(identical(_abortActiveSync, thisConnectAborter)); - assert(!thisConnectAborter.aborted); - - await connectInternal( - connector: connector, - options: resolvedOptions, - abort: thisConnectAborter, - // Run follow-up async tasks in the parent zone, a new one is introduced - // while we hold the lock (and async tasks won't hold the sync lock). - asyncWorkZone: zone, - ); - - thisConnectAborter.onCompletion.whenComplete(retryHandler); - } - - // If the sync encounters a failure without being aborted, retry - retryHandler = Zone.current.bindCallback(() async { - _activeGroup.syncConnectMutex.lock(() async { - // Is this still supposed to be active? (abort is only called within - // mutex) - if (!thisConnectAborter.aborted) { - // We only change _abortActiveSync after disconnecting, which resets - // the abort controller. - assert(identical(_abortActiveSync, thisConnectAborter)); - - // We need a new abort controller for this attempt - _abortActiveSync = thisConnectAborter = AbortController(); - - logger.warning('Sync client failed, retrying...'); - await connectWithSyncLock(); - } - }); - }); - - await _activeGroup.syncConnectMutex.lock(() async { - // Disconnect a previous sync client, if one is active. - await _abortCurrentSync(); - assert(_abortActiveSync == null); - - // Install the abort controller for this particular connect call, allowing - // it to be disconnected. - _abortActiveSync = thisConnectAborter; - await connectWithSyncLock(); - }); + await _connections.connect(connector: connector, options: resolvedOptions); } /// Internal method to establish a sync client connection. @@ -371,6 +229,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { Future connectInternal({ required PowerSyncBackendConnector connector, required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, required AbortController abort, required Zone asyncWorkZone, }); @@ -379,27 +239,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// /// Use [connect] to connect again. Future disconnect() async { - // Also wrap this in the sync mutex to ensure there's no race between us - // connecting and disconnecting. - await _activeGroup.syncConnectMutex.lock(_abortCurrentSync); - - setStatus( - SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); - } - - Future _abortCurrentSync() async { - if (_abortActiveSync case final disconnector?) { - /// Checking `disconnecter.aborted` prevents race conditions - /// where multiple calls to `disconnect` can attempt to abort - /// the controller more than once before it has finished aborting. - if (disconnector.aborted == false) { - await disconnector.abort(); - _abortActiveSync = null; - } else { - /// Wait for the abort to complete. Continue updating the sync status after completed - await disconnector.onCompletion; - } - } + await _connections.disconnect(); } /// Disconnect and clear the database. @@ -417,8 +257,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await tx.execute('select powersync_clear(?)', [clearLocal ? 1 : 0]); }); // The data has been deleted - reset these - currentStatus = SyncStatus(lastSyncedAt: null, hasSynced: false); - statusStreamController.add(currentStatus); + setStatus(SyncStatus(lastSyncedAt: null, hasSynced: false)); } @Deprecated('Use [disconnectAndClear] instead.') @@ -440,9 +279,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { schema.validate(); await _activeGroup.syncConnectMutex.lock(() async { - if (_abortActiveSync != null) { - throw AssertionError('Cannot update schema while connected'); - } + _connections.checkNotConnected(); this.schema = schema; await database.writeLock((tx) => schema_logic.updateSchema(tx, schema)); @@ -629,6 +466,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { Future refreshSchema() async { await database.refreshSchema(); } + + SyncStream syncStream(String name, [Map? parameters]) { + return _connections.syncStream(name, parameters); + } } Stream powerSyncUpdateNotifications( diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 4af2821e..15a83c7d 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -128,6 +128,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required AbortController abort, + required List initiallyActiveStreams, + required Stream> activeStreams, required Zone asyncWorkZone, required ResolvedSyncOptions options, }) async { @@ -141,6 +143,7 @@ class PowerSyncDatabaseImpl connector: connector, options: options.source, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), + subscriptions: initiallyActiveStreams, ); } catch (e) { logger.warning( @@ -157,6 +160,7 @@ class PowerSyncDatabaseImpl crudUpdateTriggerStream: crudStream, options: options, client: BrowserClient(), + activeSubscriptions: initiallyActiveStreams, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. identifier: database.openFactory.path, @@ -168,7 +172,10 @@ class PowerSyncDatabaseImpl }); sync.streamingSync(); + final subscriptions = activeStreams.listen(sync.updateSubscriptions); + abort.onAbort.then((_) async { + subscriptions.cancel(); await sync.abort(); abort.completeAbort(); }).ignore(); diff --git a/packages/powersync_core/lib/src/sync/connection_manager.dart b/packages/powersync_core/lib/src/sync/connection_manager.dart new file mode 100644 index 00000000..480e02e2 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/connection_manager.dart @@ -0,0 +1,390 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:meta/meta.dart'; +import 'package:powersync_core/src/abort_controller.dart'; +import 'package:powersync_core/src/connector.dart'; +import 'package:powersync_core/src/database/active_instances.dart'; +import 'package:powersync_core/src/database/powersync_db_mixin.dart'; +import 'package:powersync_core/src/sync/options.dart'; +import 'package:powersync_core/src/sync/stream.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; + +import 'instruction.dart'; +import 'mutable_sync_status.dart'; +import 'streaming_sync.dart'; + +/// A (stream name, JSON parameters) pair that uniquely identifies a stream +/// instantiation to subscribe to. +typedef _RawStreamKey = (String, String); + +@internal +final class ConnectionManager { + final PowerSyncDatabaseMixin db; + final ActiveDatabaseGroup _activeGroup; + + /// All streams (with parameters) for which a subscription has been requested + /// explicitly. + final Map<_RawStreamKey, _ActiveSubscription> _locallyActiveSubscriptions = + {}; + + final StreamController _statusController = + StreamController.broadcast(); + + /// Fires when an entry is added or removed from [_locallyActiveSubscriptions] + /// while we're connected. + StreamController? _subscriptionsChanged; + + SyncStatus _currentStatus = + const SyncStatus(connected: false, lastSyncedAt: null); + + SyncStatus get currentStatus => _currentStatus; + Stream get statusStream => _statusController.stream; + + /// The abort controller for the current sync iteration. + /// + /// null when disconnected, present when connecting or connected. + /// + /// The controller must only be accessed from within a critical section of the + /// sync mutex. + AbortController? _abortActiveSync; + + ConnectionManager(this.db) : _activeGroup = db.group; + + void checkNotConnected() { + if (_abortActiveSync != null) { + throw StateError('Cannot update schema while connected'); + } + } + + Future _abortCurrentSync() async { + if (_abortActiveSync case final disconnector?) { + /// Checking `disconnecter.aborted` prevents race conditions + /// where multiple calls to `disconnect` can attempt to abort + /// the controller more than once before it has finished aborting. + if (disconnector.aborted == false) { + await disconnector.abort(); + _abortActiveSync = null; + } else { + /// Wait for the abort to complete. Continue updating the sync status after completed + await disconnector.onCompletion; + } + } + } + + Future disconnect() async { + // Also wrap this in the sync mutex to ensure there's no race between us + // connecting and disconnecting. + await _activeGroup.syncConnectMutex.lock(() async { + await _abortCurrentSync(); + _subscriptionsChanged?.close(); + _subscriptionsChanged = null; + }); + + manuallyChangeSyncStatus( + SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); + } + + Future firstStatusMatching(bool Function(SyncStatus) predicate) async { + if (predicate(currentStatus)) { + return; + } + await for (final result in statusStream) { + if (predicate(result)) { + break; + } + } + } + + List get _subscribedStreams => [ + for (final active in _locallyActiveSubscriptions.values) + (name: active.name, parameters: active.encodedParameters) + ]; + + Future connect({ + required PowerSyncBackendConnector connector, + required ResolvedSyncOptions options, + }) async { + if (db.schema.rawTables.isNotEmpty && + options.source.syncImplementation != SyncClientImplementation.rust) { + throw UnsupportedError( + 'Raw tables are only supported by the Rust client.'); + } + + var thisConnectAborter = AbortController(); + final zone = Zone.current; + + late void Function() retryHandler; + + final subscriptionsChanged = StreamController(); + + Future connectWithSyncLock() async { + // Ensure there has not been a subsequent connect() call installing a new + // sync client. + assert(identical(_abortActiveSync, thisConnectAborter)); + assert(!thisConnectAborter.aborted); + + // ignore: invalid_use_of_protected_member + await db.connectInternal( + connector: connector, + options: options, + abort: thisConnectAborter, + initiallyActiveStreams: _subscribedStreams, + activeStreams: subscriptionsChanged.stream.map((_) { + return _subscribedStreams; + }), + // Run follow-up async tasks in the parent zone, a new one is introduced + // while we hold the lock (and async tasks won't hold the sync lock). + asyncWorkZone: zone, + ); + + thisConnectAborter.onCompletion.whenComplete(retryHandler); + } + + // If the sync encounters a failure without being aborted, retry + retryHandler = Zone.current.bindCallback(() async { + _activeGroup.syncConnectMutex.lock(() async { + // Is this still supposed to be active? (abort is only called within + // mutex) + if (!thisConnectAborter.aborted) { + // We only change _abortActiveSync after disconnecting, which resets + // the abort controller. + assert(identical(_abortActiveSync, thisConnectAborter)); + + // We need a new abort controller for this attempt + _abortActiveSync = thisConnectAborter = AbortController(); + + db.logger.warning('Sync client failed, retrying...'); + await connectWithSyncLock(); + } + }); + }); + + await _activeGroup.syncConnectMutex.lock(() async { + // Disconnect a previous sync client, if one is active. + await _abortCurrentSync(); + assert(_abortActiveSync == null); + _subscriptionsChanged = subscriptionsChanged; + + // Install the abort controller for this particular connect call, allowing + // it to be disconnected. + _abortActiveSync = thisConnectAborter; + await connectWithSyncLock(); + }); + } + + void manuallyChangeSyncStatus(SyncStatus status) { + if (status != currentStatus) { + final newStatus = SyncStatus( + connected: status.connected, + downloading: status.downloading, + uploading: status.uploading, + connecting: status.connecting, + uploadError: status.uploadError, + downloadError: status.downloadError, + priorityStatusEntries: status.priorityStatusEntries, + downloadProgress: status.downloadProgress, + // Note that currently the streaming sync implementation will never set + // hasSynced. lastSyncedAt implies that syncing has completed at some + // point (hasSynced = true). + // The previous values of hasSynced should be preserved here. + lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, + hasSynced: status.lastSyncedAt != null + ? true + : status.hasSynced ?? currentStatus.hasSynced, + streamSubscriptions: status.internalSubscriptions, + ); + + // If the absence of hasSynced was the only difference, the new states + // would be equal and don't require an event. So, check again. + if (newStatus != currentStatus) { + _currentStatus = newStatus; + _statusController.add(_currentStatus); + } + } + } + + _SyncStreamSubscriptionHandle _referenceStreamSubscription( + String stream, Map? parameters) { + final key = (stream, json.encode(parameters)); + _ActiveSubscription active; + + if (_locallyActiveSubscriptions[key] case final current?) { + active = current; + } else { + active = _ActiveSubscription(this, + name: stream, parameters: parameters, encodedParameters: key.$2); + _locallyActiveSubscriptions[key] = active; + _subscriptionsChanged?.add(null); + } + + return _SyncStreamSubscriptionHandle(active); + } + + void _clearSubscription(_ActiveSubscription subscription) { + assert(subscription.refcount == 0); + _locallyActiveSubscriptions + .remove((subscription.name, subscription.encodedParameters)); + _subscriptionsChanged?.add(null); + } + + Future _subscriptionsCommand(Object? command) async { + await db.writeTransaction((tx) { + return tx.execute( + 'SELECT powersync_control(?, ?)', + ['subscriptions', json.encode(command)], + ); + }); + _subscriptionsChanged?.add(null); + } + + Future subscribe({ + required String stream, + required Map? parameters, + Duration? ttl, + StreamPriority? priority, + }) async { + await _subscriptionsCommand({ + 'subscribe': { + 'stream': { + 'name': stream, + 'params': parameters, + }, + 'ttl': ttl?.inSeconds, + 'priority': priority, + }, + }); + + await _activeGroup.syncMutex.lock(() async { + if (_abortActiveSync == null) { + // Since we're not connected, update the offline sync status to reflect + // the new subscription. + // With a connection, the sync client would include it in its state. + await resolveOfflineSyncStatus(); + } + }); + } + + Future unsubscribeAll({ + required String stream, + required Object? parameters, + }) async { + await _subscriptionsCommand({ + 'unsubscribe': { + 'name': stream, + 'params': parameters, + }, + }); + } + + Future resolveOfflineSyncStatus() async { + final row = await db.database.get( + 'SELECT powersync_offline_sync_status() AS r;', + ); + + final status = CoreSyncStatus.fromJson( + json.decode(row['r'] as String) as Map); + + manuallyChangeSyncStatus((MutableSyncStatus()..applyFromCore(status)) + .immutableSnapshot(setLastSynced: true)); + } + + SyncStream syncStream(String name, Map? parameters) { + return _SyncStreamImplementation(this, name, parameters); + } + + void close() { + _statusController.close(); + } +} + +final class _SyncStreamImplementation implements SyncStream { + @override + final String name; + + @override + final Map? parameters; + + final ConnectionManager _connections; + + _SyncStreamImplementation(this._connections, this.name, this.parameters); + + @override + Future subscribe({ + Duration? ttl, + StreamPriority? priority, + }) async { + await _connections.subscribe( + stream: name, + parameters: parameters, + ttl: ttl, + priority: priority, + ); + + return _connections._referenceStreamSubscription(name, parameters); + } + + @override + Future unsubscribeAll() async { + await _connections.unsubscribeAll(stream: name, parameters: parameters); + } +} + +final class _ActiveSubscription { + final ConnectionManager connections; + var refcount = 0; + + final String name; + final String encodedParameters; + final Map? parameters; + + _ActiveSubscription( + this.connections, { + required this.name, + required this.encodedParameters, + required this.parameters, + }); + + void decrementRefCount() { + refcount--; + if (refcount == 0) { + connections._clearSubscription(this); + } + } +} + +final class _SyncStreamSubscriptionHandle implements SyncStreamSubscription { + final _ActiveSubscription _source; + + _SyncStreamSubscriptionHandle(this._source) { + _source.refcount++; + + // This is not unreliable, but can help decrementing refcounts on the inner + // subscription when this handle is deallocated without [unsubscribe] being + // called. + _finalizer.attach(this, _source, detach: this); + } + + @override + String get name => _source.name; + + @override + Map? get parameters => _source.parameters; + + @override + Future unsubscribe() async { + _finalizer.detach(this); + _source.decrementRefCount(); + } + + @override + Future waitForFirstSync() async { + return _source.connections.firstStatusMatching((status) { + final currentProgress = status.statusFor(this); + return currentProgress?.subscription.hasSynced ?? false; + }); + } + + static final Finalizer<_ActiveSubscription> _finalizer = + Finalizer((sub) => sub.decrementRefCount()); +} diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart index f0146e8e..cde81303 100644 --- a/packages/powersync_core/lib/src/sync/instruction.dart +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -1,3 +1,4 @@ +import 'stream.dart'; import 'sync_status.dart'; /// An internal instruction emitted by the sync client in the core extension in @@ -62,12 +63,14 @@ final class CoreSyncStatus { final bool connecting; final List priorityStatus; final DownloadProgress? downloading; + final List? streams; CoreSyncStatus({ required this.connected, required this.connecting, required this.priorityStatus, required this.downloading, + required this.streams, }); factory CoreSyncStatus.fromJson(Map json) { @@ -82,6 +85,10 @@ final class CoreSyncStatus { null => null, final raw as Map => DownloadProgress.fromJson(raw), }, + streams: (json['streams'] as List) + .map((e) => + CoreActiveStreamSubscription.fromJson(e as Map)) + .toList(), ); } diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 23e3becb..273cd597 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'instruction.dart'; +import 'stream.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; @@ -15,6 +16,7 @@ final class MutableSyncStatus { InternalSyncDownloadProgress? downloadProgress; List priorityStatusEntries = const []; + List? streams; DateTime? lastSyncedAt; @@ -51,9 +53,9 @@ final class MutableSyncStatus { hasSynced: true, lastSyncedAt: now, priority: maxBy( - applied.checksums.map((cs) => BucketPriority(cs.priority)), + applied.checksums.map((cs) => StreamPriority(cs.priority)), (priority) => priority, - compare: BucketPriority.comparator, + compare: StreamPriority.comparator, )!, ) ]; @@ -90,11 +92,12 @@ final class MutableSyncStatus { final downloading => InternalSyncDownloadProgress(downloading.buckets), }; lastSyncedAt = status.priorityStatus - .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) + .firstWhereOrNull((s) => s.priority == StreamPriority.fullSyncPriority) ?.lastSyncedAt; + streams = status.streams; } - SyncStatus immutableSnapshot() { + SyncStatus immutableSnapshot({bool setLastSynced = false}) { return SyncStatus( connected: connected, connecting: connecting, @@ -103,9 +106,10 @@ final class MutableSyncStatus { downloadProgress: downloadProgress?.asSyncDownloadProgress, priorityStatusEntries: UnmodifiableListView(priorityStatusEntries), lastSyncedAt: lastSyncedAt, - hasSynced: null, // Stream client is not supposed to set this value. + hasSynced: setLastSynced ? lastSyncedAt != null : null, uploadError: uploadError, downloadError: downloadError, + streamSubscriptions: streams, ); } } diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index 6ae94b25..48ed293e 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -27,11 +27,14 @@ final class SyncOptions { /// The [SyncClientImplementation] to use. final SyncClientImplementation syncImplementation; + final bool? includeDefaultStreams; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, this.syncImplementation = SyncClientImplementation.defaultClient, + this.includeDefaultStreams, }); SyncOptions _copyWith({ @@ -44,6 +47,7 @@ final class SyncOptions { retryDelay: retryDelay, params: params ?? this.params, syncImplementation: syncImplementation, + includeDefaultStreams: includeDefaultStreams, ); } } @@ -96,16 +100,23 @@ extension type ResolvedSyncOptions(SyncOptions source) { Map get params => source.params ?? const {}; + bool get includeDefaultStreams => source.includeDefaultStreams ?? true; + (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { final newOptions = SyncOptions( crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, retryDelay: other.retryDelay ?? retryDelay, params: other.params ?? params, + syncImplementation: other.syncImplementation, + includeDefaultStreams: + other.includeDefaultStreams ?? includeDefaultStreams, ); final didChange = !_mapEquality.equals(newOptions.params, params) || newOptions.crudThrottleTime != crudThrottleTime || - newOptions.retryDelay != retryDelay; + newOptions.retryDelay != retryDelay || + newOptions.syncImplementation != source.syncImplementation || + newOptions.includeDefaultStreams != includeDefaultStreams; return (ResolvedSyncOptions(newOptions), didChange); } diff --git a/packages/powersync_core/lib/src/sync/stream.dart b/packages/powersync_core/lib/src/sync/stream.dart new file mode 100644 index 00000000..0dd8d9a7 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/stream.dart @@ -0,0 +1,167 @@ +import 'package:meta/meta.dart'; + +import 'sync_status.dart'; +import '../database/powersync_database.dart'; + +/// A description of a sync stream, consisting of its [name] and the +/// [parameters] used when subscribing. +abstract interface class SyncStreamDescription { + /// The name of the stream as it appears in the stream definition for the + /// PowerSync service. + String get name; + + /// The parameters used to subscribe to the stream, if any. + /// + /// The same stream can be subscribed to multiple times with different + /// parameters. + Map? get parameters; +} + +/// Information about a subscribed sync stream. +/// +/// This includes the [SyncStreamDescription] along with information about the +/// current sync status. +abstract interface class SyncSubscriptionDefinition + extends SyncStreamDescription { + /// Whether this stream is active, meaning that the subscription has been + /// acknownledged by the sync serivce. + bool get active; + + /// Whether this stream subscription is included by default, regardless of + /// whether the stream has explicitly been subscribed to or not. + /// + /// It's possible for both [isDefault] and [hasExplicitSubscription] to be + /// true at the same time - this happens when a default stream was subscribed + /// explicitly. + bool get isDefault; + + /// Whether this stream has been subscribed to explicitly. + /// + /// It's possible for both [isDefault] and [hasExplicitSubscription] to be + /// true at the same time - this happens when a default stream was subscribed + /// explicitly. + bool get hasExplicitSubscription; + + /// For sync streams that have a time-to-live, the current time at which the + /// stream would expire if not subscribed to again. + DateTime? get expiresAt; + + /// Whether this stream subscription has been synced at least once. + bool get hasSynced; + + /// If [hasSynced] is true, the last time data from this stream has been + /// synced. + DateTime? get lastSyncedAt; +} + +/// A handle to a [SyncStreamDescription] that allows subscribing to the stream. +/// +/// To obtain an instance of [SyncStream], call [PowerSyncDatabase.syncStream]. +abstract interface class SyncStream extends SyncStreamDescription { + /// Adds a subscription to this stream, requesting it to be included when + /// connecting to the sync service. + /// + /// The [priority] can be used to override the priority of this stream. + Future subscribe({ + Duration? ttl, + StreamPriority? priority, + }); + + Future unsubscribeAll(); +} + +/// A [SyncStream] that has been subscribed to. +abstract interface class SyncStreamSubscription + implements SyncStreamDescription { + /// A variant of [PowerSyncDatabase.waitForFirstSync] that is specific to + /// this stream subscription. + Future waitForFirstSync(); + + /// Removes this stream subscription from the database, if it has been + /// subscribed to explicitly. + /// + /// The subscription may still be included for a while, until the client + /// reconnects and receives new snapshots from the sync service. + Future unsubscribe(); +} + +/// An `ActiveStreamSubscription` as part of the sync status in Rust. +@internal +final class CoreActiveStreamSubscription implements SyncSubscriptionDefinition { + @override + final String name; + @override + final Map? parameters; + final StreamPriority priority; + final List associatedBuckets; + @override + final bool active; + @override + final bool isDefault; + @override + final bool hasExplicitSubscription; + @override + final DateTime? expiresAt; + @override + final DateTime? lastSyncedAt; + + @override + bool get hasSynced => lastSyncedAt != null; + + CoreActiveStreamSubscription._({ + required this.name, + required this.parameters, + required this.priority, + required this.associatedBuckets, + required this.active, + required this.isDefault, + required this.hasExplicitSubscription, + required this.expiresAt, + required this.lastSyncedAt, + }); + + factory CoreActiveStreamSubscription.fromJson(Map json) { + return CoreActiveStreamSubscription._( + name: json['name'] as String, + parameters: json['parameters'] as Map?, + priority: switch (json['priority'] as int?) { + final prio? => StreamPriority(prio), + null => StreamPriority.fullSyncPriority, + }, + associatedBuckets: (json['associated_buckets'] as List).cast(), + active: json['active'] as bool, + isDefault: json['is_default'] as bool, + hasExplicitSubscription: json['has_explicit_subscription'] as bool, + expiresAt: switch (json['expires_at']) { + null => null, + final timestamp as int => + DateTime.fromMillisecondsSinceEpoch(timestamp * 1000), + }, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final timestamp as int => + DateTime.fromMillisecondsSinceEpoch(timestamp * 1000), + }, + ); + } + + Map toJson() { + return { + 'name': name, + 'parameters': parameters, + 'priority': priority.priorityNumber, + 'associated_buckets': associatedBuckets, + 'active': active, + 'is_default': isDefault, + 'has_explicit_subscription': hasExplicitSubscription, + 'expires_at': switch (expiresAt) { + null => null, + final expiresAt => expiresAt.millisecondsSinceEpoch / 1000, + }, + 'last_synced_at': switch (lastSyncedAt) { + null => null, + final lastSyncedAt => lastSyncedAt.millisecondsSinceEpoch / 1000, + } + }; + } +} diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index c460de30..f5933f4e 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -21,6 +21,8 @@ import 'stream_utils.dart'; import 'sync_status.dart'; import 'protocol.dart'; +typedef SubscribedStream = ({String name, String parameters}); + abstract interface class StreamingSync { Stream get statusStream; @@ -28,6 +30,8 @@ abstract interface class StreamingSync { /// Close any active streams. Future abort(); + + void updateSubscriptions(List streams); } @internal @@ -36,6 +40,7 @@ class StreamingSyncImplementation implements StreamingSync { final BucketStorage adapter; final InternalConnector connector; final ResolvedSyncOptions options; + List _activeSubscriptions; final Logger logger; @@ -69,6 +74,7 @@ class StreamingSyncImplementation implements StreamingSync { required this.crudUpdateTriggerStream, required this.options, required http.Client client, + List activeSubscriptions = const [], Mutex? syncMutex, Mutex? crudMutex, Logger? logger, @@ -80,7 +86,8 @@ class StreamingSyncImplementation implements StreamingSync { syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"), crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"), _userAgentHeaders = userAgentHeaders(), - logger = logger ?? isolateLogger; + logger = logger ?? isolateLogger, + _activeSubscriptions = activeSubscriptions; Duration get _retryDelay => options.retryDelay; @@ -124,6 +131,14 @@ class StreamingSyncImplementation implements StreamingSync { return _abort?.aborted ?? false; } + @override + void updateSubscriptions(List streams) { + _activeSubscriptions = streams; + if (_nonLineSyncEvents.hasListener) { + _nonLineSyncEvents.add(HandleChangedSubscriptions(streams)); + } + } + @override Future streamingSync() async { try { @@ -449,6 +464,7 @@ class StreamingSyncImplementation implements StreamingSync { _state.updateStatus((s) => s.setConnected()); await handleLine(line as StreamingSyncLine); case UploadCompleted(): + case HandleChangedSubscriptions(): // Only relevant for the Rust sync implementation. break; case AbortCurrentIteration(): @@ -588,6 +604,7 @@ typedef BucketDescription = ({ final class _ActiveRustStreamingIteration { final StreamingSyncImplementation sync; + var _isActive = true; var _hadSyncLine = false; @@ -596,6 +613,12 @@ final class _ActiveRustStreamingIteration { _ActiveRustStreamingIteration(this.sync); + List _encodeSubscriptions(List subscriptions) { + return sync._activeSubscriptions + .map((s) => {'name': s.name, 'params': s.parameters}) + .toList(); + } + Future syncIteration() async { try { await _control( @@ -603,6 +626,8 @@ final class _ActiveRustStreamingIteration { convert.json.encode({ 'parameters': sync.options.params, 'schema': convert.json.decode(sync.schemaJson), + 'include_defaults': sync.options.includeDefaultStreams, + 'active_streams': _encodeSubscriptions(sync._activeSubscriptions), }), ); assert(_completedStream.isCompleted, 'Should have started streaming'); @@ -652,6 +677,9 @@ final class _ActiveRustStreamingIteration { break loop; case TokenRefreshComplete(): await _control('refreshed_token'); + case HandleChangedSubscriptions(:final currentSubscriptions): + await _control('update_subscriptions', + convert.json.encode(_encodeSubscriptions(currentSubscriptions))); } } } @@ -741,3 +769,9 @@ final class TokenRefreshComplete implements SyncEvent { final class AbortCurrentIteration implements SyncEvent { const AbortCurrentIteration(); } + +final class HandleChangedSubscriptions implements SyncEvent { + final List currentSubscriptions; + + HandleChangedSubscriptions(this.currentSubscriptions); +} diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index 62c48df1..3665d0f8 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -2,9 +2,11 @@ import 'dart:math'; import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; +import '../database/powersync_database.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; +import 'stream.dart'; final class SyncStatus { /// true if currently connected. @@ -54,6 +56,9 @@ final class SyncStatus { final List priorityStatusEntries; + final List? _internalSubscriptions; + + @internal const SyncStatus({ this.connected = false, this.connecting = false, @@ -65,7 +70,8 @@ final class SyncStatus { this.downloadError, this.uploadError, this.priorityStatusEntries = const [], - }); + List? streamSubscriptions, + }) : _internalSubscriptions = streamSubscriptions; @override bool operator ==(Object other) { @@ -78,8 +84,10 @@ final class SyncStatus { other.uploadError == uploadError && other.lastSyncedAt == lastSyncedAt && other.hasSynced == hasSynced && - _statusEquality.equals( + _listEquality.equals( other.priorityStatusEntries, priorityStatusEntries) && + _listEquality.equals( + other._internalSubscriptions, _internalSubscriptions) && other.downloadProgress == downloadProgress); } @@ -110,6 +118,18 @@ final class SyncStatus { ); } + /// All sync streams currently being tracked in this subscription. + /// + /// This returns null when the sync stream is currently being opened and we + /// don't have reliable information about all included streams yet (in that + /// state, [PowerSyncDatabase.subscribedStreams] can still be used to + /// resolve known subscriptions locally). + Iterable? get activeSubscriptions { + return _internalSubscriptions?.map((subscription) { + return SyncStreamStatus._(subscription, downloadProgress); + }); + } + /// Get the current [downloadError] or [uploadError]. Object? get anyError { return downloadError ?? uploadError; @@ -149,6 +169,21 @@ final class SyncStatus { ); } + /// If the [stream] appears in [activeSubscriptions], returns the current + /// status for that stream. + SyncStreamStatus? statusFor(SyncStreamDescription stream) { + final raw = _internalSubscriptions?.firstWhereOrNull( + (e) => + e.name == stream.name && + _mapEquality.equals(e.parameters, stream.parameters), + ); + + if (raw == null) { + return null; + } + return SyncStreamStatus._(raw, downloadProgress); + } + @override int get hashCode { return Object.hash( @@ -159,8 +194,9 @@ final class SyncStatus { uploadError, downloadError, lastSyncedAt, - _statusEquality.hash(priorityStatusEntries), + _listEquality.hash(priorityStatusEntries), downloadProgress, + _listEquality.hash(_internalSubscriptions), ); } @@ -169,37 +205,57 @@ final class SyncStatus { return "SyncStatus"; } - // This should be a ListEquality, but that appears to - // cause weird type errors with DDC (but only after hot reloads?!) - static const _statusEquality = ListEquality(); + static const _listEquality = ListEquality(); + static const _mapEquality = MapEquality(); +} + +@internal +extension InternalSyncStatusAccess on SyncStatus { + List? get internalSubscriptions => + _internalSubscriptions; +} + +final class SyncStreamStatus { + final ProgressWithOperations? progress; + final CoreActiveStreamSubscription _internal; + + SyncSubscriptionDefinition get subscription => _internal; + StreamPriority get priority => _internal.priority; + bool get isDefault => _internal.isDefault; + + SyncStreamStatus._(this._internal, SyncDownloadProgress? progress) + : progress = progress?._internal._forStream(_internal); } -/// The priority of a PowerSync bucket. -extension type const BucketPriority._(int priorityNumber) { +@Deprecated('Use StreamPriority instead') +typedef BucketPriority = StreamPriority; + +/// The priority of a PowerSync stream. +extension type const StreamPriority._(int priorityNumber) { static const _highest = 0; - factory BucketPriority(int i) { + factory StreamPriority(int i) { assert(i >= _highest); - return BucketPriority._(i); + return StreamPriority._(i); } - bool operator >(BucketPriority other) => comparator(this, other) > 0; - bool operator >=(BucketPriority other) => comparator(this, other) >= 0; - bool operator <(BucketPriority other) => comparator(this, other) < 0; - bool operator <=(BucketPriority other) => comparator(this, other) <= 0; + bool operator >(StreamPriority other) => comparator(this, other) > 0; + bool operator >=(StreamPriority other) => comparator(this, other) >= 0; + bool operator <(StreamPriority other) => comparator(this, other) < 0; + bool operator <=(StreamPriority other) => comparator(this, other) <= 0; - /// A [Comparator] instance suitable for comparing [BucketPriority] values. - static int comparator(BucketPriority a, BucketPriority b) => + /// A [Comparator] instance suitable for comparing [StreamPriority] values. + static int comparator(StreamPriority a, StreamPriority b) => -a.priorityNumber.compareTo(b.priorityNumber); /// The priority used by PowerSync to indicate that a full sync was completed. - static const fullSyncPriority = BucketPriority._(2147483647); + static const fullSyncPriority = StreamPriority._(2147483647); } /// Partial information about the synchronization status for buckets within a /// priority. typedef SyncPriorityStatus = ({ - BucketPriority priority, + StreamPriority priority, DateTime? lastSyncedAt, bool? hasSynced, }); @@ -227,7 +283,7 @@ class UploadQueueStats { /// Per-bucket download progress information. @internal typedef BucketProgress = ({ - BucketPriority priority, + StreamPriority priority, int atLast, int sinceLast, int targetCount, @@ -288,13 +344,23 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { /// Sums the total target and completed operations for all buckets up until /// the given [priority] (inclusive). ProgressWithOperations untilPriority(BucketPriority priority) { - final (total, downloaded) = - buckets.values.where((e) => e.priority >= priority).fold( + final (total, downloaded) = buckets.values + .where((e) => e.priority >= priority) + .fold((0, 0), _addProgress); + + return ProgressWithOperations._(total, downloaded); + } + + ProgressWithOperations _forStream(CoreActiveStreamSubscription subscription) { + final (total, downloaded) = subscription.associatedBuckets.fold( (0, 0), - (prev, entry) { - final downloaded = entry.sinceLast; - final total = entry.targetCount - entry.atLast; - return (prev.$1 + total, prev.$2 + downloaded); + (prev, bucket) { + final foundProgress = buckets[bucket]; + if (foundProgress == null) { + return prev; + } + + return _addProgress(prev, foundProgress); }, ); @@ -340,6 +406,12 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { } static const _mapEquality = MapEquality(); + + (int, int) _addProgress((int, int) prev, BucketProgress entry) { + final downloaded = entry.sinceLast; + final total = entry.targetCount - entry.atLast; + return (prev.$1 + total, prev.$2 + downloaded); + } } /// Information about a progressing download. diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index 7f05cff3..b3f0ef18 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -15,6 +15,7 @@ class SyncWorkerHandle implements StreamingSync { final PowerSyncBackendConnector connector; final SyncOptions options; late final WorkerCommunicationChannel _channel; + List subscriptions; final StreamController _status = StreamController.broadcast(); @@ -24,6 +25,7 @@ class SyncWorkerHandle implements StreamingSync { required this.options, required MessagePort sendToWorker, required SharedWorker worker, + required this.subscriptions, }) { _channel = WorkerCommunicationChannel( port: sendToWorker, @@ -81,6 +83,7 @@ class SyncWorkerHandle implements StreamingSync { required PowerSyncBackendConnector connector, required Uri workerUri, required SyncOptions options, + required List subscriptions, }) async { final worker = SharedWorker(workerUri.toString().toJS); final handle = SyncWorkerHandle._( @@ -89,6 +92,7 @@ class SyncWorkerHandle implements StreamingSync { connector: connector, sendToWorker: worker.port, worker: worker, + subscriptions: subscriptions, ); // Make sure that the worker is working, or throw immediately. @@ -116,6 +120,13 @@ class SyncWorkerHandle implements StreamingSync { database.database.openFactory.path, ResolvedSyncOptions(options), database.schema, + subscriptions, ); } + + @override + void updateSubscriptions(List streams) { + subscriptions = streams; + _channel.updateSubscriptions(streams); + } } diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ddc4eaf0..25dfddaf 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -8,6 +8,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'package:async/async.dart'; +import 'package:collection/collection.dart'; import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; @@ -45,8 +46,12 @@ class _SyncWorker { }); } - _SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options, - String schemaJson, _ConnectedClient client) { + _SyncRunner referenceSyncTask( + String databaseIdentifier, + SyncOptions options, + String schemaJson, + List subscriptions, + _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { return _SyncRunner(databaseIdentifier); }) @@ -54,6 +59,7 @@ class _SyncWorker { client, options, schemaJson, + subscriptions, ); } } @@ -90,13 +96,22 @@ class _ConnectedClient { }, ); - _runner = _worker.referenceSyncTask(request.databaseName, - recoveredOptions, request.schemaJson, this); + _runner = _worker.referenceSyncTask( + request.databaseName, + recoveredOptions, + request.schemaJson, + request.subscriptions?.toDart ?? const [], + this, + ); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.disconnectClient(this); _runner = null; return (JSObject(), null); + case SyncWorkerMessageType.updateSubscriptions: + _runner?.updateClientSubscriptions( + this, (payload as UpdateSubscriptions).toDart); + return (JSObject(), null); default: throw StateError('Unexpected message type $type'); } @@ -137,9 +152,10 @@ class _SyncRunner { final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); - StreamingSync? sync; + StreamingSyncImplementation? sync; _ConnectedClient? databaseHost; - final connections = <_ConnectedClient>[]; + final connections = <_ConnectedClient, List>{}; + List currentStreams = []; _SyncRunner(this.identifier) { _group.add(_mainEvents.stream); @@ -152,8 +168,9 @@ class _SyncRunner { :final client, :final options, :final schemaJson, + :final subscriptions, ): - connections.add(client); + connections[client] = subscriptions; final (newOptions, reconnect) = this.options.applyFrom(options); this.options = newOptions; this.schemaJson = schemaJson; @@ -165,6 +182,8 @@ class _SyncRunner { sync?.abort(); sync = null; await _requestDatabase(client); + } else { + reindexSubscriptions(); } case _RemoveConnection(:final client): connections.remove(client); @@ -191,6 +210,12 @@ class _SyncRunner { } else { await _requestDatabase(newHost); } + case _ClientSubscriptionsChanged( + :final client, + :final subscriptions + ): + connections[client] = subscriptions; + reindexSubscriptions(); } } catch (e, s) { _logger.warning('Error handling $event', e, s); @@ -199,12 +224,22 @@ class _SyncRunner { }); } + /// Updates [currentStreams] to the union of values in [connections]. + void reindexSubscriptions() { + final before = currentStreams.toSet(); + final after = connections.values.flattenedToSet; + if (!const SetEquality().equals(before, after)) { + currentStreams = after.toList(); + sync?.updateSubscriptions(currentStreams); + } + } + /// Pings all current [connections], removing those that don't answer in 5s /// (as they are likely closed tabs as well). /// /// Returns the first client that responds (without waiting for others). Future<_ConnectedClient?> _collectActiveClients() async { - final candidates = connections.toList(); + final candidates = connections.keys.toList(); if (candidates.isEmpty) { return null; } @@ -269,6 +304,7 @@ class _SyncRunner { ); } + currentStreams = connections.values.flattenedToSet.toList(); sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), schemaJson: client._runner!.schemaJson, @@ -283,10 +319,11 @@ class _SyncRunner { options: options, client: BrowserClient(), identifier: identifier, + activeSubscriptions: currentStreams, ); sync!.statusStream.listen((event) { _logger.fine('Broadcasting sync event: $event'); - for (final client in connections) { + for (final client in connections.keys) { client.channel.notify(SyncWorkerMessageType.notifySyncStatus, SerializedSyncStatus.from(event)); } @@ -294,9 +331,9 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient( - _ConnectedClient client, SyncOptions options, String schemaJson) { - _mainEvents.add(_AddConnection(client, options, schemaJson)); + void registerClient(_ConnectedClient client, SyncOptions options, + String schemaJson, List subscriptions) { + _mainEvents.add(_AddConnection(client, options, schemaJson, subscriptions)); } /// Remove a client, disconnecting if no clients remain.. @@ -308,6 +345,11 @@ class _SyncRunner { void disconnectClient(_ConnectedClient client) { _mainEvents.add(_DisconnectClient(client)); } + + void updateClientSubscriptions( + _ConnectedClient client, List subscriptions) { + _mainEvents.add(_ClientSubscriptionsChanged(client, subscriptions)); + } } sealed class _RunnerEvent {} @@ -316,8 +358,10 @@ final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; final SyncOptions options; final String schemaJson; + final List subscriptions; - _AddConnection(this.client, this.options, this.schemaJson); + _AddConnection( + this.client, this.options, this.schemaJson, this.subscriptions); } final class _RemoveConnection implements _RunnerEvent { @@ -332,6 +376,13 @@ final class _DisconnectClient implements _RunnerEvent { _DisconnectClient(this.client); } +final class _ClientSubscriptionsChanged implements _RunnerEvent { + final _ConnectedClient client; + final List subscriptions; + + _ClientSubscriptionsChanged(this.client, this.subscriptions); +} + final class _ActiveDatabaseClosed implements _RunnerEvent { const _ActiveDatabaseClosed(); } diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 3c64d90f..99ca78b8 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -5,10 +5,12 @@ import 'dart:js_interop'; import 'package:logging/logging.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/sync/options.dart'; +import 'package:powersync_core/src/sync/stream.dart'; import 'package:web/web.dart'; import '../connector.dart'; import '../log.dart'; +import '../sync/streaming_sync.dart'; import '../sync/sync_status.dart'; /// Names used in [SyncWorkerMessage] @@ -20,6 +22,9 @@ enum SyncWorkerMessageType { /// If parameters change, the sync worker reconnects. startSynchronization, + /// Update the active subscriptions that this client is interested in. + updateSubscriptions, + /// The [SyncWorkerMessage.payload] for the request is a numeric id, the /// response can be anything (void). /// This disconnects immediately, even if other clients are still open. @@ -74,6 +79,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String implementationName, required String schemaJson, String? syncParamsEncoded, + UpdateSubscriptions? subscriptions, }); external String get databaseName; @@ -83,6 +89,36 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external String? get implementationName; external String get schemaJson; external String? get syncParamsEncoded; + external UpdateSubscriptions? get subscriptions; +} + +@anonymous +extension type UpdateSubscriptions._raw(JSObject _inner) implements JSObject { + external factory UpdateSubscriptions._({ + required int requestId, + required JSArray content, + }); + + factory UpdateSubscriptions(int requestId, List streams) { + return UpdateSubscriptions._( + requestId: requestId, + content: streams + .map((e) => [e.name.toJS, e.parameters.toJS].toJS) + .toList() + .toJS, + ); + } + + external int get requestId; + external JSArray get content; + + List get toDart { + return content.toDart.map((e) { + final [name, parameters] = (e as JSArray).toDart; + + return (name: name.toDart, parameters: parameters.toDart); + }).toList(); + } } @anonymous @@ -212,6 +248,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required String? downloadError, required JSArray? priorityStatusEntries, required JSArray? syncProgress, + required JSString streamSubscriptions, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -237,6 +274,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { var other => SerializedBucketProgress.serialize( InternalSyncDownloadProgress.ofPublic(other).buckets), }, + streamSubscriptions: json.encode(status.internalSubscriptions).toJS, ); } @@ -250,8 +288,11 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external String? downloadError; external JSArray? priorityStatusEntries; external JSArray? syncProgress; + external JSString? streamSubscriptions; SyncStatus asSyncStatus() { + final streamSubscriptions = this.streamSubscriptions?.toDart; + return SyncStatus( connected: connected, connecting: connecting, @@ -271,7 +312,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { final syncedMillis = (rawSynced as JSNumber?)?.toDartInt; return ( - priority: BucketPriority((rawPriority as JSNumber).toDartInt), + priority: StreamPriority((rawPriority as JSNumber).toDartInt), lastSyncedAt: syncedMillis != null ? DateTime.fromMicrosecondsSinceEpoch(syncedMillis) : null, @@ -285,6 +326,13 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { SerializedBucketProgress.deserialize(serializedProgress)) .asSyncDownloadProgress, }, + streamSubscriptions: switch (streamSubscriptions) { + null => null, + final serialized => (json.decode(serialized) as List) + .map((e) => CoreActiveStreamSubscription.fromJson( + e as Map)) + .toList(), + }, ); } } @@ -339,6 +387,8 @@ final class WorkerCommunicationChannel { return; case SyncWorkerMessageType.startSynchronization: requestId = (message.payload as StartSynchronization).requestId; + case SyncWorkerMessageType.updateSubscriptions: + requestId = (message.payload as UpdateSubscriptions).requestId; case SyncWorkerMessageType.requestEndpoint: case SyncWorkerMessageType.abortSynchronization: case SyncWorkerMessageType.credentialsCallback: @@ -413,7 +463,11 @@ final class WorkerCommunicationChannel { } Future startSynchronization( - String databaseName, ResolvedSyncOptions options, Schema schema) async { + String databaseName, + ResolvedSyncOptions options, + Schema schema, + List streams, + ) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, @@ -428,11 +482,22 @@ final class WorkerCommunicationChannel { null => null, final params => jsonEncode(params), }, + subscriptions: UpdateSubscriptions(-1, streams), ), )); await completion; } + Future updateSubscriptions(List streams) async { + final (id, completion) = _newRequest(); + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.updateSubscriptions.name, + payload: UpdateSubscriptions(id, streams), + )); + + await completion; + } + Future abortSynchronization() async { await _numericRequest(SyncWorkerMessageType.abortSynchronization); } diff --git a/packages/powersync_core/test/bucket_storage_test.dart b/packages/powersync_core/test/sync/bucket_storage_test.dart similarity index 99% rename from packages/powersync_core/test/bucket_storage_test.dart rename to packages/powersync_core/test/sync/bucket_storage_test.dart index 94338791..496e5a49 100644 --- a/packages/powersync_core/test/bucket_storage_test.dart +++ b/packages/powersync_core/test/sync/bucket_storage_test.dart @@ -4,8 +4,9 @@ import 'package:powersync_core/src/sync/protocol.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/test_utils_impl.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; final testUtils = TestUtils(); @@ -39,11 +40,6 @@ const removeAsset1_4 = OplogEntry( const removeAsset1_5 = OplogEntry( opId: '5', op: OpType.remove, rowType: 'assets', rowId: 'O1', checksum: 5); -BucketChecksum checksum( - {required String bucket, required int checksum, int priority = 1}) { - return BucketChecksum(bucket: bucket, priority: priority, checksum: checksum); -} - SyncDataBatch syncDataBatch(List data) { return SyncDataBatch(data); } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/sync/in_memory_sync_test.dart similarity index 89% rename from packages/powersync_core/test/in_memory_sync_test.dart rename to packages/powersync_core/test/sync/in_memory_sync_test.dart index 57e36b0c..e47e756b 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/sync/in_memory_sync_test.dart @@ -5,15 +5,14 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; -import 'bucket_storage_test.dart'; -import 'server/sync_server/in_memory_sync_server.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/in_memory_http.dart'; -import 'utils/test_utils_impl.dart'; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; void main() { _declareTests( @@ -53,35 +52,32 @@ void _declareTests(String name, SyncOptions options, bool bson) { late TestPowerSyncFactory factory; late CommonDatabase raw; - late PowerSyncDatabase database; + late TestDatabase database; late MockSyncService syncService; late Logger logger; - late StreamingSync syncClient; var credentialsCallbackCount = 0; Future Function(PowerSyncDatabase) uploadData = (db) async {}; - void createSyncClient({Schema? schema}) { + Future connect() async { final (client, server) = inMemoryServer(); server.mount(syncService.router.call); - final thisSyncClient = syncClient = database.connectWithMockService( - client, - TestConnector(() async { - credentialsCallbackCount++; - return PowerSyncCredentials( - endpoint: server.url.toString(), - token: 'token$credentialsCallbackCount', - expiresAt: DateTime.now(), - ); - }, uploadData: (db) => uploadData(db)), + database.httpClient = client; + await database.connect( + connector: TestConnector( + () async { + credentialsCallbackCount++; + return PowerSyncCredentials( + endpoint: server.url.toString(), + token: 'token$credentialsCallbackCount', + expiresAt: DateTime.now(), + ); + }, + uploadData: (db) => uploadData(db), + ), options: options, - customSchema: schema, ); - - addTearDown(() async { - await thisSyncClient.abort(); - }); } setUp(() async { @@ -92,7 +88,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { factory = await testUtils.testFactory(); (raw, database) = await factory.openInMemoryDatabase(); await database.initialize(); - createSyncClient(); }); tearDown(() async { @@ -109,7 +104,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { } }); } - syncClient.streamingSync(); + await connect(); await syncService.waitForListener; expect(database.currentStatus.lastSyncedAt, isNull); @@ -144,7 +139,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); await expectLater( status, emits(isSyncStatus(downloading: false, hasSynced: true))); - await syncClient.abort(); + await database.disconnect(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -155,7 +150,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { // A complete sync also means that all partial syncs have completed expect( independentDb.currentStatus - .statusForPriority(BucketPriority(3)) + .statusForPriority(StreamPriority(3)) .hasSynced, isTrue); }); @@ -249,7 +244,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { database.watch('SELECT * FROM lists', throttle: Duration.zero)); await expectLater(query, emits(isEmpty)); - createSyncClient(schema: schema); + await database.updateSchema(schema); await waitForConnection(); syncService @@ -374,13 +369,13 @@ void _declareTests(String name, SyncOptions options, bool bson) { status, emitsThrough( isSyncStatus(downloading: true, hasSynced: false).having( - (e) => e.statusForPriority(BucketPriority(0)).hasSynced, + (e) => e.statusForPriority(StreamPriority(0)).hasSynced, 'status for $prio', isTrue, )), ); - await database.waitForFirstSync(priority: BucketPriority(prio)); + await database.waitForFirstSync(priority: StreamPriority(prio)); expect(await database.getAll('SELECT * FROM customers'), hasLength(prio + 1)); } @@ -417,9 +412,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { 'priority': 1, } }); - await database.waitForFirstSync(priority: BucketPriority(1)); + await database.waitForFirstSync(priority: StreamPriority(1)); expect(database.currentStatus.hasSynced, isFalse); - await syncClient.abort(); + await database.disconnect(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -428,12 +423,12 @@ void _declareTests(String name, SyncOptions options, bool bson) { // Completing a sync for prio 1 implies a completed sync for prio 0 expect( independentDb.currentStatus - .statusForPriority(BucketPriority(0)) + .statusForPriority(StreamPriority(0)) .hasSynced, isTrue); expect( independentDb.currentStatus - .statusForPriority(BucketPriority(3)) + .statusForPriority(StreamPriority(3)) .hasSynced, isFalse); }); @@ -681,10 +676,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress(status, total: progress(5, 10)); // Emulate the app closing - create a new independent sync client. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); status = await waitForConnection(); // Send same checkpoint again @@ -715,10 +709,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress(status, total: progress(5, 10)); // Emulate the app closing - create a new independent sync client. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); status = await waitForConnection(); // Send checkpoint with additional data @@ -749,9 +742,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { // A sync rule deploy could reset buckets, making the new bucket smaller // than the existing one. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); + status = await waitForConnection(); syncService.addLine({ 'checkpoint': Checkpoint( @@ -770,8 +763,8 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress( status, priorities: { - BucketPriority(0): prio0, - BucketPriority(2): prio2, + StreamPriority(0): prio0, + StreamPriority(2): prio2, }, total: prio2, ); @@ -835,7 +828,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); await expectLater(status, emits(isSyncStatus(downloading: true))); - await syncClient.abort(); + await database.disconnect(); expect(syncService.controller.hasListener, isFalse); }); @@ -854,9 +847,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { syncService.addLine({ 'checkpoint_complete': {'last_op_id': '10'} }); - - await pumpEventQueue(); - expect(syncService.controller.hasListener, isFalse); syncService.endCurrentListener(); // Should reconnect after delay. @@ -876,9 +866,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectLater(status, emits(isSyncStatus(downloading: true))); syncService.addKeepAlive(0); - - await pumpEventQueue(); - expect(syncService.controller.hasListener, isFalse); syncService.endCurrentListener(); // Should reconnect after delay. @@ -939,51 +926,3 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); }); } - -TypeMatcher isSyncStatus({ - Object? downloading, - Object? connected, - Object? connecting, - Object? hasSynced, - Object? downloadProgress, -}) { - var matcher = isA(); - if (downloading != null) { - matcher = matcher.having((e) => e.downloading, 'downloading', downloading); - } - if (connected != null) { - matcher = matcher.having((e) => e.connected, 'connected', connected); - } - if (connecting != null) { - matcher = matcher.having((e) => e.connecting, 'connecting', connecting); - } - if (hasSynced != null) { - matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); - } - if (downloadProgress != null) { - matcher = matcher.having( - (e) => e.downloadProgress, 'downloadProgress', downloadProgress); - } - - return matcher; -} - -TypeMatcher isSyncDownloadProgress({ - required Object progress, - Map priorities = const {}, -}) { - var matcher = - isA().having((e) => e, 'untilCompletion', progress); - priorities.forEach((priority, expected) { - matcher = matcher.having( - (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); - }); - - return matcher; -} - -TypeMatcher progress(int completed, int total) { - return isA() - .having((e) => e.downloadedOperations, 'completed', completed) - .having((e) => e.totalOperations, 'total', total); -} diff --git a/packages/powersync_core/test/sync/stream_test.dart b/packages/powersync_core/test/sync/stream_test.dart new file mode 100644 index 00000000..0d6d8f59 --- /dev/null +++ b/packages/powersync_core/test/sync/stream_test.dart @@ -0,0 +1,222 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; +import 'package:logging/logging.dart'; +import 'package:powersync_core/powersync_core.dart'; + +import 'package:test/test.dart'; + +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; + +void main() { + late final testUtils = TestUtils(); + + late TestPowerSyncFactory factory; + + late TestDatabase database; + late MockSyncService syncService; + late Logger logger; + late SyncOptions options; + + var credentialsCallbackCount = 0; + + Future connect() async { + final (client, server) = inMemoryServer(); + server.mount(syncService.router.call); + + database.httpClient = client; + await database.connect( + connector: TestConnector( + () async { + credentialsCallbackCount++; + return PowerSyncCredentials( + endpoint: server.url.toString(), + token: 'token$credentialsCallbackCount', + expiresAt: DateTime.now(), + ); + }, + uploadData: (db) async {}, + ), + options: options, + ); + } + + setUp(() async { + options = SyncOptions(syncImplementation: SyncClientImplementation.rust); + logger = Logger.detached('powersync.active')..level = Level.ALL; + credentialsCallbackCount = 0; + syncService = MockSyncService(); + + factory = await testUtils.testFactory(); + (_, database) = await factory.openInMemoryDatabase(); + await database.initialize(); + }); + + tearDown(() async { + await database.close(); + await syncService.stop(); + }); + + Future> waitForConnection( + {bool expectNoWarnings = true}) async { + if (expectNoWarnings) { + logger.onRecord.listen((e) { + if (e.level >= Level.WARNING) { + fail('Unexpected log: $e, ${e.stackTrace}'); + } + }); + } + await connect(); + await syncService.waitForListener; + + expect(database.currentStatus.lastSyncedAt, isNull); + expect(database.currentStatus.downloading, isFalse); + final status = StreamQueue(database.statusStream); + addTearDown(status.cancel); + + syncService.addKeepAlive(); + await expectLater( + status, emitsThrough(isSyncStatus(connected: true, hasSynced: false))); + return status; + } + + test('can disable default streams', () async { + options = SyncOptions( + syncImplementation: SyncClientImplementation.rust, + includeDefaultStreams: false, + ); + + await waitForConnection(); + final request = await syncService.waitForListener; + expect(json.decode(await request.readAsString()), + containsPair('streams', containsPair('include_defaults', false))); + }); + + test('subscribes with streams', () async { + final a = await database.syncStream('stream', {'foo': 'a'}).subscribe(); + final b = await database.syncStream('stream', {'foo': 'b'}).subscribe( + priority: StreamPriority(1)); + + final statusStream = await waitForConnection(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', [ + { + 'stream': 'stream', + 'parameters': {'foo': 'a'}, + 'override_priority': null, + }, + { + 'stream': 'stream', + 'parameters': {'foo': 'b'}, + 'override_priority': 1, + }, + ]), + ), + ); + + syncService.addLine( + checkpoint( + lastOpId: 0, + buckets: [ + bucketDescription('a', subscriptions: [ + {'sub': 0} + ]), + bucketDescription('b', priority: 1, subscriptions: [ + {'sub': 1} + ]) + ], + streams: [ + stream('stream', false), + ], + ), + ); + + var status = await statusStream.next; + for (final subscription in [a, b]) { + expect(status.statusFor(subscription)!.subscription.active, true); + expect(status.statusFor(subscription)!.subscription.lastSyncedAt, isNull); + expect( + status.statusFor(subscription)!.subscription.hasExplicitSubscription, + true, + ); + } + + syncService.addLine(checkpointComplete(priority: 1)); + status = await statusStream.next; + expect(status.statusFor(a)!.subscription.lastSyncedAt, isNull); + expect(status.statusFor(b)!.subscription.lastSyncedAt, isNotNull); + await b.waitForFirstSync(); + + syncService.addLine(checkpointComplete()); + await a.waitForFirstSync(); + }); + + test('reports default streams', () async { + final status = await waitForConnection(); + syncService.addLine( + checkpoint(lastOpId: 0, streams: [stream('default_stream', true)]), + ); + + await expectLater( + status, + emits( + isSyncStatus( + activeSubscriptions: [ + isStreamStatus( + subscription: isSyncSubscription( + name: 'default_stream', + parameters: null, + ), + isDefault: true, + ), + ], + ), + ), + ); + }); + + test('changes subscriptions dynamically', () async { + await waitForConnection(); + syncService.addKeepAlive(); + + final subscription = await database.syncStream('a').subscribe(); + syncService.endCurrentListener(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', [ + { + 'stream': 'a', + 'parameters': null, + 'override_priority': null, + }, + ]), + ), + ); + + // Given that the subscription has a TTL, dropping the handle should not + // re-subscribe. + await subscription.unsubscribe(); + await pumpEventQueue(); + expect(syncService.controller.hasListener, isTrue); + }); + + test('subscriptions update while offline', () async { + final stream = StreamQueue(database.statusStream); + + final subscription = await database.syncStream('foo').subscribe(); + var status = await stream.next; + expect(status.statusFor(subscription), isNotNull); + }); +} diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/sync/streaming_sync_test.dart similarity index 97% rename from packages/powersync_core/test/streaming_sync_test.dart rename to packages/powersync_core/test/sync/streaming_sync_test.dart index 40becd16..5017993f 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/sync/streaming_sync_test.dart @@ -9,10 +9,10 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; -import 'server/sync_server/in_memory_sync_server.dart'; -import 'test_server.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/test_utils_impl.dart'; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../test_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync/sync_types_test.dart similarity index 100% rename from packages/powersync_core/test/sync_types_test.dart rename to packages/powersync_core/test/sync/sync_types_test.dart diff --git a/packages/powersync_core/test/sync/utils.dart b/packages/powersync_core/test/sync/utils.dart new file mode 100644 index 00000000..0a619514 --- /dev/null +++ b/packages/powersync_core/test/sync/utils.dart @@ -0,0 +1,134 @@ +import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; +import 'package:test/test.dart'; + +TypeMatcher isSyncStatus({ + Object? downloading, + Object? connected, + Object? connecting, + Object? hasSynced, + Object? downloadProgress, + Object? activeSubscriptions, +}) { + var matcher = isA(); + if (downloading != null) { + matcher = matcher.having((e) => e.downloading, 'downloading', downloading); + } + if (connected != null) { + matcher = matcher.having((e) => e.connected, 'connected', connected); + } + if (connecting != null) { + matcher = matcher.having((e) => e.connecting, 'connecting', connecting); + } + if (hasSynced != null) { + matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); + } + if (downloadProgress != null) { + matcher = matcher.having( + (e) => e.downloadProgress, 'downloadProgress', downloadProgress); + } + if (activeSubscriptions != null) { + matcher = matcher.having((e) => e.activeSubscriptions, + 'activeSubscriptions', activeSubscriptions); + } + + return matcher; +} + +TypeMatcher isSyncDownloadProgress({ + required Object progress, + Map priorities = const {}, +}) { + var matcher = + isA().having((e) => e, 'untilCompletion', progress); + priorities.forEach((priority, expected) { + matcher = matcher.having( + (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); + }); + + return matcher; +} + +TypeMatcher progress(int completed, int total) { + return isA() + .having((e) => e.downloadedOperations, 'completed', completed) + .having((e) => e.totalOperations, 'total', total); +} + +TypeMatcher isStreamStatus({ + required Object? subscription, + Object? progress, + Object? isDefault, +}) { + var matcher = isA() + .having((e) => e.subscription, 'subscription', subscription); + if (progress case final progress?) { + matcher = matcher.having((e) => e.progress, 'progress', progress); + } + if (isDefault case final isDefault?) { + matcher = matcher.having((e) => e.isDefault, 'isDefault', isDefault); + } + + return matcher; +} + +TypeMatcher isSyncSubscription({ + required Object name, + required Object? parameters, +}) { + return isA() + .having((e) => e.name, 'name', name) + .having((e) => e.parameters, 'parameters', parameters); +} + +BucketChecksum checksum( + {required String bucket, required int checksum, int priority = 1}) { + return BucketChecksum(bucket: bucket, priority: priority, checksum: checksum); +} + +/// Creates a `checkpoint` line. +Object checkpoint({ + required int lastOpId, + List buckets = const [], + String? writeCheckpoint, + List streams = const [], +}) { + return { + 'checkpoint': { + 'last_op_id': '$lastOpId', + 'write_checkpoint': null, + 'buckets': buckets, + 'streams': streams, + } + }; +} + +Object stream(String name, bool isDefault, {List errors = const []}) { + return {'name': name, 'is_default': isDefault, 'errors': errors}; +} + +/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line. +Object checkpointComplete({int? priority, String lastOpId = '1'}) { + return { + priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': { + 'last_op_id': lastOpId, + if (priority != null) 'priority': priority, + }, + }; +} + +Object bucketDescription( + String name, { + int checksum = 0, + int priority = 3, + int count = 1, + Object? subscriptions, +}) { + return { + 'bucket': name, + 'checksum': checksum, + 'priority': priority, + 'count': count, + if (subscriptions != null) 'subscriptions': subscriptions, + }; +} diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 2b456429..1a5e744b 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,8 +1,11 @@ +import 'dart:async'; import 'dart:convert'; import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/abort_controller.dart'; +import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/options.dart'; @@ -63,20 +66,20 @@ Logger _makeTestLogger({Level level = Level.ALL, String? name}) { abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory { Future openRawInMemoryDatabase(); - Future<(CommonDatabase, PowerSyncDatabase)> openInMemoryDatabase() async { + Future<(CommonDatabase, TestDatabase)> openInMemoryDatabase() async { final raw = await openRawInMemoryDatabase(); return (raw, wrapRaw(raw)); } - PowerSyncDatabase wrapRaw( + TestDatabase wrapRaw( CommonDatabase raw, { Logger? logger, }) { - return PowerSyncDatabase.withDatabase( - schema: schema, + return TestDatabase( database: SqliteDatabase.singleConnection( SqliteConnection.synchronousWrapper(raw)), - logger: logger, + logger: logger ?? Logger.detached('PowerSync.test'), + schema: schema, ); } } @@ -147,6 +150,78 @@ class TestConnector extends PowerSyncBackendConnector { } } +final class TestDatabase + with SqliteQueries, PowerSyncDatabaseMixin + implements PowerSyncDatabase { + @override + final SqliteDatabase database; + @override + final Logger logger; + @override + Schema schema; + + @override + late final Future isInitialized; + + Client? httpClient; + + TestDatabase({ + required this.database, + required this.logger, + required this.schema, + }) { + isInitialized = baseInit(); + } + + @override + Future connectInternal({ + required PowerSyncBackendConnector connector, + required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, + required AbortController abort, + required Zone asyncWorkZone, + }) async { + final impl = StreamingSyncImplementation( + adapter: BucketStorage(this), + schemaJson: jsonEncode(schema), + client: httpClient!, + options: options, + connector: InternalConnector.wrap(connector, this), + logger: logger, + crudUpdateTriggerStream: database + .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), + activeSubscriptions: initiallyActiveStreams, + ); + impl.statusStream.listen(setStatus); + + asyncWorkZone.run(impl.streamingSync); + final subscriptions = activeStreams.listen(impl.updateSubscriptions); + + abort.onAbort.then((_) async { + subscriptions.cancel(); + await impl.abort(); + abort.completeAbort(); + }).ignore(); + } + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {String? debugContext, Duration? lockTimeout}) async { + await isInitialized; + return database.readLock(callback, + debugContext: debugContext, lockTimeout: lockTimeout); + } + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {String? debugContext, Duration? lockTimeout}) async { + await isInitialized; + return database.writeLock(callback, + debugContext: debugContext, lockTimeout: lockTimeout); + } +} + extension MockSync on PowerSyncDatabase { StreamingSyncImplementation connectWithMockService( Client client,