From 7e5758d2d88bcec9a0f53624fdd9cc3714c27376 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Sat, 1 Mar 2025 14:22:18 +0200 Subject: [PATCH 01/20] improve watches --- .gitignore | 1 + .../kotlin/com/powersync/PsSqlDriver.kt | 30 +++++---- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 2 +- .../powersync/db/internal/InternalDatabase.kt | 2 +- .../db/internal/InternalDatabaseImpl.kt | 67 ++----------------- .../com/powersync/utils/AtomicMutableSet.kt | 28 ++++++++ .../androidApp/src/main/res/xml/network.xml | 4 ++ 7 files changed, 57 insertions(+), 77 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt create mode 100644 demos/supabase-todolist/androidApp/src/main/res/xml/network.xml diff --git a/.gitignore b/.gitignore index f73044b6..575429a5 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ captures Pods/ dialect/bin .build +.vscode diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index a1ec0bfb..8f4edf77 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,6 +1,7 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver +import com.powersync.utils.AtomicMutableSet import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -14,33 +15,34 @@ internal class PsSqlDriver( private val scope: CoroutineScope, ) : SqlDriver by driver { // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) + private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) // In-memory buffer to store table names before flushing - private val pendingUpdates = mutableSetOf() + private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { - pendingUpdates.add(tableName) + scope.launch { + pendingUpdates.add(tableName) + } } fun clearTableUpdates() { - pendingUpdates.clear() + scope.launch { + pendingUpdates.clear() + } } - // Flows on table updates - fun tableUpdates(): Flow> = tableUpdatesFlow.asSharedFlow() + // Flows on table updates containing tables + fun updatesOnTables(tableNames: Set): Flow = tableUpdatesFlow.asSharedFlow().filter { it.intersect(tableNames).isNotEmpty() }.map { } - // Flows on table updates containing a specific table - fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } + suspend fun fireTableUpdates() { + val updates = pendingUpdates.toSet() + pendingUpdates.clear() - fun fireTableUpdates() { - val updates = pendingUpdates.toList() if (updates.isEmpty()) { return } - scope.launch { - tableUpdatesFlow.emit(updates) - } - pendingUpdates.clear() + + tableUpdatesFlow.emit(updates) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 2a3abfdb..a9b93ade 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -141,7 +141,7 @@ internal class PowerSyncDatabaseImpl( uploadJob = scope.launch { - internalDb.updatesOnTable(InternalTable.CRUD.toString()).debounce(crudThrottleMs).collect { + internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString())).debounce(crudThrottleMs).collect { syncStream!!.triggerCrudUpload() } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 54a9cd16..72cf34b1 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -14,5 +14,5 @@ internal interface InternalDatabase : fun getExistingTableNames(tableGlob: String): List - fun updatesOnTable(tableName: String): Flow + fun updatesOnTables(tableNames: Set): Flow } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 91cad180..1c480931 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -20,6 +20,7 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -35,12 +36,6 @@ internal class InternalDatabaseImpl( override val transactor: PsDatabase = PsDatabase(driver) override val queries: PowersyncQueries = transactor.powersyncQueries - // Register callback for table updates - private fun tableUpdates(): Flow> = driver.tableUpdates() - - // Debounced by transaction completion - private val tableUpdatesMutex = Mutex() - // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO private val transaction = @@ -74,31 +69,6 @@ internal class InternalDatabaseImpl( const val DEFAULT_WATCH_THROTTLE_MS = 30L } - init { - scope.launch { - val accumulatedUpdates = mutableSetOf() - // Store table changes in an accumulated array which will be (debounced) emitted on transaction end - tableUpdates() - .onEach { tables -> - val dataTables = - tables - .map { toFriendlyTableName(it) } - .filter { it.isNotBlank() } - tableUpdatesMutex.withLock { - accumulatedUpdates.addAll(dataTables) - } - } - // debounce ignores events inside the throttle. Debouncing needs to be done after accumulation - .debounce(DEFAULT_WATCH_THROTTLE_MS) - .collect { _ -> - tableUpdatesMutex.withLock { - driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) - accumulatedUpdates.clear() - } - } - } - } - override suspend fun execute( sql: String, parameters: List?, @@ -196,13 +166,10 @@ internal class InternalDatabaseImpl( .map { toFriendlyTableName(it) } .filter { it.isNotBlank() } .toSet() - return watchQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper, - tables = tables, - ).asFlow().mapToList(scope.coroutineContext) + + return updatesOnTables(tables).debounce(DEFAULT_WATCH_THROTTLE_MS).map { + getAll(sql, parameters = parameters, mapper = mapper) + } } private fun createQuery( @@ -218,28 +185,6 @@ internal class InternalDatabaseImpl( } } - private fun watchQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? = null, - tables: Set = setOf(), - ): Query = - object : Query(wrapperMapper(mapper)) { - override fun execute(mapper: (app.cash.sqldelight.db.SqlCursor) -> QueryResult): QueryResult = - runWrapped { - driver.executeQuery(null, query, mapper, parameters, binders) - } - - override fun addListener(listener: Listener) { - driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) - } - - override fun removeListener(listener: Listener) { - driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) - } - } - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = withContext(dbContext) { transactor.transactionWithResult(noEnclosing = true) { @@ -271,7 +216,7 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTable(tableName: String): Flow = driver.updatesOnTable(tableName) + override fun updatesOnTables(tableNames: Set): Flow = driver.updatesOnTables(tableNames) private fun toFriendlyTableName(tableName: String): String { val regex = POWERSYNC_TABLE_MATCH.toRegex() diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt new file mode 100644 index 00000000..4703fdff --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -0,0 +1,28 @@ +package com.powersync.utils +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +internal class AtomicMutableSet { + private val mutex = Mutex() + private val set = mutableSetOf() + + suspend fun add(element: T): Boolean = mutex.withLock { + set.add(element) + } + + suspend fun remove(element: T): Boolean = mutex.withLock { + set.remove(element) + } + + suspend fun clear(): Unit = mutex.withLock { + set.clear() + } + + suspend fun contains(element: T): Boolean = mutex.withLock { + set.contains(element) + } + + suspend fun toSet(): Set = mutex.withLock { + set.toSet() + } +} diff --git a/demos/supabase-todolist/androidApp/src/main/res/xml/network.xml b/demos/supabase-todolist/androidApp/src/main/res/xml/network.xml new file mode 100644 index 00000000..624ed13a --- /dev/null +++ b/demos/supabase-todolist/androidApp/src/main/res/xml/network.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file From 718789f83945c4ce1994091ad0576b745625bf5f Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 11:16:20 +0200 Subject: [PATCH 02/20] wrap table updates in mutex --- .../kotlin/com/powersync/PsSqlDriver.kt | 17 ++++---- .../db/internal/InternalDatabaseImpl.kt | 30 +++++++------- .../com/powersync/utils/AtomicMutableSet.kt | 41 +++++++++++-------- .../src/androidMain/AndroidManifest.xml | 4 +- .../androidApp/src/main/res/xml/network.xml | 4 -- .../main/res/xml/network_security_config.xml | 6 +++ 6 files changed, 56 insertions(+), 46 deletions(-) delete mode 100644 demos/supabase-todolist/androidApp/src/main/res/xml/network.xml create mode 100644 demos/supabase-todolist/androidApp/src/main/res/xml/network_security_config.xml diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 8f4edf77..08b5633b 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -18,7 +18,7 @@ internal class PsSqlDriver( private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) // In-memory buffer to store table names before flushing - private val pendingUpdates = AtomicMutableSet() + private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { scope.launch { @@ -33,16 +33,13 @@ internal class PsSqlDriver( } // Flows on table updates containing tables - fun updatesOnTables(tableNames: Set): Flow = tableUpdatesFlow.asSharedFlow().filter { it.intersect(tableNames).isNotEmpty() }.map { } + // The table names here are raw SQLite tables + fun updatesOnTables(tableNames: Set): Flow = + tableUpdatesFlow.asSharedFlow().filter { it.intersect(tableNames).isNotEmpty() }.map { } - suspend fun fireTableUpdates() { - val updates = pendingUpdates.toSet() - pendingUpdates.clear() - - if (updates.isEmpty()) { - return + fun fireTableUpdates() { + scope.launch { + tableUpdatesFlow.emit(pendingUpdates.toSet(true)) } - - tableUpdatesFlow.emit(updates) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 1c480931..968de3b2 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -1,9 +1,6 @@ package com.powersync.db.internal import app.cash.sqldelight.ExecutableQuery -import app.cash.sqldelight.Query -import app.cash.sqldelight.coroutines.asFlow -import app.cash.sqldelight.coroutines.mapToList import app.cash.sqldelight.db.QueryResult import app.cash.sqldelight.db.SqlPreparedStatement import com.persistence.PowersyncQueries @@ -21,10 +18,7 @@ import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @@ -74,9 +68,11 @@ internal class InternalDatabaseImpl( parameters: List?, ): Long = withContext(dbContext) { - val r = executeSync(sql, parameters) - driver.fireTableUpdates() - r + executeSync(sql, parameters) + }.also { + withContext(dbContext) { + driver.fireTableUpdates() + } } private fun executeSync( @@ -163,13 +159,14 @@ internal class InternalDatabaseImpl( ): Flow> { val tables = getSourceTables(sql, parameters) - .map { toFriendlyTableName(it) } .filter { it.isNotBlank() } .toSet() - return updatesOnTables(tables).debounce(DEFAULT_WATCH_THROTTLE_MS).map { - getAll(sql, parameters = parameters, mapper = mapper) - } + return updatesOnTables(tables) + .debounce(DEFAULT_WATCH_THROTTLE_MS) + .map { + getAll(sql, parameters = parameters, mapper = mapper) + }.onStart { emit(getAll(sql, parameters = parameters, mapper = mapper)) } } private fun createQuery( @@ -211,8 +208,11 @@ internal class InternalDatabaseImpl( } } // Trigger watched queries - driver.fireTableUpdates() r + }.also { + withContext(dbContext) { + driver.fireTableUpdates() + } } // Register callback for table updates on a specific table diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt index 4703fdff..33a83e6e 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -2,27 +2,36 @@ package com.powersync.utils import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -internal class AtomicMutableSet { +public class AtomicMutableSet { private val mutex = Mutex() private val set = mutableSetOf() - suspend fun add(element: T): Boolean = mutex.withLock { - set.add(element) - } + public suspend fun add(element: T): Boolean = + mutex.withLock { + set.add(element) + } - suspend fun remove(element: T): Boolean = mutex.withLock { - set.remove(element) - } + public suspend fun remove(element: T): Boolean = + mutex.withLock { + set.remove(element) + } - suspend fun clear(): Unit = mutex.withLock { - set.clear() - } + public suspend fun clear(): Unit = + mutex.withLock { + set.clear() + } - suspend fun contains(element: T): Boolean = mutex.withLock { - set.contains(element) - } + public suspend fun contains(element: T): Boolean = + mutex.withLock { + set.contains(element) + } - suspend fun toSet(): Set = mutex.withLock { - set.toSet() - } + public suspend fun toSet(clear: Boolean = false): Set = + mutex.withLock { + val copied = set.toSet() + if (clear) { + set.clear() + } + copied + } } diff --git a/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml b/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml index 89050814..11d32bb4 100644 --- a/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml +++ b/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml @@ -10,7 +10,9 @@ android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/Theme.AppCompat.Light.NoActionBar" - android:enableOnBackInvokedCallback="true"> + android:enableOnBackInvokedCallback="true" + android:networkSecurityConfig="@xml/network_security_config" + > - - - \ No newline at end of file diff --git a/demos/supabase-todolist/androidApp/src/main/res/xml/network_security_config.xml b/demos/supabase-todolist/androidApp/src/main/res/xml/network_security_config.xml new file mode 100644 index 00000000..de61259a --- /dev/null +++ b/demos/supabase-todolist/androidApp/src/main/res/xml/network_security_config.xml @@ -0,0 +1,6 @@ + + + + localhost + + From ee647fec1a229eb449c01dd9361e3e38d1107b94 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 13:47:14 +0200 Subject: [PATCH 03/20] fix resolving of tables - remove from main thread --- .../kotlin/com/powersync/PsSqlDriver.kt | 18 +++++-- .../db/internal/InternalDatabaseImpl.kt | 51 ++++++++++--------- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 08b5633b..2d254906 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -33,9 +33,21 @@ internal class PsSqlDriver( } // Flows on table updates containing tables - // The table names here are raw SQLite tables - fun updatesOnTables(tableNames: Set): Flow = - tableUpdatesFlow.asSharedFlow().filter { it.intersect(tableNames).isNotEmpty() }.map { } + fun updatesOnTables(tableNames: Set): Flow { + // Spread the input table names in order to account for internal views + val resolvedTableNames = + tableNames + .flatMap { t -> setOf("ps_data__$t", "ps_data_local__$t", t) } + .toSet() + return tableUpdatesFlow + .asSharedFlow() + .filter { + it + .intersect( + resolvedTableNames, + ).isNotEmpty() + }.map { } + } fun fireTableUpdates() { scope.launch { diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 968de3b2..f33b9378 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -17,6 +17,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.withContext @@ -70,9 +72,7 @@ internal class InternalDatabaseImpl( withContext(dbContext) { executeSync(sql, parameters) }.also { - withContext(dbContext) { - driver.fireTableUpdates() - } + driver.fireTableUpdates() } private fun executeSync( @@ -156,18 +156,26 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): Flow> { - val tables = - getSourceTables(sql, parameters) - .filter { it.isNotBlank() } - .toSet() + ): Flow> = + flow { + println("Getting tables for $sql") + // Fetch the tables asynchronously with getAll + val tables = + getSourceTables(sql, parameters) + .filter { it.isNotBlank() } + .toSet() - return updatesOnTables(tables) - .debounce(DEFAULT_WATCH_THROTTLE_MS) - .map { - getAll(sql, parameters = parameters, mapper = mapper) - }.onStart { emit(getAll(sql, parameters = parameters, mapper = mapper)) } - } + emitAll( + updatesOnTables(tables) + .debounce(DEFAULT_WATCH_THROTTLE_MS) + .map { + getAll(sql, parameters = parameters, mapper = mapper) + }.onStart { + // Emit the initial query result + emit(getAll(sql, parameters = parameters, mapper = mapper)) + }, + ) + } private fun createQuery( query: String, @@ -210,9 +218,7 @@ internal class InternalDatabaseImpl( // Trigger watched queries r }.also { - withContext(dbContext) { - driver.fireTableUpdates() - } + driver.fireTableUpdates() } // Register callback for table updates on a specific table @@ -226,15 +232,14 @@ internal class InternalDatabaseImpl( return tableName } - private fun getSourceTables( + private suspend fun getSourceTables( sql: String, parameters: List?, ): Set { val rows = - createQuery( - query = "EXPLAIN $sql", - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), + getAll( + "EXPLAIN $sql", + parameters = parameters, mapper = { ExplainQueryResult( addr = it.getString(0)!!, @@ -244,7 +249,7 @@ internal class InternalDatabaseImpl( p3 = it.getLong(4)!!, ) }, - ).executeAsList() + ) val rootPages = mutableListOf() for (row in rows) { From 5f06ea05f57ee57a7b666616443e803bb67173cb Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 14:04:03 +0200 Subject: [PATCH 04/20] cleanup --- .../kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index f33b9378..b460eeb3 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -158,7 +158,6 @@ internal class InternalDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = flow { - println("Getting tables for $sql") // Fetch the tables asynchronously with getAll val tables = getSourceTables(sql, parameters) From f5d0ae77313ed77650de3192da624a08f699a7e8 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 15:14:21 +0200 Subject: [PATCH 05/20] remove unused blocking functions --- .../com/powersync/bucket/BucketStorageImpl.kt | 13 -------- .../powersync/db/internal/InternalDatabase.kt | 2 -- .../db/internal/InternalDatabaseImpl.kt | 33 ++----------------- .../com/powersync/utils/AtomicMutableSet.kt | 2 +- .../com/powersync/bucket/BucketStorageTest.kt | 12 +------ 5 files changed, 5 insertions(+), 57 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index ec61a209..2bdd340b 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -18,7 +18,6 @@ internal class BucketStorageImpl( private val db: InternalDatabase, private val logger: Logger, ) : BucketStorage { - private val tableNames: MutableSet = mutableSetOf() private var hasCompletedSync = AtomicBoolean(false) private var pendingBucketDeletes = AtomicBoolean(false) @@ -32,18 +31,6 @@ internal class BucketStorageImpl( const val COMPACT_OPERATION_INTERVAL = 1_000 } - init { - readTableNames() - } - - private fun readTableNames() { - tableNames.clear() - // Query to get existing table names - val names = db.getExistingTableNames("ps_data_*") - - tableNames.addAll(names) - } - override fun getMaxOpId(): String = MAX_OP_ID override suspend fun getClientId(): String { diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 72cf34b1..c71319b8 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -12,7 +12,5 @@ internal interface InternalDatabase : val transactor: PsDatabase val queries: PowersyncQueries - fun getExistingTableNames(tableGlob: String): List - fun updatesOnTables(tableNames: Set): Flow } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index b460eeb3..0e9f6372 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -61,7 +61,6 @@ internal class InternalDatabaseImpl( } companion object { - const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" const val DEFAULT_WATCH_THROTTLE_MS = 30L } @@ -223,14 +222,6 @@ internal class InternalDatabaseImpl( // Register callback for table updates on a specific table override fun updatesOnTables(tableNames: Set): Flow = driver.updatesOnTables(tableNames) - private fun toFriendlyTableName(tableName: String): String { - val regex = POWERSYNC_TABLE_MATCH.toRegex() - if (regex.containsMatchIn(tableName)) { - return tableName.replace(regex, "") - } - return tableName - } - private suspend fun getSourceTables( sql: String, parameters: List?, @@ -258,33 +249,15 @@ internal class InternalDatabaseImpl( } val params = listOf(JsonUtil.json.encodeToString(rootPages)) val tableRows = - createQuery( + getAll( "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - parameters = params.size, - binders = { - bindString(0, params[0]) - }, + parameters = params, mapper = { it.getString(0)!! }, - ).executeAsList() + ) return tableRows.toSet() } - override fun getExistingTableNames(tableGlob: String): List { - val existingTableNames = - createQuery( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", - parameters = 1, - binders = { - bindString(0, tableGlob) - }, - mapper = { cursor -> - cursor.getString(0)!! - }, - ).executeAsList() - return existingTableNames - } - override fun close() { runWrapped { this.driver.close() } } diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt index 33a83e6e..215f65f1 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -28,7 +28,7 @@ public class AtomicMutableSet { public suspend fun toSet(clear: Boolean = false): Set = mutex.withLock { - val copied = set.toSet() + val copied = set.toList().toSet() if (clear) { set.clear() } diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 946b8580..e3b8a69e 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -24,9 +24,7 @@ class BucketStorageTest { @Test fun testGetMaxOpId() { mockDb = - mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") - } + mock {} bucketStorage = BucketStorageImpl(mockDb, Logger) assertEquals("9223372036854775807", bucketStorage.getMaxOpId()) } @@ -36,7 +34,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -55,7 +52,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -88,7 +84,6 @@ class BucketStorageTest { ) mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns mockCrudEntry } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -102,7 +97,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns null } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -116,7 +110,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns 1L } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -129,7 +122,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns null } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -142,7 +134,6 @@ class BucketStorageTest { runBlocking { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -164,7 +155,6 @@ class BucketStorageTest { val mockBucketStates = listOf(BucketState("bucket1", "op1"), BucketState("bucket2", "op2")) mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), From fd0c03fdaf42ca18ad50414d57fa85c5a6159f0b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 15:29:18 +0200 Subject: [PATCH 06/20] add ability to throttle watched queries --- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 27 ++++++++++++------- .../kotlin/com/powersync/db/Queries.kt | 4 +++ .../db/internal/InternalDatabaseImpl.kt | 12 ++++++--- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index a9b93ade..3d2d8293 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -139,11 +139,12 @@ internal class PowerSyncDatabaseImpl( } } - uploadJob = + uploadJob = scope.launch { - internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString())).debounce(crudThrottleMs).collect { - syncStream!!.triggerCrudUpload() - } + internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString())) + .debounce(crudThrottleMs).collect { + syncStream!!.triggerCrudUpload() + } } } @@ -210,7 +211,8 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = + internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -233,12 +235,15 @@ internal class PowerSyncDatabaseImpl( override fun watch( sql: String, parameters: List?, + throttleMs: Long?, mapper: (SqlCursor) -> RowType, - ): Flow> = internalDb.watch(sql, parameters, mapper) + ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) override suspend fun execute( sql: String, @@ -280,7 +285,11 @@ internal class PowerSyncDatabaseImpl( syncStream = null } - currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt) + currentStatus.update( + connected = false, + connecting = false, + lastSyncedAt = currentStatus.lastSyncedAt + ) } override suspend fun disconnectAndClear(clearLocal: Boolean) { diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index f400297c..ba1a0063 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -54,6 +54,10 @@ public interface Queries { public fun watch( sql: String, parameters: List? = listOf(), + /** + * Specify the minimum interval, in milliseconds, between queries. + */ + throttleMs: Long? = null, mapper: (SqlCursor) -> RowType, ): Flow> diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 0e9f6372..2fca8b12 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -51,13 +51,15 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): List = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) + ): List = + this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) override fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) + ): RowType? = + this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) } companion object { @@ -154,6 +156,7 @@ internal class InternalDatabaseImpl( override fun watch( sql: String, parameters: List?, + throttleMs: Long?, mapper: (SqlCursor) -> RowType, ): Flow> = flow { @@ -165,7 +168,7 @@ internal class InternalDatabaseImpl( emitAll( updatesOnTables(tables) - .debounce(DEFAULT_WATCH_THROTTLE_MS) + .debounce(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) .map { getAll(sql, parameters = parameters, mapper = mapper) }.onStart { @@ -220,7 +223,8 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTables(tableNames: Set): Flow = driver.updatesOnTables(tableNames) + override fun updatesOnTables(tableNames: Set): Flow = + driver.updatesOnTables(tableNames) private suspend fun getSourceTables( sql: String, From e20208d2ad0029e7d74e5d297b49b6d26117f9ae Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 15:51:20 +0200 Subject: [PATCH 07/20] cleanup --- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 3d2d8293..13000093 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -139,10 +139,12 @@ internal class PowerSyncDatabaseImpl( } } - uploadJob = + uploadJob = scope.launch { - internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString())) - .debounce(crudThrottleMs).collect { + internalDb + .updatesOnTables(setOf(InternalTable.CRUD.toString())) + .debounce(crudThrottleMs) + .collect { syncStream!!.triggerCrudUpload() } } @@ -211,8 +213,7 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = - internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -239,11 +240,9 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) override suspend fun execute( sql: String, @@ -288,7 +287,7 @@ internal class PowerSyncDatabaseImpl( currentStatus.update( connected = false, connecting = false, - lastSyncedAt = currentStatus.lastSyncedAt + lastSyncedAt = currentStatus.lastSyncedAt, ) } From 5387ee5d2b0ebbf7127f1a8f6792a9416c66265b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 16:45:31 +0200 Subject: [PATCH 08/20] format --- .../com/powersync/db/internal/InternalDatabaseImpl.kt | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 2fca8b12..eacccfdd 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -51,15 +51,13 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): List = - this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) + ): List = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) override fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType? = - this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) + ): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) } companion object { @@ -223,8 +221,7 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTables(tableNames: Set): Flow = - driver.updatesOnTables(tableNames) + override fun updatesOnTables(tableNames: Set): Flow = driver.updatesOnTables(tableNames) private suspend fun getSourceTables( sql: String, From 9f93371d733d89a4ef0f55c5c4e916a45200dbb6 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 3 Mar 2025 17:01:59 +0200 Subject: [PATCH 09/20] lint --- .../commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index e3b8a69e..0347d967 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -5,7 +5,6 @@ import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.internal.InternalDatabase import dev.mokkery.answering.returns -import dev.mokkery.every import dev.mokkery.everySuspend import dev.mokkery.matcher.any import dev.mokkery.mock From f50fb95c950e5c75224835638cf6bc0223133365 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 4 Mar 2025 09:34:54 +0200 Subject: [PATCH 10/20] better throttling of watched queries --- .../kotlin/com/powersync/PsSqlDriver.kt | 13 +++++++-- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 15 ++++++---- .../powersync/db/internal/InternalDatabase.kt | 2 +- .../db/internal/InternalDatabaseImpl.kt | 16 ++++++----- .../com/powersync/utils/ThrottleFlow.kt | 28 +++++++++++++++++++ 5 files changed, 58 insertions(+), 16 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 2d254906..3ef1452d 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -2,6 +2,7 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver import com.powersync.utils.AtomicMutableSet +import com.powersync.utils.throttle import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -33,20 +34,26 @@ internal class PsSqlDriver( } // Flows on table updates containing tables - fun updatesOnTables(tableNames: Set): Flow { + fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow { // Spread the input table names in order to account for internal views val resolvedTableNames = tableNames .flatMap { t -> setOf("ps_data__$t", "ps_data_local__$t", t) } .toSet() - return tableUpdatesFlow + var flow = tableUpdatesFlow .asSharedFlow() .filter { it .intersect( resolvedTableNames, ).isNotEmpty() - }.map { } + } + + if (throttleMs != null) { + flow = flow.throttle(throttleMs) + } + + return flow.map { } } fun fireTableUpdates() { diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 13000093..6dbc574f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -142,8 +142,10 @@ internal class PowerSyncDatabaseImpl( uploadJob = scope.launch { internalDb - .updatesOnTables(setOf(InternalTable.CRUD.toString())) - .debounce(crudThrottleMs) + .updatesOnTables( + setOf(InternalTable.CRUD.toString()), + throttleMs = crudThrottleMs + ) .collect { syncStream!!.triggerCrudUpload() } @@ -213,7 +215,8 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = + internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -240,9 +243,11 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) override suspend fun execute( sql: String, diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index c71319b8..a70738ec 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -12,5 +12,5 @@ internal interface InternalDatabase : val transactor: PsDatabase val queries: PowersyncQueries - fun updatesOnTables(tableNames: Set): Flow + fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index eacccfdd..384228eb 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -16,7 +16,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map @@ -51,13 +50,15 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): List = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) + ): List = + this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) override fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) + ): RowType? = + this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) } companion object { @@ -165,11 +166,11 @@ internal class InternalDatabaseImpl( .toSet() emitAll( - updatesOnTables(tables) - .debounce(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) + updatesOnTables(tables, throttleMs = throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) .map { getAll(sql, parameters = parameters, mapper = mapper) - }.onStart { + } + .onStart { // Emit the initial query result emit(getAll(sql, parameters = parameters, mapper = mapper)) }, @@ -221,7 +222,8 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTables(tableNames: Set): Flow = driver.updatesOnTables(tableNames) + override fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow = + driver.updatesOnTables(tableNames, throttleMs) private suspend fun getSourceTables( sql: String, diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt new file mode 100644 index 00000000..eb1589fc --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -0,0 +1,28 @@ +package com.powersync.utils + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* + +/** + * Throttles a flow with emissions on the leading and trailing edge. + * Events, from the incoming flow, during the throttle window are discarded. + * Events are discarded by using a conflated buffer. + * This throttle method acts as a slow consumer, but backpressure is not a concern + * due to the conflated buffer dropping events during the throttle window. + */ +internal fun Flow.throttle(windowMs: Long): Flow = flow { + // Use a buffer before throttle (ensure only the latest event is kept) + val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) + + bufferedFlow.collect { value -> + // Emit the event immediately (leading edge) + emit(value) + + // Delay for the throttle window to avoid emitting too frequently + delay(windowMs) + + // The next incoming event will be provided from the buffer. + // The next collect will emit the trailing edge + } +} From 7d8f423fdd0bb67fd557cfe8d30f868a6d12ed5f Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 4 Mar 2025 09:37:43 +0200 Subject: [PATCH 11/20] cleanup --- .../kotlin/com/powersync/PsSqlDriver.kt | 22 ++++++++++-------- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 15 ++++-------- .../powersync/db/internal/InternalDatabase.kt | 5 +++- .../db/internal/InternalDatabaseImpl.kt | 15 ++++++------ .../com/powersync/utils/ThrottleFlow.kt | 23 ++++++++++--------- 5 files changed, 41 insertions(+), 39 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 3ef1452d..1cb7af34 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -34,20 +34,24 @@ internal class PsSqlDriver( } // Flows on table updates containing tables - fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow { + fun updatesOnTables( + tableNames: Set, + throttleMs: Long?, + ): Flow { // Spread the input table names in order to account for internal views val resolvedTableNames = tableNames .flatMap { t -> setOf("ps_data__$t", "ps_data_local__$t", t) } .toSet() - var flow = tableUpdatesFlow - .asSharedFlow() - .filter { - it - .intersect( - resolvedTableNames, - ).isNotEmpty() - } + var flow = + tableUpdatesFlow + .asSharedFlow() + .filter { + it + .intersect( + resolvedTableNames, + ).isNotEmpty() + } if (throttleMs != null) { flow = flow.throttle(throttleMs) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 6dbc574f..7cc74917 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -28,7 +28,6 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -144,9 +143,8 @@ internal class PowerSyncDatabaseImpl( internalDb .updatesOnTables( setOf(InternalTable.CRUD.toString()), - throttleMs = crudThrottleMs - ) - .collect { + throttleMs = crudThrottleMs, + ).collect { syncStream!!.triggerCrudUpload() } } @@ -215,8 +213,7 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = - internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -243,11 +240,9 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) override suspend fun execute( sql: String, diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index a70738ec..25095f2d 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -12,5 +12,8 @@ internal interface InternalDatabase : val transactor: PsDatabase val queries: PowersyncQueries - fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow + fun updatesOnTables( + tableNames: Set, + throttleMs: Long?, + ): Flow } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 384228eb..069d3d0f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -50,15 +50,13 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): List = - this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) + ): List = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) override fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType? = - this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) + ): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) } companion object { @@ -169,8 +167,7 @@ internal class InternalDatabaseImpl( updatesOnTables(tables, throttleMs = throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) .map { getAll(sql, parameters = parameters, mapper = mapper) - } - .onStart { + }.onStart { // Emit the initial query result emit(getAll(sql, parameters = parameters, mapper = mapper)) }, @@ -222,8 +219,10 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTables(tableNames: Set, throttleMs: Long?): Flow = - driver.updatesOnTables(tableNames, throttleMs) + override fun updatesOnTables( + tableNames: Set, + throttleMs: Long?, + ): Flow = driver.updatesOnTables(tableNames, throttleMs) private suspend fun getSourceTables( sql: String, diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt index eb1589fc..dbd40746 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -11,18 +11,19 @@ import kotlinx.coroutines.flow.* * This throttle method acts as a slow consumer, but backpressure is not a concern * due to the conflated buffer dropping events during the throttle window. */ -internal fun Flow.throttle(windowMs: Long): Flow = flow { - // Use a buffer before throttle (ensure only the latest event is kept) - val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) +internal fun Flow.throttle(windowMs: Long): Flow = + flow { + // Use a buffer before throttle (ensure only the latest event is kept) + val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) - bufferedFlow.collect { value -> - // Emit the event immediately (leading edge) - emit(value) + bufferedFlow.collect { value -> + // Emit the event immediately (leading edge) + emit(value) - // Delay for the throttle window to avoid emitting too frequently - delay(windowMs) + // Delay for the throttle window to avoid emitting too frequently + delay(windowMs) - // The next incoming event will be provided from the buffer. - // The next collect will emit the trailing edge + // The next incoming event will be provided from the buffer. + // The next collect will emit the trailing edge + } } -} From 9e18996db66045ff848625dfdc28e59e4e2eb917 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 4 Mar 2025 11:12:14 +0200 Subject: [PATCH 12/20] lint --- .../src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt index dbd40746..711b2cb4 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -2,7 +2,9 @@ package com.powersync.utils import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flow /** * Throttles a flow with emissions on the leading and trailing edge. From 6b2c80bd9935d92f2ebdd24d8dc036f7ce1ad4f6 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 4 Mar 2025 17:54:11 +0200 Subject: [PATCH 13/20] fix race conditions in watches --- .../java/com/powersync/AndroidDatabaseTest.kt | 60 +++++++++++++++++++ .../kotlin/com/powersync/PsSqlDriver.kt | 51 +++++----------- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 10 ++-- .../powersync/db/internal/InternalDatabase.kt | 6 +- .../db/internal/InternalDatabaseImpl.kt | 48 +++++++++------ 5 files changed, 114 insertions(+), 61 deletions(-) diff --git a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt index 0ca5d9ea..727879d5 100644 --- a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt +++ b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt @@ -7,7 +7,11 @@ import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.advanceTimeBy import org.junit.After +import kotlinx.coroutines.delay +import kotlinx.coroutines.* + import org.junit.Test import org.junit.runner.RunWith @@ -91,4 +95,60 @@ class AndroidDatabaseTest { query.cancel() } } + + @Test + fun testThrottledTableUpdates() = + runTest { + turbineScope { + // Avoids skipping delays + withContext(Dispatchers.Default) { + val query = + database.watch( + "SELECT * FROM users", + throttleMs = 1000 + ) { UserRow.from(it) }.testIn(this) + + // Wait for initial query + assertEquals(0, query.awaitItem().size) + + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("Test", "test@example.org"), + ) + + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("Test2", "test2@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("Test3", "test3@example.org"), + ) + } + + assertEquals(3, query.awaitItem().size) + + try { + database.writeTransaction { + it.execute("DELETE FROM users;") + it.execute("syntax error, revert please") + } + } catch (e: Exception) { + // Ignore + } + + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("Test4", "test4@example.org"), + ) + assertEquals(4, query.awaitItem().size) + + query.expectNoEvents() + query.cancel() + + } + + } + } } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 1cb7af34..70703217 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -2,14 +2,11 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver import com.powersync.utils.AtomicMutableSet -import com.powersync.utils.throttle import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking internal class PsSqlDriver( private val driver: SqlDriver, @@ -22,47 +19,31 @@ internal class PsSqlDriver( private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { - scope.launch { + // This should only ever be executed by an execute operation which should + // always be executed with the IO Dispatcher + runBlocking { pendingUpdates.add(tableName) } } fun clearTableUpdates() { - scope.launch { + // This should only ever be executed on rollback which should be executed via the + // IO Dispatcher. + runBlocking { pendingUpdates.clear() } } - // Flows on table updates containing tables - fun updatesOnTables( - tableNames: Set, - throttleMs: Long?, - ): Flow { + // Flows on any table change + // This specifically returns a SharedFlow for timing considerations + fun updatesOnTables(): SharedFlow> { // Spread the input table names in order to account for internal views - val resolvedTableNames = - tableNames - .flatMap { t -> setOf("ps_data__$t", "ps_data_local__$t", t) } - .toSet() - var flow = - tableUpdatesFlow - .asSharedFlow() - .filter { - it - .intersect( - resolvedTableNames, - ).isNotEmpty() - } - - if (throttleMs != null) { - flow = flow.throttle(throttleMs) - } - - return flow.map { } + return tableUpdatesFlow + .asSharedFlow() } - fun fireTableUpdates() { - scope.launch { - tableUpdatesFlow.emit(pendingUpdates.toSet(true)) - } + suspend fun fireTableUpdates() { + val updates = pendingUpdates.toSet(true) + tableUpdatesFlow.emit(updates) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 7cc74917..b5bb5b2f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -22,12 +22,14 @@ import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil +import com.powersync.utils.throttle import com.powersync.utils.toJsonObject import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -141,10 +143,10 @@ internal class PowerSyncDatabaseImpl( uploadJob = scope.launch { internalDb - .updatesOnTables( - setOf(InternalTable.CRUD.toString()), - throttleMs = crudThrottleMs, - ).collect { + .updatesOnTables() + .filter { it.contains(InternalTable.CRUD.toString()) } + .throttle(crudThrottleMs) + .collect { syncStream!!.triggerCrudUpload() } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 25095f2d..0d264d45 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -5,6 +5,7 @@ import com.persistence.PowersyncQueries import com.powersync.db.Queries import com.powersync.persistence.PsDatabase import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow internal interface InternalDatabase : Queries, @@ -12,8 +13,5 @@ internal interface InternalDatabase : val transactor: PsDatabase val queries: PowersyncQueries - fun updatesOnTables( - tableNames: Set, - throttleMs: Long?, - ): Flow + fun updatesOnTables(): SharedFlow> } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 069d3d0f..7114a1b7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -10,16 +10,17 @@ import com.powersync.db.SqlCursor import com.powersync.db.runWrapped import com.powersync.persistence.PsDatabase import com.powersync.utils.JsonUtil +import com.powersync.utils.throttle import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @@ -156,22 +157,36 @@ internal class InternalDatabaseImpl( throttleMs: Long?, mapper: (SqlCursor) -> RowType, ): Flow> = - flow { + // Use a channel flow here since we throttle (buffer used under the hood) + // This causes some emissions to be from different scopes. + channelFlow { // Fetch the tables asynchronously with getAll val tables = getSourceTables(sql, parameters) .filter { it.isNotBlank() } .toSet() - emitAll( - updatesOnTables(tables, throttleMs = throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) - .map { - getAll(sql, parameters = parameters, mapper = mapper) - }.onStart { - // Emit the initial query result - emit(getAll(sql, parameters = parameters, mapper = mapper)) - }, - ) + // Register a listener before fetching the initial result + val updateFlow = + updatesOnTables() + + val initialResult = getAll(sql, parameters = parameters, mapper = mapper) + // Listen for updates before emitting the initial result + + updateFlow + // onSubscription here is very important. + // This ensures that the initial result and all updates are emitted. + .onSubscription { + println("emitting initial result") + send(initialResult) + }.filter { + // Only trigger updates on relevant tables + it.intersect(tables).isNotEmpty() + }.throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) + .collect { + println("mapping update to result") + send(getAll(sql, parameters = parameters, mapper = mapper)) + } } private fun createQuery( @@ -219,10 +234,7 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTables( - tableNames: Set, - throttleMs: Long?, - ): Flow = driver.updatesOnTables(tableNames, throttleMs) + override fun updatesOnTables(): SharedFlow> = driver.updatesOnTables() private suspend fun getSourceTables( sql: String, From 3dd7e431cde801abe595877d68fcbd4c56742d0d Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 09:02:29 +0200 Subject: [PATCH 14/20] fix race conditions --- .../kotlin/com/powersync/PsSqlDriver.kt | 12 ++++----- .../db/internal/InternalDatabaseImpl.kt | 13 ++------- .../kotlin/com/powersync/utils/JsonTest.kt | 27 +++++++++++++++++++ 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 70703217..c7efac85 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -27,20 +27,18 @@ internal class PsSqlDriver( } fun clearTableUpdates() { - // This should only ever be executed on rollback which should be executed via the - // IO Dispatcher. + // This should only ever be executed by an execute operation which should + // always be executed with the IO Dispatcher runBlocking { pendingUpdates.clear() } } // Flows on any table change - // This specifically returns a SharedFlow for timing considerations - fun updatesOnTables(): SharedFlow> { - // Spread the input table names in order to account for internal views - return tableUpdatesFlow + // This specifically returns a SharedFlow for downstream timing considerations + fun updatesOnTables(): SharedFlow> = + tableUpdatesFlow .asSharedFlow() - } suspend fun fireTableUpdates() { val updates = pendingUpdates.toSet(true) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 7114a1b7..d111494c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -166,25 +166,16 @@ internal class InternalDatabaseImpl( .filter { it.isNotBlank() } .toSet() - // Register a listener before fetching the initial result - val updateFlow = - updatesOnTables() - - val initialResult = getAll(sql, parameters = parameters, mapper = mapper) - // Listen for updates before emitting the initial result - - updateFlow + updatesOnTables() // onSubscription here is very important. // This ensures that the initial result and all updates are emitted. .onSubscription { - println("emitting initial result") - send(initialResult) + send(getAll(sql, parameters = parameters, mapper = mapper)) }.filter { // Only trigger updates on relevant tables it.intersect(tables).isNotEmpty() }.throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) .collect { - println("mapping update to result") send(getAll(sql, parameters = parameters, mapper = mapper)) } } diff --git a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt index 0157012f..aa2611e1 100644 --- a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt @@ -1,5 +1,10 @@ package com.powersync.utils +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -36,6 +41,28 @@ class JsonTest { assertEquals("test", jsonElement.content) } + @Test + fun testThrottle() { + runTest { + val t = + flow { + emit(1) + delay(10) + emit(2) + delay(20) + emit(3) + delay(100) + emit(4) + }.throttle(100) + .map { + // Adding a delay here to simulate a slow consumer + delay(1000) + it + }.toList() + assertEquals(t, listOf(1, 4)) + } + } + @Test fun testBooleanToJsonElement() { val boolean = JsonParam.Boolean(true) From eda549a6581e8e0ab3bcb7afab8149070188d6dd Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 09:32:44 +0200 Subject: [PATCH 15/20] remove runBlocking, add write lock --- .../kotlin/com/powersync/PsSqlDriver.kt | 20 +++++++------- .../db/internal/InternalDatabaseImpl.kt | 27 ++++++++++++------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index c7efac85..9f7b27f8 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -3,10 +3,11 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver import com.powersync.utils.AtomicMutableSet import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.launch internal class PsSqlDriver( private val driver: SqlDriver, @@ -19,17 +20,13 @@ internal class PsSqlDriver( private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { - // This should only ever be executed by an execute operation which should - // always be executed with the IO Dispatcher - runBlocking { + scope.launch { pendingUpdates.add(tableName) } } fun clearTableUpdates() { - // This should only ever be executed by an execute operation which should - // always be executed with the IO Dispatcher - runBlocking { + scope.launch { pendingUpdates.clear() } } @@ -41,7 +38,12 @@ internal class PsSqlDriver( .asSharedFlow() suspend fun fireTableUpdates() { - val updates = pendingUpdates.toSet(true) - tableUpdatesFlow.emit(updates) + // Use the same scope as the async table updates, this should help with queuing + val job = + scope.async { + val updates = pendingUpdates.toSet(true) + tableUpdatesFlow.emit(updates) + } + job.await() } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index d111494c..1791e1be 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -21,6 +21,8 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @@ -34,6 +36,8 @@ internal class InternalDatabaseImpl( // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO + private val writeLock = Mutex() + private val transaction = object : PowerSyncTransaction { override fun execute( @@ -68,10 +72,12 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, ): Long = - withContext(dbContext) { - executeSync(sql, parameters) - }.also { - driver.fireTableUpdates() + writeLock.withLock { + withContext(dbContext) { + executeSync(sql, parameters) + }.also { + driver.fireTableUpdates() + } } private fun executeSync( @@ -207,8 +213,8 @@ internal class InternalDatabaseImpl( } override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - withContext(dbContext) { - val r = + writeLock.withLock { + withContext(dbContext) { transactor.transactionWithResult(noEnclosing = true) { runWrapped { val result = callback.execute(transaction) @@ -218,10 +224,11 @@ internal class InternalDatabaseImpl( result } } - // Trigger watched queries - r - }.also { - driver.fireTableUpdates() + }.also { + // Trigger watched queries + // Fire updates inside the write lock + driver.fireTableUpdates() + } } // Register callback for table updates on a specific table From 704fb0871309cd5c10200a18ac3cd565b1018d2c Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 09:33:48 +0200 Subject: [PATCH 16/20] lint --- .../kotlin/com/powersync/db/internal/InternalDatabase.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 0d264d45..92362904 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -4,7 +4,6 @@ import app.cash.sqldelight.db.Closeable import com.persistence.PowersyncQueries import com.powersync.db.Queries import com.powersync.persistence.PsDatabase -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow internal interface InternalDatabase : From f4de16f1b5ec9f6a08aa234b741f921c26bc307b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 10:40:46 +0200 Subject: [PATCH 17/20] added changelog --- CHANGELOG.md | 4 ++++ .../src/androidTest/java/com/powersync/AndroidDatabaseTest.kt | 2 -- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 134f481a..b22b7608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA27 + +* Improved watch query internals. Added the ability to throttle watched queries. + ## 1.0.0-BETA26 * Support bucket priorities and partial syncs. diff --git a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt index 727879d5..4bcb1135 100644 --- a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt +++ b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt @@ -96,8 +96,6 @@ class AndroidDatabaseTest { } } - @Test - fun testThrottledTableUpdates() = runTest { turbineScope { // Avoids skipping delays From 8d2b8cc803bf6d816c37c2fe8fa015a020d91502 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 10:41:47 +0200 Subject: [PATCH 18/20] delete shaky test test --- .../java/com/powersync/AndroidDatabaseTest.kt | 69 ++----------------- 1 file changed, 5 insertions(+), 64 deletions(-) diff --git a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt index 4bcb1135..a6a90eff 100644 --- a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt +++ b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt @@ -1,23 +1,18 @@ package com.powersync -import androidx.test.platform.app.InstrumentationRegistry import androidx.test.ext.junit.runners.AndroidJUnit4 +import androidx.test.platform.app.InstrumentationRegistry import app.cash.turbine.turbineScope import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow +import kotlinx.coroutines.* import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.test.advanceTimeBy import org.junit.After -import kotlinx.coroutines.delay -import kotlinx.coroutines.* - - -import org.junit.Test -import org.junit.runner.RunWith - import org.junit.Assert.* import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class AndroidDatabaseTest { @@ -95,58 +90,4 @@ class AndroidDatabaseTest { query.cancel() } } - - runTest { - turbineScope { - // Avoids skipping delays - withContext(Dispatchers.Default) { - val query = - database.watch( - "SELECT * FROM users", - throttleMs = 1000 - ) { UserRow.from(it) }.testIn(this) - - // Wait for initial query - assertEquals(0, query.awaitItem().size) - - database.execute( - "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", - listOf("Test", "test@example.org"), - ) - - database.writeTransaction { - it.execute( - "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", - listOf("Test2", "test2@example.org"), - ) - it.execute( - "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", - listOf("Test3", "test3@example.org"), - ) - } - - assertEquals(3, query.awaitItem().size) - - try { - database.writeTransaction { - it.execute("DELETE FROM users;") - it.execute("syntax error, revert please") - } - } catch (e: Exception) { - // Ignore - } - - database.execute( - "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", - listOf("Test4", "test4@example.org"), - ) - assertEquals(4, query.awaitItem().size) - - query.expectNoEvents() - query.cancel() - - } - - } - } -} \ No newline at end of file +} From 2e32fcf20abd4bd277de06a305fc2157abf3deca Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 12:08:28 +0200 Subject: [PATCH 19/20] add throttle comment --- .../com/powersync/db/internal/InternalDatabaseImpl.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 1791e1be..4d37cecc 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -180,7 +180,13 @@ internal class InternalDatabaseImpl( }.filter { // Only trigger updates on relevant tables it.intersect(tables).isNotEmpty() - }.throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) + } + // Throttling here is a feature which prevents watch queries from spamming updates. + // Throttling by design discards and delays events within the throttle window. Discarded events + // still trigger a trailing edge update. + // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. + // Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results. + .throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) .collect { send(getAll(sql, parameters = parameters, mapper = mapper)) } From 473af1a2be044c7e767adf402143d614b7ef30ea Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Mar 2025 12:30:20 +0200 Subject: [PATCH 20/20] use synchronized set --- .../kotlin/com/powersync/PsSqlDriver.kt | 18 ++------- .../com/powersync/utils/AtomicMutableSet.kt | 37 +++++++------------ 2 files changed, 16 insertions(+), 39 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 9f7b27f8..77c6f658 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -3,11 +3,9 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver import com.powersync.utils.AtomicMutableSet import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.launch internal class PsSqlDriver( private val driver: SqlDriver, @@ -20,15 +18,11 @@ internal class PsSqlDriver( private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { - scope.launch { - pendingUpdates.add(tableName) - } + pendingUpdates.add(tableName) } fun clearTableUpdates() { - scope.launch { - pendingUpdates.clear() - } + pendingUpdates.clear() } // Flows on any table change @@ -38,12 +32,6 @@ internal class PsSqlDriver( .asSharedFlow() suspend fun fireTableUpdates() { - // Use the same scope as the async table updates, this should help with queuing - val job = - scope.async { - val updates = pendingUpdates.toSet(true) - tableUpdatesFlow.emit(updates) - } - job.await() + tableUpdatesFlow.emit(pendingUpdates.toSetAndClear()) } } diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt index 215f65f1..536ddd1a 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -1,37 +1,26 @@ package com.powersync.utils -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -public class AtomicMutableSet { - private val mutex = Mutex() - private val set = mutableSetOf() +import kotlinx.atomicfu.locks.SynchronizedObject +import kotlinx.atomicfu.locks.synchronized - public suspend fun add(element: T): Boolean = - mutex.withLock { - set.add(element) - } +public class AtomicMutableSet : SynchronizedObject() { + private val set = mutableSetOf() - public suspend fun remove(element: T): Boolean = - mutex.withLock { - set.remove(element) + public fun add(element: T): Boolean = + synchronized(this) { + return set.add(element) } - public suspend fun clear(): Unit = - mutex.withLock { + // Synchronized clear method + public fun clear(): Unit = + synchronized(this) { set.clear() } - public suspend fun contains(element: T): Boolean = - mutex.withLock { - set.contains(element) - } - - public suspend fun toSet(clear: Boolean = false): Set = - mutex.withLock { + public fun toSetAndClear(): Set = + synchronized(this) { val copied = set.toList().toSet() - if (clear) { - set.clear() - } + set.clear() copied } }