Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Packages with breaking changes:

Packages with other changes:

- [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121)
- [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120)
- [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231)

Expand All @@ -26,6 +27,10 @@ Packages with dependency updates only:

---

#### `sqlite_async` - `v0.12.1`

- Fix distributing updates from shared worker.

#### `sqlite_async` - `v0.12.0`

- Avoid large transactions creating a large internal update queue.
Expand Down
4 changes: 4 additions & 0 deletions packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.12.1

- Fix distributing updates from shared worker.

## 0.12.0

- Avoid large transactions creating a large internal update queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
final server = params.portServer;
final commandPort = ReceivePort();

db.throttledUpdatedTables.listen((changedTables) {
db.updatedTables.listen((changedTables) {
client.fire(UpdateNotification(changedTables));
});

Expand Down
121 changes: 69 additions & 52 deletions packages/sqlite_async/lib/src/utils/shared_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -79,68 +79,85 @@ List<Object?> mapParameters(List<Object?> parameters) {
}

extension ThrottledUpdates on CommonDatabase {
/// Wraps [updatesSync] to:
/// An unthrottled stream of updated tables that emits on every commit.
///
/// - Not fire in transactions.
/// - Fire asynchronously.
/// - Only report table names, which are buffered to avoid duplicates.
Stream<Set<String>> get throttledUpdatedTables {
StreamController<Set<String>>? controller;
var pendingUpdates = <String>{};
var paused = false;

Timer? updateDebouncer;

void maybeFireUpdates() {
updateDebouncer?.cancel();
updateDebouncer = null;

if (paused) {
// Continue collecting updates, but don't fire any
return;
/// A paused subscription on this stream will buffer changed tables into a
/// growing set instead of losing events, so this stream is simple to throttle
/// downstream.
Stream<Set<String>> get updatedTables {
final listeners = <_UpdateListener>[];
var uncommitedUpdates = <String>{};
var underlyingSubscriptions = <StreamSubscription<void>>[];

void handleUpdate(SqliteUpdate update) {
uncommitedUpdates.add(update.tableName);
}

void afterCommit() {
for (final listener in listeners) {
listener.notify(uncommitedUpdates);
}

if (!autocommit) {
// Inside a transaction - do not fire updates
return;
uncommitedUpdates.clear();
}

void afterRollback() {
uncommitedUpdates.clear();
}

void addListener(_UpdateListener listener) {
listeners.add(listener);

if (listeners.length == 1) {
// First listener, start listening for raw updates on underlying
// database.
underlyingSubscriptions = [
updatesSync.listen(handleUpdate),
commits.listen((_) => afterCommit()),
commits.listen((_) => afterRollback())
];
}
}

if (pendingUpdates.isNotEmpty) {
controller!.add(pendingUpdates);
pendingUpdates = {};
void removeListener(_UpdateListener listener) {
listeners.remove(listener);
if (listeners.isEmpty) {
for (final sub in underlyingSubscriptions) {
sub.cancel();
}
}
}

void collectUpdate(SqliteUpdate event) {
pendingUpdates.add(event.tableName);
return Stream.multi(
(listener) {
final wrapped = _UpdateListener(listener);
addListener(wrapped);

updateDebouncer ??=
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
listener.onResume = wrapped.addPending;
listener.onCancel = () => removeListener(wrapped);
},
isBroadcast: true,
);
}
}

class _UpdateListener {
final MultiStreamController<Set<String>> downstream;
Set<String> buffered = {};

_UpdateListener(this.downstream);

void notify(Set<String> pendingUpdates) {
buffered.addAll(pendingUpdates);
if (!downstream.isPaused) {
addPending();
}
}

StreamSubscription? txSubscription;
StreamSubscription? sourceSubscription;

controller = StreamController(onListen: () {
txSubscription = commits.listen((_) {
maybeFireUpdates();
}, onError: (error) {
controller?.addError(error);
});

sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) {
controller?.addError(error);
});
}, onPause: () {
paused = true;
}, onResume: () {
paused = false;
maybeFireUpdates();
}, onCancel: () {
txSubscription?.cancel();
sourceSubscription?.cancel();
});

return controller.stream;
void addPending() {
if (buffered.isNotEmpty) {
downstream.add(buffered);
buffered = {};
}
}
}
9 changes: 5 additions & 4 deletions packages/sqlite_async/lib/src/web/worker/worker_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class AsyncSqliteDatabase extends WorkerDatabase {
final Map<ClientConnection, _ConnectionState> _state = {};

AsyncSqliteDatabase({required this.database})
: _updates = database.throttledUpdatedTables;
: _updates = database.updatedTables;

_ConnectionState _findState(ClientConnection connection) {
return _state.putIfAbsent(connection, _ConnectionState.new);
Expand Down Expand Up @@ -144,12 +144,13 @@ class AsyncSqliteDatabase extends WorkerDatabase {
state.unsubscribeUpdates();
_registerCloseListener(state, connection);

state.updatesNotification = _updates.listen((tables) {
connection.customRequest(CustomDatabaseMessage(
late StreamSubscription<void> subscription;
subscription = state.updatesNotification = _updates.listen((tables) {
subscription.pause(connection.customRequest(CustomDatabaseMessage(
CustomDatabaseMessageKind.notifyUpdates,
id,
tables.toList(),
));
)));
});
} else {
state.unsubscribeUpdates();
Expand Down
2 changes: 1 addition & 1 deletion packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: sqlite_async
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
version: 0.12.0
version: 0.12.1
repository: https://github.com/powersync-ja/sqlite_async.dart
environment:
sdk: ">=3.5.0 <4.0.0"
Expand Down
46 changes: 46 additions & 0 deletions packages/sqlite_async/test/native/watch_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'dart:math';

import 'package:sqlite3/common.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';
import 'package:test/test.dart';

import '../utils/test_utils_impl.dart';
Expand All @@ -31,6 +32,51 @@ void main() {
return db;
});

test('raw update notifications', () async {
final factory = await testUtils.testFactory(path: path);
final db = factory
.openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false));

db.execute('CREATE TABLE a (bar INTEGER);');
db.execute('CREATE TABLE b (bar INTEGER);');
final events = <Set<String>>[];
final subscription = db.updatedTables.listen(events.add);

db.execute('insert into a default values');
expect(events, isEmpty); // should be async
await pumpEventQueue();
expect(events.removeLast(), {'a'});

db.execute('begin');
db.execute('insert into a default values');
db.execute('insert into b default values');
await pumpEventQueue();
expect(events, isEmpty); // should only trigger on commit
db.execute('commit');

await pumpEventQueue();
expect(events.removeLast(), {'a', 'b'});

db.execute('begin');
db.execute('insert into a default values');
db.execute('rollback');
expect(events, isEmpty);
await pumpEventQueue();
expect(events, isEmpty); // should ignore cancelled transactions

// Should still listen during pause, and dispatch on resume
subscription.pause();
db.execute('insert into a default values');
await pumpEventQueue();
expect(events, isEmpty);

subscription.resume();
await pumpEventQueue();
expect(events.removeLast(), {'a'});

subscription.pause();
});

test('watch in isolate', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Expand Down