Skip to content

Commit 3468b38

Browse files
committed
Add nextCrudTransactions() stream
1 parent 57c0c52 commit 3468b38

File tree

2 files changed

+106
-55
lines changed

2 files changed

+106
-55
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 73 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:async/async.dart';
34
import 'package:logging/logging.dart';
45
import 'package:meta/meta.dart';
56
import 'package:powersync_core/sqlite3_common.dart';
@@ -508,23 +509,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
508509
}
509510
final last = all[all.length - 1];
510511
return CrudBatch(
511-
crud: all,
512-
haveMore: haveMore,
513-
complete: ({String? writeCheckpoint}) async {
514-
await writeTransaction((db) async {
515-
await db
516-
.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
517-
if (writeCheckpoint != null &&
518-
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) {
519-
await db.execute(
520-
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
521-
[writeCheckpoint]);
522-
} else {
523-
await db.execute(
524-
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
525-
}
526-
});
527-
});
512+
crud: all,
513+
haveMore: haveMore,
514+
complete: _crudCompletionCallback(last.clientId),
515+
);
528516
}
529517

530518
/// Get the next recorded transaction to upload.
@@ -538,46 +526,76 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
538526
///
539527
/// Unlike [getCrudBatch], this only returns data from a single transaction at a time.
540528
/// All data for the transaction is loaded into memory.
541-
Future<CrudTransaction?> getNextCrudTransaction() async {
542-
return await readTransaction((tx) async {
543-
final first = await tx.getOptional(
544-
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
545-
if (first == null) {
546-
return null;
547-
}
548-
final txId = first['tx_id'] as int?;
549-
List<CrudEntry> all;
550-
if (txId == null) {
551-
all = [CrudEntry.fromRow(first)];
552-
} else {
553-
final rows = await tx.getAll(
554-
'SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC',
555-
[txId]);
556-
all = [for (var row in rows) CrudEntry.fromRow(row)];
529+
Future<CrudTransaction?> getNextCrudTransaction() {
530+
return nextCrudTransactions().firstOrNull;
531+
}
532+
533+
/// Returns a stream of completed transactions with local writes against the
534+
/// database.
535+
///
536+
/// This is typically used from the [PowerSyncBackendConnector.uploadData]
537+
/// method. Each entry emitted by the stream is a full transaction containing
538+
/// all local writes made while that transaction was active.
539+
///
540+
/// Unlike [getNextCrudTransaction], which awalys returns the oldest
541+
/// transaction that hasn't been [CrudTransaction.complete]d yet, this stream
542+
/// can be used to receive multiple transactions. Calling
543+
/// [CrudTransaction.complete] will mark _all_ transactions emitted by the
544+
/// stream until that point as completed.
545+
///
546+
/// This can be used to upload multiple transactions in a single batch, e.g.
547+
/// with:AbortController
548+
///
549+
/// If there is no local data to upload, the stream emits a single `onDone`
550+
/// event.
551+
Stream<CrudTransaction> nextCrudTransactions() async* {
552+
var lastCrudItemId = -1;
553+
const sql = '''
554+
WITH RECURSIVE crud_entries AS (
555+
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
556+
UNION ALL
557+
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
558+
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
559+
WHERE crud_entries.tx_id = ps_crud.tx_id
560+
)
561+
SELECT * FROM crud_entries;
562+
''';
563+
564+
while (true) {
565+
final nextTransaction = await getAll(sql, [lastCrudItemId]);
566+
if (nextTransaction.isEmpty) {
567+
break;
557568
}
558569

559-
final last = all[all.length - 1];
560-
561-
return CrudTransaction(
562-
transactionId: txId,
563-
crud: all,
564-
complete: ({String? writeCheckpoint}) async {
565-
await writeTransaction((db) async {
566-
await db.execute(
567-
'DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
568-
if (writeCheckpoint != null &&
569-
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') ==
570-
null) {
571-
await db.execute(
572-
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
573-
[writeCheckpoint]);
574-
} else {
575-
await db.execute(
576-
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
577-
}
578-
});
579-
});
580-
});
570+
final items = [for (var row in nextTransaction) CrudEntry.fromRow(row)];
571+
final last = items.last;
572+
final txId = last.transactionId;
573+
574+
yield CrudTransaction(
575+
crud: items,
576+
complete: _crudCompletionCallback(last.clientId),
577+
transactionId: txId,
578+
);
579+
lastCrudItemId = last.clientId;
580+
}
581+
}
582+
583+
Future<void> Function({String? writeCheckpoint}) _crudCompletionCallback(
584+
int lastClientId) {
585+
return ({String? writeCheckpoint}) async {
586+
await writeTransaction((db) async {
587+
await db.execute('DELETE FROM ps_crud WHERE id <= ?', [lastClientId]);
588+
if (writeCheckpoint != null &&
589+
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) {
590+
await db.execute(
591+
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
592+
[writeCheckpoint]);
593+
} else {
594+
await db.execute(
595+
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
596+
}
597+
});
598+
};
581599
}
582600

583601
/// Takes a read lock, without starting a transaction.

packages/powersync_core/test/crud_test.dart

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,39 @@ void main() {
271271
expect(await powersync.getNextCrudTransaction(), equals(null));
272272
});
273273

274+
test('nextCrudTransactions', () async {
275+
Future<void> createTransaction(int size) {
276+
return powersync.writeTransaction((tx) async {
277+
for (var i = 0; i < size; i++) {
278+
await tx.execute('INSERT INTO assets (id) VALUES (uuid())');
279+
}
280+
});
281+
}
282+
283+
await expectLater(powersync.nextCrudTransactions(), emitsDone);
284+
285+
await createTransaction(5);
286+
await createTransaction(10);
287+
await createTransaction(15);
288+
289+
CrudTransaction? lastTransaction;
290+
final batch = <CrudEntry>[];
291+
await for (final transaction in powersync.nextCrudTransactions()) {
292+
batch.addAll(transaction.crud);
293+
lastTransaction = transaction;
294+
295+
if (batch.length > 10) {
296+
break;
297+
}
298+
}
299+
300+
expect(batch, hasLength(15));
301+
await lastTransaction!.complete();
302+
303+
final remainingTransaction = await powersync.getNextCrudTransaction();
304+
expect(remainingTransaction?.crud, hasLength(15));
305+
});
306+
274307
test('include metadata', () async {
275308
await powersync.updateSchema(Schema([
276309
Table(

0 commit comments

Comments
 (0)