diff --git a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts index 5704a48b1..259a09604 100644 --- a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts +++ b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts @@ -58,6 +58,10 @@ export class OPSQLiteConnection extends BaseObserver { }); } + hasUpdates(): boolean { + return this.updateBuffer.length > 0; + } + flushUpdates() { if (!this.updateBuffer.length) { return; diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index 60d4a4eeb..582fe2a41 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -1,4 +1,4 @@ -import { ANDROID_DATABASE_PATH, getDylibPath, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite'; +import { getDylibPath, open, type DB } from '@op-engineering/op-sqlite'; import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common'; import Lock from 'async-lock'; import { Platform } from 'react-native'; @@ -158,6 +158,10 @@ export class OPSQLiteDBAdapter extends BaseObserver implement async readLock(fn: (tx: OPSQLiteConnection) => Promise, options?: DBLockOptions): Promise { await this.initialized; + if (this.readConnections?.length == 0) { + // When opened with no read connections, use the write connection for reads. + return this.writeLock(fn, options); + } return new Promise(async (resolve, reject) => { const execute = async () => { // Find an available connection that is not busy @@ -231,11 +235,11 @@ export class OPSQLiteDBAdapter extends BaseObserver implement } readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.readLock((ctx) => this.internalTransaction(ctx, fn)); + return this.readLock((ctx) => this.internalTransaction(ctx, 'BEGIN', fn)); } writeTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.writeLock((ctx) => this.internalTransaction(ctx, fn)); + return this.writeLock((ctx) => this.internalTransaction(ctx, 'BEGIN EXCLUSIVE', fn)); } getAll(sql: string, parameters?: any[]): Promise { @@ -264,6 +268,7 @@ export class OPSQLiteDBAdapter extends BaseObserver implement protected async internalTransaction( connection: OPSQLiteConnection, + transactionType: 'BEGIN' | 'BEGIN EXCLUSIVE', fn: (tx: Transaction) => Promise ): Promise { let finalized = false; @@ -282,7 +287,7 @@ export class OPSQLiteDBAdapter extends BaseObserver implement return connection.execute('ROLLBACK'); }; try { - await connection.execute('BEGIN'); + await connection.execute(transactionType); const result = await fn({ execute: (query, params) => connection.execute(query, params), executeRaw: (query, params) => connection.executeRaw(query, params), diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts b/packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts index e6527887d..98349d18d 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts @@ -1,12 +1,15 @@ import { DBAdapter, SQLOpenFactory, SQLOpenOptions } from '@powersync/common'; import { OPSQLiteDBAdapter } from './OPSqliteAdapter'; import { DEFAULT_SQLITE_OPTIONS, SqliteOptions } from './SqliteOptions'; +import { OPSQLiteConnection } from './OPSQLiteConnection'; +import { DB } from '@op-engineering/op-sqlite'; export interface OPSQLiteOpenFactoryOptions extends SQLOpenOptions { sqliteOptions?: SqliteOptions; } export class OPSqliteOpenFactory implements SQLOpenFactory { private sqliteOptions: Required; + private adapters: Set = new Set(); constructor(protected options: OPSQLiteOpenFactoryOptions) { this.sqliteOptions = { @@ -16,10 +19,85 @@ export class OPSqliteOpenFactory implements SQLOpenFactory { } openDB(): DBAdapter { - return new OPSQLiteDBAdapter({ + const adapter = new OPSQLiteDBAdapter({ name: this.options.dbFilename, dbLocation: this.options.dbLocation, sqliteOptions: this.sqliteOptions }); + this.adapters.add(adapter); + (adapter as any).abortController.signal.addEventListener('abort', () => { + this.adapters.delete(adapter); + }); + + return adapter; + } + + /** + * Opens a direct op-sqlite DB connection. This can be used concurrently with PowerSyncDatabase. + * + * This can be used to execute synchronous queries, or to access other op-sqlite functionality directly. + * + * Update notifications are propagated to any other PowerSyncDatabase opened with this factory. + * + * If a write statement or transaction is currently open on any of the other adapters, any + * write statements on this connection will block until the others are done. This may create a deadlock, + * since this also blocks the JavaScript thread. For that reason, do any write statements in a + * writeLock() on the PowerSyncDatabase. + * + * Read statements can execute concurrently with write statements, so does not have the same risk. + * + * This is not recommended for most use cases, as synchronous queries block the JavaScript thread, + * and the code is not portable to other platforms. + */ + async openDirectConnection(): Promise { + const adapter = new OPSQLiteDBAdapter({ + name: this.options.dbFilename, + dbLocation: this.options.dbLocation, + sqliteOptions: { + ...this.sqliteOptions, + readConnections: 0, + // Configure the BUSY_TIMEOUT to be very short, since this is a direct connection. + // In general, we should not wait for a lock when using any synchronous queries, + // since any locks won't be released while we lock the JS thread. + lockTimeoutMs: 50 + } + }); + await (adapter as any).initialized; + + const connection = (adapter as any).writeConnection as OPSQLiteConnection; + connection.registerListener({ + tablesUpdated: (updateNotification) => { + // Pass on to all other adapters. + this.adapters.forEach((adapter) => { + adapter.iterateListeners((listener) => { + listener.tablesUpdated?.(updateNotification); + }); + }); + } + }); + const database = (connection as any).DB as DB; + + database.commitHook(() => { + // This is effectively a "pre-commit" hook, so changes may not actually reflect yet. + // To make sure the changes reflect, we first get start a new write transaction (not just a + // write lock, since we need to get a lock on the actual SQLite file). + const firstAdapter = [...this.adapters][0]; + if (firstAdapter != null && connection.hasUpdates()) { + firstAdapter + .writeLock(async (tx) => { + // Slightly less overhead than writeTransaction(). + await tx.execute('BEGIN EXCLUSIVE; ROLLBACK;'); + }) + .catch((e) => { + // Ignore + }) + .finally(() => { + // This triggers the listeners registered above + connection.flushUpdates(); + }); + } + }); + + return database; } } diff --git a/packages/powersync-op-sqlite/src/db/SqliteOptions.ts b/packages/powersync-op-sqlite/src/db/SqliteOptions.ts index 9b962ef0e..c27173ccb 100644 --- a/packages/powersync-op-sqlite/src/db/SqliteOptions.ts +++ b/packages/powersync-op-sqlite/src/db/SqliteOptions.ts @@ -53,6 +53,11 @@ export interface SqliteOptions { path: string; entryPoint?: string; }>; + + /** + * Number of read-only connections to use. Defaults to 5. + */ + readConnections?: number; } export enum TemporaryStorageOption { @@ -89,5 +94,6 @@ export const DEFAULT_SQLITE_OPTIONS: Required = { temporaryStorage: TemporaryStorageOption.MEMORY, lockTimeoutMs: 30000, encryptionKey: null, + readConnections: 5, extensions: [] };