@@ -9,13 +9,15 @@ import {
99 UpdateNotification ,
1010 isBatchedUpdateNotification
1111} from '../db/DBAdapter.js' ;
12+ import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js' ;
1213import { SyncPriorityStatus , SyncStatus } from '../db/crud/SyncStatus.js' ;
1314import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js' ;
1415import { Schema } from '../db/schema/Schema.js' ;
1516import { BaseObserver } from '../utils/BaseObserver.js' ;
1617import { ControlledExecutor } from '../utils/ControlledExecutor.js' ;
17- import { mutexRunExclusive } from '../utils/mutex.js' ;
1818import { throttleTrailing } from '../utils/async.js' ;
19+ import { mutexRunExclusive } from '../utils/mutex.js' ;
20+ import { ConnectionManager } from './ConnectionManager.js' ;
1921import { SQLOpenFactory , SQLOpenOptions , isDBAdapter , isSQLOpenFactory , isSQLOpenOptions } from './SQLOpenFactory.js' ;
2022import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js' ;
2123import { runOnSchemaChange } from './runOnSchemaChange.js' ;
@@ -32,7 +34,6 @@ import {
3234 type PowerSyncConnectionOptions ,
3335 type RequiredAdditionalConnectionOptions
3436} from './sync/stream/AbstractStreamingSyncImplementation.js' ;
35- import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js' ;
3637
3738export interface DisconnectAndClearOptions {
3839 /** When set to false, data in local-only tables is preserved. */
@@ -165,17 +166,22 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
165166 */
166167 currentStatus : SyncStatus ;
167168
168- syncStreamImplementation ?: StreamingSyncImplementation ;
169169 sdkVersion : string ;
170170
171171 protected bucketStorageAdapter : BucketStorageAdapter ;
172- private syncStatusListenerDisposer ?: ( ) => void ;
173172 protected _isReadyPromise : Promise < void > ;
173+ protected connectionManager : ConnectionManager ;
174+
175+ get syncStreamImplementation ( ) {
176+ return this . connectionManager . syncStreamImplementation ;
177+ }
174178
175179 protected _schema : Schema ;
176180
177181 private _database : DBAdapter ;
178182
183+ protected runExclusiveMutex : Mutex ;
184+
179185 constructor ( options : PowerSyncDatabaseOptionsWithDBAdapter ) ;
180186 constructor ( options : PowerSyncDatabaseOptionsWithOpenFactory ) ;
181187 constructor ( options : PowerSyncDatabaseOptionsWithSettings ) ;
@@ -206,7 +212,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
206212 this . _schema = schema ;
207213 this . ready = false ;
208214 this . sdkVersion = '' ;
215+ this . runExclusiveMutex = new Mutex ( ) ;
209216 // Start async init
217+ this . connectionManager = new ConnectionManager ( {
218+ createSyncImplementation : async ( connector , options ) => {
219+ await this . waitForReady ( ) ;
220+
221+ return this . runExclusive ( async ( ) => {
222+ const sync = this . generateSyncStreamImplementation ( connector , this . resolvedConnectionOptions ( options ) ) ;
223+ const onDispose = sync . registerListener ( {
224+ statusChanged : ( status ) => {
225+ this . currentStatus = new SyncStatus ( {
226+ ...status . toJSON ( ) ,
227+ hasSynced : this . currentStatus ?. hasSynced || ! ! status . lastSyncedAt
228+ } ) ;
229+ this . iterateListeners ( ( cb ) => cb . statusChanged ?.( this . currentStatus ) ) ;
230+ }
231+ } ) ;
232+ await sync . waitForReady ( ) ;
233+
234+ return {
235+ sync,
236+ onDispose
237+ } ;
238+ } ) ;
239+ } ,
240+ logger : this . logger
241+ } ) ;
210242 this . _isReadyPromise = this . initialize ( ) ;
211243 }
212244
@@ -425,34 +457,19 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
425457 } ;
426458 }
427459
460+ /**
461+ * Locking mechanism for exclusively running critical portions of connect/disconnect operations.
462+ * Locking here is mostly only important on web for multiple tab scenarios.
463+ */
464+ protected runExclusive < T > ( callback : ( ) => Promise < T > ) : Promise < T > {
465+ return this . runExclusiveMutex . runExclusive ( callback ) ;
466+ }
467+
428468 /**
429469 * Connects to stream of events from the PowerSync instance.
430470 */
431471 async connect ( connector : PowerSyncBackendConnector , options ?: PowerSyncConnectionOptions ) {
432- await this . waitForReady ( ) ;
433-
434- // close connection if one is open
435- await this . disconnect ( ) ;
436- if ( this . closed ) {
437- throw new Error ( 'Cannot connect using a closed client' ) ;
438- }
439-
440- const resolvedConnectOptions = this . resolvedConnectionOptions ( options ) ;
441-
442- this . syncStreamImplementation = this . generateSyncStreamImplementation ( connector , resolvedConnectOptions ) ;
443- this . syncStatusListenerDisposer = this . syncStreamImplementation . registerListener ( {
444- statusChanged : ( status ) => {
445- this . currentStatus = new SyncStatus ( {
446- ...status . toJSON ( ) ,
447- hasSynced : this . currentStatus ?. hasSynced || ! ! status . lastSyncedAt
448- } ) ;
449- this . iterateListeners ( ( cb ) => cb . statusChanged ?.( this . currentStatus ) ) ;
450- }
451- } ) ;
452-
453- await this . syncStreamImplementation . waitForReady ( ) ;
454- this . syncStreamImplementation . triggerCrudUpload ( ) ;
455- await this . syncStreamImplementation . connect ( options ) ;
472+ return this . connectionManager . connect ( connector , options ) ;
456473 }
457474
458475 /**
@@ -461,11 +478,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
461478 * Use {@link connect} to connect again.
462479 */
463480 async disconnect ( ) {
464- await this . waitForReady ( ) ;
465- await this . syncStreamImplementation ?. disconnect ( ) ;
466- this . syncStatusListenerDisposer ?.( ) ;
467- await this . syncStreamImplementation ?. dispose ( ) ;
468- this . syncStreamImplementation = undefined ;
481+ return this . connectionManager . disconnect ( ) ;
469482 }
470483
471484 /**
@@ -512,7 +525,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
512525 await this . disconnect ( ) ;
513526 }
514527
515- await this . syncStreamImplementation ?. dispose ( ) ;
528+ await this . connectionManager . close ( ) ;
516529 await this . database . close ( ) ;
517530 this . closed = true ;
518531 }
0 commit comments