Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
});
}

hasUpdates(): boolean {
return this.updateBuffer.length > 0;
}

flushUpdates() {
if (!this.updateBuffer.length) {
return;
Expand Down
13 changes: 9 additions & 4 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ANDROID_DATABASE_PATH, getDylibPath, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite';
import { getDylibPath, open, type DB } from '@op-engineering/op-sqlite';
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
import Lock from 'async-lock';
import { Platform } from 'react-native';
Expand Down Expand Up @@ -158,6 +158,10 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
await this.initialized;
if (this.readConnections?.length == 0) {
// When opened with no read connections, use the write connection for reads.
return this.writeLock(fn, options);
}
return new Promise(async (resolve, reject) => {
const execute = async () => {
// Find an available connection that is not busy
Expand Down Expand Up @@ -231,11 +235,11 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
}

readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.readLock((ctx) => this.internalTransaction(ctx, fn));
return this.readLock((ctx) => this.internalTransaction(ctx, 'BEGIN', fn));
}

writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.writeLock((ctx) => this.internalTransaction(ctx, fn));
return this.writeLock((ctx) => this.internalTransaction(ctx, 'BEGIN EXCLUSIVE', fn));
}

getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
Expand Down Expand Up @@ -264,6 +268,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

protected async internalTransaction<T>(
connection: OPSQLiteConnection,
transactionType: 'BEGIN' | 'BEGIN EXCLUSIVE',
fn: (tx: Transaction) => Promise<T>
): Promise<T> {
let finalized = false;
Expand All @@ -282,7 +287,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
return connection.execute('ROLLBACK');
};
try {
await connection.execute('BEGIN');
await connection.execute(transactionType);
const result = await fn({
execute: (query, params) => connection.execute(query, params),
executeRaw: (query, params) => connection.executeRaw(query, params),
Expand Down
80 changes: 79 additions & 1 deletion packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { DBAdapter, SQLOpenFactory, SQLOpenOptions } from '@powersync/common';
import { OPSQLiteDBAdapter } from './OPSqliteAdapter';
import { DEFAULT_SQLITE_OPTIONS, SqliteOptions } from './SqliteOptions';
import { OPSQLiteConnection } from './OPSQLiteConnection';
import { DB } from '@op-engineering/op-sqlite';

export interface OPSQLiteOpenFactoryOptions extends SQLOpenOptions {
sqliteOptions?: SqliteOptions;
}
export class OPSqliteOpenFactory implements SQLOpenFactory {
private sqliteOptions: Required<SqliteOptions>;
private adapters: Set<OPSQLiteDBAdapter> = new Set();

constructor(protected options: OPSQLiteOpenFactoryOptions) {
this.sqliteOptions = {
Expand All @@ -16,10 +19,85 @@ export class OPSqliteOpenFactory implements SQLOpenFactory {
}

openDB(): DBAdapter {
return new OPSQLiteDBAdapter({
const adapter = new OPSQLiteDBAdapter({
name: this.options.dbFilename,
dbLocation: this.options.dbLocation,
sqliteOptions: this.sqliteOptions
});
this.adapters.add(adapter);
(adapter as any).abortController.signal.addEventListener('abort', () => {
this.adapters.delete(adapter);
});

return adapter;
}

/**
* Opens a direct op-sqlite DB connection. This can be used concurrently with PowerSyncDatabase.
*
* This can be used to execute synchronous queries, or to access other op-sqlite functionality directly.
*
* Update notifications are propagated to any other PowerSyncDatabase opened with this factory.
*
* If a write statement or transaction is currently open on any of the other adapters, any
* write statements on this connection will block until the others are done. This may create a deadlock,
* since this also blocks the JavaScript thread. For that reason, do any write statements in a
* writeLock() on the PowerSyncDatabase.
*
* Read statements can execute concurrently with write statements, so does not have the same risk.
*
* This is not recommended for most use cases, as synchronous queries block the JavaScript thread,
* and the code is not portable to other platforms.
*/
async openDirectConnection(): Promise<DB> {
const adapter = new OPSQLiteDBAdapter({
name: this.options.dbFilename,
dbLocation: this.options.dbLocation,
sqliteOptions: {
...this.sqliteOptions,
readConnections: 0,
// Configure the BUSY_TIMEOUT to be very short, since this is a direct connection.
// In general, we should not wait for a lock when using any synchronous queries,
// since any locks won't be released while we lock the JS thread.
lockTimeoutMs: 50
}
});
await (adapter as any).initialized;

const connection = (adapter as any).writeConnection as OPSQLiteConnection;
connection.registerListener({
tablesUpdated: (updateNotification) => {
// Pass on to all other adapters.
this.adapters.forEach((adapter) => {
adapter.iterateListeners((listener) => {
listener.tablesUpdated?.(updateNotification);
});
});
}
});
const database = (connection as any).DB as DB;

database.commitHook(() => {
// This is effectively a "pre-commit" hook, so changes may not actually reflect yet.
// To make sure the changes reflect, we first get start a new write transaction (not just a
// write lock, since we need to get a lock on the actual SQLite file).
const firstAdapter = [...this.adapters][0];
if (firstAdapter != null && connection.hasUpdates()) {
firstAdapter
.writeLock(async (tx) => {
// Slightly less overhead than writeTransaction().
await tx.execute('BEGIN EXCLUSIVE; ROLLBACK;');
})
.catch((e) => {
// Ignore
})
.finally(() => {
// This triggers the listeners registered above
connection.flushUpdates();
});
}
});

return database;
}
}
6 changes: 6 additions & 0 deletions packages/powersync-op-sqlite/src/db/SqliteOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ export interface SqliteOptions {
path: string;
entryPoint?: string;
}>;

/**
* Number of read-only connections to use. Defaults to 5.
*/
readConnections?: number;
}

export enum TemporaryStorageOption {
Expand Down Expand Up @@ -89,5 +94,6 @@ export const DEFAULT_SQLITE_OPTIONS: Required<SqliteOptions> = {
temporaryStorage: TemporaryStorageOption.MEMORY,
lockTimeoutMs: 30000,
encryptionKey: null,
readConnections: 5,
extensions: []
};