11package com.powersync.bucket
22
3- import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull
4- import co.touchlab.kermit.Logger
5- import com.powersync.db.internal.PsInternalDatabase
3+ import com.powersync.db.crud.CrudEntry
64import com.powersync.sync.SyncDataBatch
75import com.powersync.sync.SyncLocalDatabaseResult
8- import co.touchlab.stately.concurrency.AtomicBoolean
9- import kotlinx.serialization.encodeToString
10- import com.powersync.db.internal.InternalTable
11- import com.powersync.utils.JsonUtil
126
13- internal class BucketStorage (
14- private val db : PsInternalDatabase ,
15- private val logger : Logger
16- ) {
17- private val tableNames: MutableSet <String > = mutableSetOf ()
18- private var hasCompletedSync = AtomicBoolean (false )
19- private var pendingBucketDeletes = AtomicBoolean (false )
20-
21- /* *
22- * Count up, and do a compact on startup.
23- */
24- private var compactCounter = COMPACT_OPERATION_INTERVAL
25-
26- companion object {
27- const val MAX_OP_ID = " 9223372036854775807"
28- const val COMPACT_OPERATION_INTERVAL = 1_000
29- }
30-
31- init {
32- readTableNames()
33- }
34-
35- private fun readTableNames () {
36- tableNames.clear()
37- // Query to get existing table names
38- val names = db.getExistingTableNames(" ps_data_*" )
39-
40- tableNames.addAll(names)
41- }
42-
43- fun getMaxOpId (): String {
44- return MAX_OP_ID
45- }
46-
47- suspend fun getClientId (): String {
48- val id = db.getOptional(" SELECT powersync_client_id() as client_id" ) {
49- it.getString(0 )!!
50- }
51- return id ? : throw IllegalStateException (" Client ID not found" )
52- }
53-
54- suspend fun hasCrud (): Boolean {
55- return db.queries.hasCrud().awaitAsOneOrNull() == 1L
56- }
57-
58- suspend fun updateLocalTarget (checkpointCallback : suspend () -> String ): Boolean {
59- db.getOptional(
60- " SELECT target_op FROM ${InternalTable .BUCKETS } WHERE name = '\$ local' AND target_op = ?" ,
61- parameters = listOf (MAX_OP_ID ),
62- mapper = { cursor -> cursor.getLong(0 )!! }
63- )
64- ? : // Nothing to update
65- return false
66-
67- val seqBefore =
68- db.getOptional(" SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable .CRUD } '" ) {
69- it.getLong(0 )!!
70- } ? : // Nothing to update
71- return false
72-
73- val opId = checkpointCallback()
74-
75- logger.i { " [updateLocalTarget] Updating target to checkpoint $opId " }
76-
77- return db.writeTransaction {
78- if (hasCrud()) {
79- logger.w { " [updateLocalTarget] ps crud is not empty" }
80- return @writeTransaction false
81- }
82-
83- val seqAfter =
84- db.getOptional(" SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable .CRUD } '" ) {
85- it.getLong(0 )!!
86- }
87- ? : // assert isNotEmpty
88- throw AssertionError (" Sqlite Sequence should not be empty" )
89-
90- if (seqAfter != seqBefore) {
91- logger.d(" seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore " )
92- // New crud data may have been uploaded since we got the checkpoint. Abort.
93- return @writeTransaction false
94- }
95-
96- db.execute(
97- " UPDATE ${InternalTable .BUCKETS } SET target_op = CAST(? as INTEGER) WHERE name='\$ local'" ,
98- listOf (opId)
99- )
100-
101- return @writeTransaction true
102- }
103- }
104-
105- suspend fun saveSyncData (syncDataBatch : SyncDataBatch ) {
106- db.writeTransaction { tx ->
107- val jsonString = JsonUtil .json.encodeToString(syncDataBatch)
108- tx.execute(
109- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
110- listOf (" save" , jsonString)
111- )
112- }
113- this .compactCounter + = syncDataBatch.buckets.sumOf { it.data.size }
114- }
115-
116- suspend fun getBucketStates (): List <BucketState > {
117- return db.getAll(
118- " SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable .BUCKETS } WHERE pending_delete = 0" ,
119- mapper = { cursor ->
120- BucketState (
121- bucket = cursor.getString(0 )!! ,
122- opId = cursor.getString(1 )!!
123- )
124- })
125- }
126-
127- suspend fun removeBuckets (bucketsToDelete : List <String >) {
128- bucketsToDelete.forEach { bucketName ->
129- deleteBucket(bucketName)
130- }
131- }
132-
133-
134- private suspend fun deleteBucket (bucketName : String ) {
135-
136- db.writeTransaction{ tx ->
137- tx.execute(
138- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
139- listOf (" delete_bucket" , bucketName)
140- )
141- }
142-
143- Logger .d(" [deleteBucket] Done deleting" )
144-
145- this .pendingBucketDeletes.value = true
146- }
147-
148- suspend fun hasCompletedSync (): Boolean {
149- if (hasCompletedSync.value) {
150- return true
151- }
152-
153- val completedSync = db.getOptional(
154- " SELECT powersync_last_synced_at()" ,
155- mapper = { cursor ->
156- cursor.getString(0 )!!
157- })
158-
159- return if (completedSync != null ) {
160- hasCompletedSync.value = true
161- true
162- } else {
163- false
164- }
165- }
166-
167- suspend fun syncLocalDatabase (targetCheckpoint : Checkpoint ): SyncLocalDatabaseResult {
168- val result = validateChecksums(targetCheckpoint)
169-
170- if (! result.checkpointValid) {
171- logger.w { " [SyncLocalDatabase] Checksums failed for ${result.checkpointFailures} " }
172- result.checkpointFailures?.forEach { bucketName ->
173- deleteBucket(bucketName)
174- }
175- result.ready = false
176- return result
177- }
178-
179- val bucketNames = targetCheckpoint.checksums.map { it.bucket }
180-
181- db.writeTransaction { tx ->
182- tx.execute(
183- " UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))" ,
184- listOf (targetCheckpoint.lastOpId, JsonUtil .json.encodeToString(bucketNames))
185- )
186-
187- if (targetCheckpoint.writeCheckpoint != null ) {
188- tx.execute(
189- " UPDATE ps_buckets SET last_op = ? WHERE name = '\$ local'" ,
190- listOf (targetCheckpoint.writeCheckpoint),
191- )
192- }
193- }
194-
195- val valid = updateObjectsFromBuckets()
196-
197- if (! valid) {
198- return SyncLocalDatabaseResult (
199- ready = false ,
200- checkpointValid = true ,
201- )
202- }
203-
204- this .forceCompact()
205-
206- return SyncLocalDatabaseResult (
207- ready = true ,
208- )
209- }
210-
211- private suspend fun validateChecksums (checkpoint : Checkpoint ): SyncLocalDatabaseResult {
212- val res = db.getOptional(
213- " SELECT powersync_validate_checkpoint(?) AS result" ,
214- parameters = listOf (JsonUtil .json.encodeToString(checkpoint)),
215- mapper = { cursor ->
216- cursor.getString(0 )!!
217- })
218- ? : // no result
219- return SyncLocalDatabaseResult (
220- ready = false ,
221- checkpointValid = false ,
222- )
223-
224- return JsonUtil .json.decodeFromString<SyncLocalDatabaseResult >(res)
225- }
226-
227- /* *
228- * Atomically update the local state.
229- *
230- * This includes creating new tables, dropping old tables, and copying data over from the oplog.
231- */
232- private suspend fun updateObjectsFromBuckets (): Boolean {
233- return db.writeTransaction { tx ->
234-
235- tx.execute(
236- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
237- listOf (" sync_local" , " " )
238- )
239-
240- val res = tx.get(" select last_insert_rowid()" ) { cursor ->
241- cursor.getLong(0 )!!
242- }
243-
244- return @writeTransaction res == 1L
245- }
246- }
247-
248- private suspend fun forceCompact () {
249- // Reset counter
250- this .compactCounter = COMPACT_OPERATION_INTERVAL
251- this .pendingBucketDeletes.value = true
252-
253- this .autoCompact()
254- }
255-
256-
257- private suspend fun autoCompact () {
258- // 1. Delete buckets
259- deletePendingBuckets()
260-
261- // 2. Clear REMOVE operations, only keeping PUT ones
262- clearRemoveOps()
263- }
264-
265- private suspend fun deletePendingBuckets () {
266- if (! this .pendingBucketDeletes.value) {
267- return
268- }
269-
270- db.writeTransaction { tx ->
271- tx.execute(
272- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" , listOf (" delete_pending_buckets" ," " )
273- )
274-
275- // Executed once after start-up, and again when there are pending deletes.
276- pendingBucketDeletes.value = false
277- }
278- }
279-
280- private suspend fun clearRemoveOps () {
281- if (this .compactCounter < COMPACT_OPERATION_INTERVAL ) {
282- return
283- }
284-
285- db.writeTransaction { tx ->
286- tx.execute(
287- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
288- listOf (" clear_remove_ops" , " " )
289- )
290- }
291- this .compactCounter = 0
292- }
293-
294- @Suppress(" UNUSED_PARAMETER" )
295- fun setTargetCheckpoint (checkpoint : Checkpoint ) {
296- // No-op for now
297- }
298- }
7+ internal interface BucketStorage {
8+ fun getMaxOpId (): String
9+ suspend fun getClientId (): String
10+ suspend fun nextCrudItem (): CrudEntry ?
11+ suspend fun hasCrud (): Boolean
12+ suspend fun updateLocalTarget (checkpointCallback : suspend () -> String ): Boolean
13+ suspend fun saveSyncData (syncDataBatch : SyncDataBatch )
14+ suspend fun getBucketStates (): List <BucketState >
15+ suspend fun removeBuckets (bucketsToDelete : List <String >)
16+ suspend fun hasCompletedSync (): Boolean
17+ suspend fun syncLocalDatabase (targetCheckpoint : Checkpoint ): SyncLocalDatabaseResult
18+ fun setTargetCheckpoint (checkpoint : Checkpoint )
19+ }
0 commit comments