From 5626a0d0af201d948dab78ded29412c8fb3dd785 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 3 Jul 2025 22:23:16 +0100 Subject: [PATCH 01/11] First pass --- .../storage/sqlite/core/connections/index.ts | 47 ++++++- .../src/storage/sqlite/core/pool/pool.ts | 5 + .../storage/sqlite/core/transactions/index.ts | 42 +++++- .../transactions/transactions.int.spec.ts | 130 ++++++++++++++++++ 4 files changed, 216 insertions(+), 8 deletions(-) create mode 100644 src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts diff --git a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts index 733dcfe0..652a0ad9 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts @@ -46,6 +46,7 @@ export type SQLitePoolConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; + transactionCounter: TransactionNestingCounter; type: 'PoolClient'; connect: Promise; close: (client: SQLitePoolClient) => Promise; @@ -55,6 +56,7 @@ export type SQLiteClientConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; + transactionCounter: TransactionNestingCounter; type: 'Client'; connect: Promise; close: (client: SQLiteClient) => Promise; @@ -85,25 +87,62 @@ export const sqliteClientConnection = < connector: options.connector, connect, close, - initTransaction: (connection) => - sqliteTransaction(options.connector, connection), + initTransaction: (connection) => { + return sqliteTransaction( + options.connector, + connection, + options.transactionCounter, + ); + }, executor: () => sqliteSQLExecutor(options.connector), }); }; +export type TransactionNestingCounter = { + increment: () => void; + decrement: () => void; + reset: () => void; + level: number; +}; + +export const transactionNestingCounter = (): TransactionNestingCounter => { + let transactionLevel = 0; + + return { + reset: () => { + transactionLevel = 0; + }, + increment: () => { + console.log('incrementing'); + transactionLevel++; + console.log('Transaction level incremented to', transactionLevel); + }, + decrement: () => { + transactionLevel--; + + if (transactionLevel < 0) { + throw new Error('Transaction level is out of bounds'); + } + }, + get level() { + return transactionLevel; + }, + }; +}; + export const sqlitePoolClientConnection = < ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, >( options: SQLitePoolConnectionOptions, ): SQLitePoolClientConnection => { - const { connect, close } = options; + const { connect, close, transactionCounter } = options; return createConnection({ connector: options.connector, connect, close, initTransaction: (connection) => - sqliteTransaction(options.connector, connection), + sqliteTransaction(options.connector, connection, transactionCounter), executor: () => sqliteSQLExecutor(options.connector), }); }; diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index 20d8ee75..c035224d 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -3,6 +3,7 @@ import { sqliteClientProvider, sqliteConnection, SQLiteConnectionString, + transactionNestingCounter, type SQLiteClient, type SQLiteClientConnection, type SQLiteConnectorType, @@ -64,6 +65,7 @@ export const sqliteSingletonClientPool = < let connection: SQLiteClientConnection | undefined = undefined; const getConnection = () => { + const transactionCounter = transactionNestingCounter(); if (connection) return connection; const connect = sqliteClientProvider(connector).then( @@ -80,6 +82,7 @@ export const sqliteSingletonClientPool = < connector, type: 'Client', connect, + transactionCounter, close: () => Promise.resolve(), })); }; @@ -110,6 +113,7 @@ export const sqliteAlwaysNewClientPool = < return createConnectionPool({ connector: connector, getConnection: () => { + const transactionCounter = transactionNestingCounter(); const connect = sqliteClientProvider(connector).then( async (sqliteClient) => { const client = sqliteClient(options); @@ -124,6 +128,7 @@ export const sqliteAlwaysNewClientPool = < connector, type: 'Client', connect, + transactionCounter, close: (client) => client.close(), }); }, diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts index 54c533d0..ea1e2e1b 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts @@ -5,7 +5,10 @@ import { type DatabaseTransaction, } from '../../../../core'; import { sqliteSQLExecutor } from '../../core/execute'; -import type { SQLiteClientOrPoolClient } from '../connections'; +import type { + SQLiteClientOrPoolClient, + TransactionNestingCounter, +} from '../connections'; export type SQLiteTransaction< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, @@ -18,6 +21,7 @@ export const sqliteTransaction = >( connector: ConnectorType, connection: () => Connection, + transactionCounter: TransactionNestingCounter, ) => ( getClient: Promise, @@ -25,19 +29,49 @@ export const sqliteTransaction = ): DatabaseTransaction => ({ connection: connection(), connector, - begin: async () => { + begin: async function () { const client = await getClient; + + if (transactionCounter.level >= 1) { + try { + transactionCounter.increment(); + await client.query( + `SAVEPOINT transaction${transactionCounter.level}`, + ); + } catch (error) { + console.log('Rolling back begin commit'); + await this.rollback(error); + throw error; + } + return; + } + + transactionCounter.increment(); await client.query('BEGIN TRANSACTION'); }, - commit: async () => { + commit: async function () { const client = await getClient; + if (transactionCounter.level > 1) { + try { + await client.query(`RELEASE transaction${transactionCounter.level}`); + transactionCounter.decrement(); + } catch (error) { + console.log(error); + await this.rollback(error); + } + + return; + } + await client.query('COMMIT'); + transactionCounter.reset(); if (options?.close) await options?.close(client); }, - rollback: async (error?: unknown) => { + rollback: async function (error?: unknown) { const client = await getClient; + transactionCounter.reset(); await client.query('ROLLBACK'); if (options?.close) await options?.close(client, error); diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts new file mode 100644 index 00000000..db2c8811 --- /dev/null +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -0,0 +1,130 @@ +import assert from 'assert'; +import fs from 'fs'; +import { afterEach, describe, it } from 'node:test'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import { InMemorySQLiteDatabase, sqlitePool } from '..'; +import { rawSql } from '../../../../core'; + +void describe('SQLite Transactions', () => { + const inMemoryfileName: string = InMemorySQLiteDatabase; + + const testDatabasePath = path.resolve( + path.dirname(fileURLToPath(import.meta.url)), + ); + const fileName = path.resolve(testDatabasePath, 'test-transactions.db'); + + const testCases = [ + { testName: 'in-memory', fileName: inMemoryfileName }, + // { testName: 'file', fileName: fileName }, + ]; + + afterEach(() => { + if (!fs.existsSync(fileName)) { + return; + } + try { + fs.unlinkSync(fileName); + } catch (error) { + console.log('Error deleting file:', error); + } + }); + + for (const { testName, fileName } of testCases) { + void describe(`transactions with ${testName} database`, () => { + void it('commits a nested transaction with pool', async () => { + const pool = sqlitePool({ connector: 'SQLite:sqlite3', fileName }); + const connection = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + const result = await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + + const result = await connection.withTransaction<{ + id: null | string; + }>(async () => { + const result = await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + return { success: true, result: result }; + }); + + assert.strictEqual(result, 1); + + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + assert.strictEqual(rows.rows[0].count, 2); + } finally { + await connection.close(); + await pool.close(); + } + }); + void it('commits a nested transaction with singleton pool', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + singleton: true, + }); + const connection = await pool.connection(); + const connection2 = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + const result = await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + + const result = await connection2.withTransaction<{ + id: null | string; + }>(async () => { + const result = await connection2.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + return { success: true, result: result }; + }); + + assert.strictEqual(result, 1); + + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + assert.strictEqual(rows.rows[0].count, 2); + } finally { + await connection.close(); + await pool.close(); + } + }); + }); + } +}); From 64454d91b9449465d061664a45cff1e7c2a18905 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 4 Jul 2025 22:29:28 +0100 Subject: [PATCH 02/11] Adjusted the methods inside of begin and commit, and let the transaction handle the try catch. Added decrement for rollback so it only gets call at the very end --- .../storage/sqlite/core/transactions/index.ts | 27 ++--- .../transactions/transactions.int.spec.ts | 107 ++++++++++++++++++ 2 files changed, 116 insertions(+), 18 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts index ea1e2e1b..c8dd0091 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts @@ -33,16 +33,8 @@ export const sqliteTransaction = const client = await getClient; if (transactionCounter.level >= 1) { - try { - transactionCounter.increment(); - await client.query( - `SAVEPOINT transaction${transactionCounter.level}`, - ); - } catch (error) { - console.log('Rolling back begin commit'); - await this.rollback(error); - throw error; - } + transactionCounter.increment(); + await client.query(`SAVEPOINT transaction${transactionCounter.level}`); return; } @@ -53,13 +45,8 @@ export const sqliteTransaction = const client = await getClient; if (transactionCounter.level > 1) { - try { - await client.query(`RELEASE transaction${transactionCounter.level}`); - transactionCounter.decrement(); - } catch (error) { - console.log(error); - await this.rollback(error); - } + await client.query(`RELEASE transaction${transactionCounter.level}`); + transactionCounter.decrement(); return; } @@ -71,7 +58,11 @@ export const sqliteTransaction = }, rollback: async function (error?: unknown) { const client = await getClient; - transactionCounter.reset(); + if (transactionCounter.level > 1) { + transactionCounter.decrement(); + return; + } + await client.query('ROLLBACK'); if (options?.close) await options?.close(client, error); diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts index db2c8811..ba61721f 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -125,6 +125,113 @@ void describe('SQLite Transactions', () => { await pool.close(); } }); + + void it('transactions errors inside the nested inner transaction for a singleton should try catch and roll back everything', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + singleton: true, + }); + const connection = await pool.connection(); + const connection2 = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + try { + const result = await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + + const result = await connection2.withTransaction<{ + id: null | string; + }>(async () => { + throw new Error('Intentionally throwing'); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + return { success: true, result: result }; + }); + } catch (error) { + assert.strictEqual( + (error as Error).message, + 'Intentionally throwing', + ); + } + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + assert.strictEqual(rows.rows[0].count, 0); + } finally { + await connection.close(); + await pool.close(); + } + }); + void it('transactions errors inside the outer transaction for a singleton should try catch and roll back everything', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + singleton: true, + }); + const connection = await pool.connection(); + const connection2 = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + try { + await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + + const result = await connection2.withTransaction<{ + id: null | string; + }>(async () => { + const result = await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + throw new Error('Intentionally throwing'); + }); + } catch (error) { + // make sure the rror is the correct one. catch but let it continue so it doesnt trigger + // the outer errors + assert.strictEqual( + (error as Error).message, + 'Intentionally throwing', + ); + } + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + console.log('rows', rows); + + assert.strictEqual(rows.rows[0].count, 0); + } finally { + await connection.close(); + await pool.close(); + } + }); }); } }); From 06792dc2a7c0e84e6d8acf16ef09dcd264f7ffd4 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 6 Jul 2025 21:44:32 +0100 Subject: [PATCH 03/11] Added ability to enable transaction nesting --- .../storage/sqlite/core/connections/index.ts | 20 ++- .../src/storage/sqlite/core/pool/pool.ts | 25 +++- .../storage/sqlite/core/transactions/index.ts | 37 ++++-- .../transactions/transactions.int.spec.ts | 117 +++++++++++++++++- 4 files changed, 178 insertions(+), 21 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts index 652a0ad9..bdf6af24 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts @@ -47,6 +47,7 @@ export type SQLitePoolConnectionOptions< > = { connector: ConnectorType; transactionCounter: TransactionNestingCounter; + allowNestedTransactions: boolean; type: 'PoolClient'; connect: Promise; close: (client: SQLitePoolClient) => Promise; @@ -57,6 +58,7 @@ export type SQLiteClientConnectionOptions< > = { connector: ConnectorType; transactionCounter: TransactionNestingCounter; + allowNestedTransactions: boolean; type: 'Client'; connect: Promise; close: (client: SQLiteClient) => Promise; @@ -81,7 +83,8 @@ export const sqliteClientConnection = < >( options: SQLiteClientConnectionOptions, ): SQLiteClientConnection => { - const { connect, close } = options; + const { connect, close, allowNestedTransactions, transactionCounter } = + options; return createConnection({ connector: options.connector, @@ -91,7 +94,8 @@ export const sqliteClientConnection = < return sqliteTransaction( options.connector, connection, - options.transactionCounter, + transactionCounter, + allowNestedTransactions, ); }, executor: () => sqliteSQLExecutor(options.connector), @@ -135,14 +139,20 @@ export const sqlitePoolClientConnection = < >( options: SQLitePoolConnectionOptions, ): SQLitePoolClientConnection => { - const { connect, close, transactionCounter } = options; + const { connect, close, transactionCounter, allowNestedTransactions } = + options; return createConnection({ connector: options.connector, connect, close, initTransaction: (connection) => - sqliteTransaction(options.connector, connection, transactionCounter), + sqliteTransaction( + options.connector, + connection, + transactionCounter, + allowNestedTransactions ?? false, + ), executor: () => sqliteSQLExecutor(options.connector), }); }; @@ -152,11 +162,13 @@ export function sqliteConnection< >( options: SQLitePoolConnectionOptions, ): SQLitePoolClientConnection; + export function sqliteConnection< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, >( options: SQLiteClientConnectionOptions, ): SQLiteClientConnection; + export function sqliteConnection< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, >( diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index c035224d..0067730a 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -41,6 +41,7 @@ export const sqliteAmbientConnectionPool = < connection: | SQLitePoolClientConnection | SQLiteClientConnection; + allowNestedTransactions: boolean; }): SQLiteAmbientConnectionPool => { const { connection, connector: connectorType } = options; @@ -59,6 +60,7 @@ export const sqliteSingletonClientPool = < options: { connector: ConnectorType; database?: string | undefined; + allowNestedTransactions?: boolean; } & SQLiteFileNameOrConnectionString, ): SQLiteAmbientClientPool => { const { connector } = options; @@ -83,6 +85,7 @@ export const sqliteSingletonClientPool = < type: 'Client', connect, transactionCounter, + allowNestedTransactions: options.allowNestedTransactions ?? false, close: () => Promise.resolve(), })); }; @@ -140,9 +143,11 @@ export const sqliteAmbientClientPool = < >(options: { connector: ConnectorType; client: SQLiteClient; + allowNestedTransactions: boolean; }): SQLiteAmbientClientPool => { - const { client, connector } = options; + const { client, connector, allowNestedTransactions } = options; + const transactionCounter = transactionNestingCounter(); const getConnection = () => { const connect = Promise.resolve(client); @@ -150,6 +155,8 @@ export const sqliteAmbientClientPool = < connector, type: 'Client', connect, + transactionCounter, + allowNestedTransactions, close: () => Promise.resolve(), }); }; @@ -183,6 +190,7 @@ export type SQLitePoolPooledOptions< connector: ConnectorType; pooled?: true; singleton?: boolean; + allowNestedTransactions?: boolean; }; export type SQLitePoolNotPooledOptions< @@ -193,11 +201,13 @@ export type SQLitePoolNotPooledOptions< pooled?: false; client: SQLiteClient; singleton?: true; + allowNestedTransactions?: boolean; } | { connector: ConnectorType; pooled?: boolean; singleton?: boolean; + allowNestedTransactions?: boolean; } | { connector: ConnectorType; @@ -206,6 +216,7 @@ export type SQLitePoolNotPooledOptions< | SQLiteClientConnection; pooled?: false; singleton?: true; + allowNestedTransactions?: boolean; }; export type SQLitePoolOptions< @@ -223,6 +234,7 @@ export function sqlitePool< options: SQLitePoolNotPooledOptions & SQLiteFileNameOrConnectionString, ): SQLiteAmbientClientPool; + export function sqlitePool< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, >( @@ -236,12 +248,17 @@ export function sqlitePool< // setSQLiteTypeParser(serializer ?? JSONSerializer); if ('client' in options && options.client) - return sqliteAmbientClientPool({ connector, client: options.client }); + return sqliteAmbientClientPool({ + connector, + client: options.client, + allowNestedTransactions: options.allowNestedTransactions ?? false, + }); if ('connection' in options && options.connection) return sqliteAmbientConnectionPool({ connector, connection: options.connection, + allowNestedTransactions: options.allowNestedTransactions ?? false, }); if ( @@ -249,7 +266,9 @@ export function sqlitePool< options.fileName === InMemorySQLiteDatabase || options.connectionString === InMemorySQLiteDatabase ) - return sqliteSingletonClientPool(options); + return sqliteSingletonClientPool({ + ...options, + }); return sqliteAlwaysNewClientPool(options); } diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts index c8dd0091..6d451f6f 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts @@ -22,6 +22,7 @@ export const sqliteTransaction = connector: ConnectorType, connection: () => Connection, transactionCounter: TransactionNestingCounter, + allowNestedTransactions: boolean, ) => ( getClient: Promise, @@ -32,35 +33,45 @@ export const sqliteTransaction = begin: async function () { const client = await getClient; - if (transactionCounter.level >= 1) { + if (allowNestedTransactions) { + if (transactionCounter.level >= 1) { + transactionCounter.increment(); + await client.query( + `SAVEPOINT transaction${transactionCounter.level}`, + ); + return; + } + transactionCounter.increment(); - await client.query(`SAVEPOINT transaction${transactionCounter.level}`); - return; } - transactionCounter.increment(); await client.query('BEGIN TRANSACTION'); }, commit: async function () { const client = await getClient; - if (transactionCounter.level > 1) { - await client.query(`RELEASE transaction${transactionCounter.level}`); - transactionCounter.decrement(); + if (allowNestedTransactions) { + if (transactionCounter.level > 1) { + await client.query(`RELEASE transaction${transactionCounter.level}`); + transactionCounter.decrement(); - return; - } + return; + } + transactionCounter.reset(); + } await client.query('COMMIT'); - transactionCounter.reset(); if (options?.close) await options?.close(client); }, rollback: async function (error?: unknown) { const client = await getClient; - if (transactionCounter.level > 1) { - transactionCounter.decrement(); - return; + + if (allowNestedTransactions) { + if (transactionCounter.level > 1) { + transactionCounter.decrement(); + return; + } } await client.query('ROLLBACK'); diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts index ba61721f..d183661f 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -33,7 +33,11 @@ void describe('SQLite Transactions', () => { for (const { testName, fileName } of testCases) { void describe(`transactions with ${testName} database`, () => { void it('commits a nested transaction with pool', async () => { - const pool = sqlitePool({ connector: 'SQLite:sqlite3', fileName }); + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + allowNestedTransactions: true, + }); const connection = await pool.connection(); try { @@ -76,11 +80,120 @@ void describe('SQLite Transactions', () => { await pool.close(); } }); + + void it('should try catch and roll back everything when the inner transaction errors for a pooled connection', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + allowNestedTransactions: true, + }); + const connection = await pool.connection(); + const connection2 = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + try { + const result = await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + + const result = await connection2.withTransaction<{ + id: null | string; + }>(async () => { + throw new Error('Intentionally throwing'); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + return { success: true, result: result }; + }); + } catch (error) { + assert.strictEqual( + (error as Error).message, + 'Intentionally throwing', + ); + } + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + assert.strictEqual(rows.rows[0].count, 0); + } finally { + await connection.close(); + await pool.close(); + } + }); + void it('should try catch and roll back everything when the outer transactions errors for a pooled connection', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + allowNestedTransactions: true, + }); + const connection = await pool.connection(); + const connection2 = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + try { + await connection.withTransaction<{ + id: null | string; + }>(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + + const result = await connection2.withTransaction<{ + id: null | string; + }>(async () => { + const result = await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + return { success: true, result: result.rows[0]?.id ?? null }; + }); + + throw new Error('Intentionally throwing'); + }); + } catch (error) { + // make sure the rror is the correct one. catch but let it continue so it doesnt trigger + // the outer errors + assert.strictEqual( + (error as Error).message, + 'Intentionally throwing', + ); + } + const rows = await connection.execute.query( + rawSql('SELECT COUNT(*) as count FROM test_table'), + ); + + console.log('rows', rows); + + assert.strictEqual(rows.rows[0].count, 0); + } finally { + await connection.close(); + await pool.close(); + } + }); + void it('commits a nested transaction with singleton pool', async () => { const pool = sqlitePool({ connector: 'SQLite:sqlite3', fileName, singleton: true, + allowNestedTransactions: true, }); const connection = await pool.connection(); const connection2 = await pool.connection(); @@ -131,6 +244,7 @@ void describe('SQLite Transactions', () => { connector: 'SQLite:sqlite3', fileName, singleton: true, + allowNestedTransactions: true, }); const connection = await pool.connection(); const connection2 = await pool.connection(); @@ -180,6 +294,7 @@ void describe('SQLite Transactions', () => { connector: 'SQLite:sqlite3', fileName, singleton: true, + allowNestedTransactions: true, }); const connection = await pool.connection(); const connection2 = await pool.connection(); From 6bf5faa3645f297351b9c2f974b0ac01b7fbfa79 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 6 Jul 2025 21:51:56 +0100 Subject: [PATCH 04/11] Removed redundant parameter --- src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index 0067730a..b238145f 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -41,7 +41,6 @@ export const sqliteAmbientConnectionPool = < connection: | SQLitePoolClientConnection | SQLiteClientConnection; - allowNestedTransactions: boolean; }): SQLiteAmbientConnectionPool => { const { connection, connector: connectorType } = options; @@ -258,7 +257,6 @@ export function sqlitePool< return sqliteAmbientConnectionPool({ connector, connection: options.connection, - allowNestedTransactions: options.allowNestedTransactions ?? false, }); if ( @@ -266,9 +264,7 @@ export function sqlitePool< options.fileName === InMemorySQLiteDatabase || options.connectionString === InMemorySQLiteDatabase ) - return sqliteSingletonClientPool({ - ...options, - }); + return sqliteSingletonClientPool(options); return sqliteAlwaysNewClientPool(options); } From cf57ad9fe19d3cae257bc3927724510987dda297 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 8 Jul 2025 20:53:43 +0100 Subject: [PATCH 05/11] Removed console.logs --- src/packages/dumbo/src/storage/sqlite/core/connections/index.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts index bdf6af24..4e6e2020 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts @@ -117,9 +117,7 @@ export const transactionNestingCounter = (): TransactionNestingCounter => { transactionLevel = 0; }, increment: () => { - console.log('incrementing'); transactionLevel++; - console.log('Transaction level incremented to', transactionLevel); }, decrement: () => { transactionLevel--; From 9b11789f48ffba62cbc983d6737ef5679a26fb70 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 8 Jul 2025 22:24:04 +0100 Subject: [PATCH 06/11] fixing types --- .../src/storage/sqlite/core/pool/pool.ts | 1 + .../transactions/transactions.int.spec.ts | 109 ++++++++---------- 2 files changed, 47 insertions(+), 63 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index b238145f..24e8ec85 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -131,6 +131,7 @@ export const sqliteAlwaysNewClientPool = < type: 'Client', connect, transactionCounter, + allowNestedTransactions: false, close: (client) => client.close(), }); }, diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts index d183661f..22fa3c06 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -45,27 +45,25 @@ void describe('SQLite Transactions', () => { rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), ); - const result = await connection.withTransaction<{ - id: null | string; - }>(async () => { + const result = await connection.withTransaction(async () => { await connection.execute.query( rawSql( 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', ), ); - const result = await connection.withTransaction<{ - id: null | string; - }>(async () => { - const result = await connection.execute.query( - rawSql( - 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', - ), - ); - return { success: true, result: result.rows[0]?.id ?? null }; - }); + const result = await connection.withTransaction( + async () => { + const result = await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + return (result.rows[0]?.id as number) ?? null; + }, + ); - return { success: true, result: result }; + return result; }); assert.strictEqual(result, 1); @@ -74,7 +72,7 @@ void describe('SQLite Transactions', () => { rawSql('SELECT COUNT(*) as count FROM test_table'), ); - assert.strictEqual(rows.rows[0].count, 2); + assert.strictEqual(rows.rows[0]?.count, 2); } finally { await connection.close(); await pool.close(); @@ -96,23 +94,16 @@ void describe('SQLite Transactions', () => { ); try { - const result = await connection.withTransaction<{ - id: null | string; - }>(async () => { + await connection.withTransaction(async () => { await connection.execute.query( rawSql( 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', ), ); - const result = await connection2.withTransaction<{ - id: null | string; - }>(async () => { + await connection2.withTransaction(() => { throw new Error('Intentionally throwing'); - return { success: true, result: result.rows[0]?.id ?? null }; }); - - return { success: true, result: result }; }); } catch (error) { assert.strictEqual( @@ -124,7 +115,7 @@ void describe('SQLite Transactions', () => { rawSql('SELECT COUNT(*) as count FROM test_table'), ); - assert.strictEqual(rows.rows[0].count, 0); + assert.strictEqual(rows.rows[0]?.count, 0); } finally { await connection.close(); await pool.close(); @@ -154,15 +145,13 @@ void describe('SQLite Transactions', () => { ), ); - const result = await connection2.withTransaction<{ - id: null | string; - }>(async () => { + await connection2.withTransaction(async () => { const result = await connection.execute.query( rawSql( 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', ), ); - return { success: true, result: result.rows[0]?.id ?? null }; + return (result.rows[0]?.id as number) ?? null; }); throw new Error('Intentionally throwing'); @@ -179,9 +168,7 @@ void describe('SQLite Transactions', () => { rawSql('SELECT COUNT(*) as count FROM test_table'), ); - console.log('rows', rows); - - assert.strictEqual(rows.rows[0].count, 0); + assert.strictEqual(rows.rows[0]?.count, 0); } finally { await connection.close(); await pool.close(); @@ -203,36 +190,36 @@ void describe('SQLite Transactions', () => { rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), ); - const result = await connection.withTransaction<{ - id: null | string; - }>(async () => { - await connection.execute.query( - rawSql( - 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', - ), - ); - - const result = await connection2.withTransaction<{ - id: null | string; - }>(async () => { - const result = await connection2.execute.query( + const result = await connection.withTransaction( + async () => { + await connection.execute.query( rawSql( - 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', ), ); - return { success: true, result: result.rows[0]?.id ?? null }; - }); - return { success: true, result: result }; - }); + const result = await connection2.withTransaction( + async () => { + const result = await connection2.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + return (result.rows[0]?.id as number) ?? null; + }, + ); + + return result; + }, + ); assert.strictEqual(result, 1); - const rows = await connection.execute.query( + const rows = await connection.execute.query<{ count: number }>( rawSql('SELECT COUNT(*) as count FROM test_table'), ); - assert.strictEqual(rows.rows[0].count, 2); + assert.strictEqual(rows.rows[0]?.count, 2); } finally { await connection.close(); await pool.close(); @@ -255,7 +242,7 @@ void describe('SQLite Transactions', () => { ); try { - const result = await connection.withTransaction<{ + await connection.withTransaction<{ id: null | string; }>(async () => { await connection.execute.query( @@ -266,9 +253,8 @@ void describe('SQLite Transactions', () => { const result = await connection2.withTransaction<{ id: null | string; - }>(async () => { + }>(() => { throw new Error('Intentionally throwing'); - return { success: true, result: result.rows[0]?.id ?? null }; }); return { success: true, result: result }; @@ -279,11 +265,12 @@ void describe('SQLite Transactions', () => { 'Intentionally throwing', ); } + const rows = await connection.execute.query( rawSql('SELECT COUNT(*) as count FROM test_table'), ); - assert.strictEqual(rows.rows[0].count, 0); + assert.strictEqual(rows.rows[0]?.count, 0); } finally { await connection.close(); await pool.close(); @@ -314,15 +301,13 @@ void describe('SQLite Transactions', () => { ), ); - const result = await connection2.withTransaction<{ - id: null | string; - }>(async () => { + await connection2.withTransaction(async () => { const result = await connection.execute.query( rawSql( 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', ), ); - return { success: true, result: result.rows[0]?.id ?? null }; + return (result.rows[0]?.id as number) ?? null; }); throw new Error('Intentionally throwing'); @@ -339,9 +324,7 @@ void describe('SQLite Transactions', () => { rawSql('SELECT COUNT(*) as count FROM test_table'), ); - console.log('rows', rows); - - assert.strictEqual(rows.rows[0].count, 0); + assert.strictEqual(rows.rows[0]?.count, 0); } finally { await connection.close(); await pool.close(); From 08ab7f495423d229743e3526934bc07c64ab20c5 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 18 Jul 2025 23:25:37 +0100 Subject: [PATCH 07/11] Removed a lot of redundant transaction counter instantiations --- .../src/storage/sqlite/core/connections/index.ts | 12 ++++++------ .../dumbo/src/storage/sqlite/core/pool/pool.ts | 7 ------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts index 4e6e2020..0ad890b6 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts @@ -18,6 +18,7 @@ export type SQLiteClient = { }; export type SQLitePoolClient = { + close(): Promise; release: () => void; command: (sql: string, values?: Parameters[]) => Promise; query: (sql: string, values?: Parameters[]) => Promise; @@ -46,7 +47,6 @@ export type SQLitePoolConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; - transactionCounter: TransactionNestingCounter; allowNestedTransactions: boolean; type: 'PoolClient'; connect: Promise; @@ -57,7 +57,6 @@ export type SQLiteClientConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; - transactionCounter: TransactionNestingCounter; allowNestedTransactions: boolean; type: 'Client'; connect: Promise; @@ -83,8 +82,9 @@ export const sqliteClientConnection = < >( options: SQLiteClientConnectionOptions, ): SQLiteClientConnection => { - const { connect, close, allowNestedTransactions, transactionCounter } = - options; + const { connect, close, allowNestedTransactions } = options; + + const transactionCounter = transactionNestingCounter(); return createConnection({ connector: options.connector, @@ -137,9 +137,9 @@ export const sqlitePoolClientConnection = < >( options: SQLitePoolConnectionOptions, ): SQLitePoolClientConnection => { - const { connect, close, transactionCounter, allowNestedTransactions } = - options; + const { connect, close, allowNestedTransactions } = options; + const transactionCounter = transactionNestingCounter(); return createConnection({ connector: options.connector, connect, diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index 24e8ec85..95673542 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -3,7 +3,6 @@ import { sqliteClientProvider, sqliteConnection, SQLiteConnectionString, - transactionNestingCounter, type SQLiteClient, type SQLiteClientConnection, type SQLiteConnectorType, @@ -66,7 +65,6 @@ export const sqliteSingletonClientPool = < let connection: SQLiteClientConnection | undefined = undefined; const getConnection = () => { - const transactionCounter = transactionNestingCounter(); if (connection) return connection; const connect = sqliteClientProvider(connector).then( @@ -83,7 +81,6 @@ export const sqliteSingletonClientPool = < connector, type: 'Client', connect, - transactionCounter, allowNestedTransactions: options.allowNestedTransactions ?? false, close: () => Promise.resolve(), })); @@ -115,7 +112,6 @@ export const sqliteAlwaysNewClientPool = < return createConnectionPool({ connector: connector, getConnection: () => { - const transactionCounter = transactionNestingCounter(); const connect = sqliteClientProvider(connector).then( async (sqliteClient) => { const client = sqliteClient(options); @@ -130,7 +126,6 @@ export const sqliteAlwaysNewClientPool = < connector, type: 'Client', connect, - transactionCounter, allowNestedTransactions: false, close: (client) => client.close(), }); @@ -147,7 +142,6 @@ export const sqliteAmbientClientPool = < }): SQLiteAmbientClientPool => { const { client, connector, allowNestedTransactions } = options; - const transactionCounter = transactionNestingCounter(); const getConnection = () => { const connect = Promise.resolve(client); @@ -155,7 +149,6 @@ export const sqliteAmbientClientPool = < connector, type: 'Client', connect, - transactionCounter, allowNestedTransactions, close: () => Promise.resolve(), }); From 8ccc3026c296fc300eb70a2584ed0f3f567a1436 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 18 Jul 2025 23:57:39 +0100 Subject: [PATCH 08/11] Uncommented file based tests --- .../storage/sqlite/core/transactions/transactions.int.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts index 22fa3c06..150947c3 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -16,7 +16,7 @@ void describe('SQLite Transactions', () => { const testCases = [ { testName: 'in-memory', fileName: inMemoryfileName }, - // { testName: 'file', fileName: fileName }, + { testName: 'file', fileName: fileName }, ]; afterEach(() => { From e2c501b4f85165bad74db601e556b5b1093b67d1 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 18 Jul 2025 23:58:25 +0100 Subject: [PATCH 09/11] Added option parameter to sqliteAlwaysNewClientPool and allowNestedTransaction pass through --- src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts index 95673542..a3fb76aa 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts @@ -105,9 +105,10 @@ export const sqliteAlwaysNewClientPool = < options: { connector: ConnectorType; database?: string | undefined; + allowNestedTransactions?: boolean; } & SQLiteFileNameOrConnectionString, ): SQLiteAmbientClientPool => { - const { connector } = options; + const { connector, allowNestedTransactions } = options; return createConnectionPool({ connector: connector, @@ -126,7 +127,7 @@ export const sqliteAlwaysNewClientPool = < connector, type: 'Client', connect, - allowNestedTransactions: false, + allowNestedTransactions: allowNestedTransactions ?? false, close: (client) => client.close(), }); }, From 6c3b181c74b05441dcf60a524944c37604d48bb3 Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 19 Jul 2025 00:03:58 +0100 Subject: [PATCH 10/11] Added test to fail if nested transaction is attempted but is not enabled --- .../transactions/transactions.int.spec.ts | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts index 150947c3..d9542b3c 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/transactions.int.spec.ts @@ -78,6 +78,49 @@ void describe('SQLite Transactions', () => { await pool.close(); } }); + void it('should fail with an error if transaction nested is false', async () => { + const pool = sqlitePool({ + connector: 'SQLite:sqlite3', + fileName, + allowNestedTransactions: false, + }); + const connection = await pool.connection(); + + try { + await connection.execute.query( + rawSql('CREATE TABLE test_table (id INTEGER, value TEXT)'), + ); + + await connection.withTransaction(async () => { + await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (2, "test") RETURNING id', + ), + ); + + const result = await connection.withTransaction( + async () => { + const result = await connection.execute.query( + rawSql( + 'INSERT INTO test_table (id, value) VALUES (1, "test") RETURNING id', + ), + ); + return (result.rows[0]?.id as number) ?? null; + }, + ); + + return result; + }); + } catch (error) { + assert.strictEqual( + (error as Error).message, + 'SQLITE_ERROR: cannot start a transaction within a transaction', + ); + } finally { + await connection.close(); + await pool.close(); + } + }); void it('should try catch and roll back everything when the inner transaction errors for a pooled connection', async () => { const pool = sqlitePool({ From da677ee286a1fee3b05ebaba382051674556d3ea Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 29 Jul 2025 11:25:55 +0200 Subject: [PATCH 11/11] Ensured that there's a single active transaction within the connection --- .../dumbo/src/core/connections/connection.ts | 5 +- .../dumbo/src/core/connections/transaction.ts | 25 ++++- .../postgresql/pg/connections/transaction.ts | 16 ++- .../storage/sqlite/core/connections/index.ts | 19 +--- .../storage/sqlite/core/transactions/index.ts | 105 ++++++++++-------- 5 files changed, 94 insertions(+), 76 deletions(-) diff --git a/src/packages/dumbo/src/core/connections/connection.ts b/src/packages/dumbo/src/core/connections/connection.ts index c8e6cc85..dae9e618 100644 --- a/src/packages/dumbo/src/core/connections/connection.ts +++ b/src/packages/dumbo/src/core/connections/connection.ts @@ -45,7 +45,10 @@ export type CreateConnectionOptions< close: (client: DbClient) => Promise; initTransaction: ( connection: () => ConnectionType, - ) => (client: Promise) => DatabaseTransaction; + ) => ( + client: Promise, + options?: { close: (client: DbClient, error?: unknown) => Promise }, + ) => DatabaseTransaction; executor: () => Executor; }; diff --git a/src/packages/dumbo/src/core/connections/transaction.ts b/src/packages/dumbo/src/core/connections/transaction.ts index e6b6683e..7fefba1d 100644 --- a/src/packages/dumbo/src/core/connections/transaction.ts +++ b/src/packages/dumbo/src/core/connections/transaction.ts @@ -70,12 +70,27 @@ export const transactionFactoryWithDbClient = < connect: () => Promise, initTransaction: ( client: Promise, + options?: { close: (client: DbClient, error?: unknown) => Promise }, ) => DatabaseTransaction, -): DatabaseTransactionFactory => ({ - transaction: () => initTransaction(connect()), - withTransaction: (handle) => - executeInTransaction(initTransaction(connect()), handle), -}); +): DatabaseTransactionFactory => { + let currentTransaction: DatabaseTransaction | undefined = + undefined; + + const getOrInitCurrentTransaction = () => + currentTransaction ?? + (currentTransaction = initTransaction(connect(), { + close: () => { + currentTransaction = undefined; + return Promise.resolve(); + }, + })); + + return { + transaction: getOrInitCurrentTransaction, + withTransaction: (handle) => + executeInTransaction(getOrInitCurrentTransaction(), handle), + }; +}; const wrapInConnectionClosure = async < ConnectionType extends Connection = Connection, diff --git a/src/packages/dumbo/src/storage/postgresql/pg/connections/transaction.ts b/src/packages/dumbo/src/storage/postgresql/pg/connections/transaction.ts index 2755413b..9dfc16ce 100644 --- a/src/packages/dumbo/src/storage/postgresql/pg/connections/transaction.ts +++ b/src/packages/dumbo/src/storage/postgresql/pg/connections/transaction.ts @@ -30,15 +30,19 @@ export const nodePostgresTransaction = commit: async () => { const client = await getClient; - await client.query('COMMIT'); - - if (options?.close) await options?.close(client); + try { + await client.query('COMMIT'); + } finally { + if (options?.close) await options?.close(client); + } }, rollback: async (error?: unknown) => { const client = await getClient; - await client.query('ROLLBACK'); - - if (options?.close) await options?.close(client, error); + try { + await client.query('ROLLBACK'); + } finally { + if (options?.close) await options?.close(client, error); + } }, execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: () => getClient, diff --git a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts index 0ad890b6..70151e17 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/connections/index.ts @@ -18,7 +18,6 @@ export type SQLiteClient = { }; export type SQLitePoolClient = { - close(): Promise; release: () => void; command: (sql: string, values?: Parameters[]) => Promise; query: (sql: string, values?: Parameters[]) => Promise; @@ -47,20 +46,20 @@ export type SQLitePoolConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; - allowNestedTransactions: boolean; type: 'PoolClient'; connect: Promise; close: (client: SQLitePoolClient) => Promise; + allowNestedTransactions: boolean; }; export type SQLiteClientConnectionOptions< ConnectorType extends SQLiteConnectorType = SQLiteConnectorType, > = { connector: ConnectorType; - allowNestedTransactions: boolean; type: 'Client'; connect: Promise; close: (client: SQLiteClient) => Promise; + allowNestedTransactions: boolean; }; export type SQLiteClientConnection< @@ -84,20 +83,12 @@ export const sqliteClientConnection = < ): SQLiteClientConnection => { const { connect, close, allowNestedTransactions } = options; - const transactionCounter = transactionNestingCounter(); - return createConnection({ connector: options.connector, connect, close, - initTransaction: (connection) => { - return sqliteTransaction( - options.connector, - connection, - transactionCounter, - allowNestedTransactions, - ); - }, + initTransaction: (connection) => + sqliteTransaction(options.connector, connection, allowNestedTransactions), executor: () => sqliteSQLExecutor(options.connector), }); }; @@ -139,7 +130,6 @@ export const sqlitePoolClientConnection = < ): SQLitePoolClientConnection => { const { connect, close, allowNestedTransactions } = options; - const transactionCounter = transactionNestingCounter(); return createConnection({ connector: options.connector, connect, @@ -148,7 +138,6 @@ export const sqlitePoolClientConnection = < sqliteTransaction( options.connector, connection, - transactionCounter, allowNestedTransactions ?? false, ), executor: () => sqliteSQLExecutor(options.connector), diff --git a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts index 6d451f6f..4a38d7ee 100644 --- a/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts +++ b/src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts @@ -5,9 +5,9 @@ import { type DatabaseTransaction, } from '../../../../core'; import { sqliteSQLExecutor } from '../../core/execute'; -import type { - SQLiteClientOrPoolClient, - TransactionNestingCounter, +import { + transactionNestingCounter, + type SQLiteClientOrPoolClient, } from '../connections'; export type SQLiteTransaction< @@ -21,64 +21,71 @@ export const sqliteTransaction = >( connector: ConnectorType, connection: () => Connection, - transactionCounter: TransactionNestingCounter, allowNestedTransactions: boolean, ) => ( getClient: Promise, options?: { close: (client: DbClient, error?: unknown) => Promise }, - ): DatabaseTransaction => ({ - connection: connection(), - connector, - begin: async function () { - const client = await getClient; + ): DatabaseTransaction => { + const transactionCounter = transactionNestingCounter(); + return { + connection: connection(), + connector, + begin: async function () { + const client = await getClient; + + if (allowNestedTransactions) { + if (transactionCounter.level >= 1) { + transactionCounter.increment(); + await client.query( + `SAVEPOINT transaction${transactionCounter.level}`, + ); + return; + } - if (allowNestedTransactions) { - if (transactionCounter.level >= 1) { transactionCounter.increment(); - await client.query( - `SAVEPOINT transaction${transactionCounter.level}`, - ); - return; } - transactionCounter.increment(); - } + await client.query('BEGIN TRANSACTION'); + }, + commit: async function () { + const client = await getClient; - await client.query('BEGIN TRANSACTION'); - }, - commit: async function () { - const client = await getClient; + try { + if (allowNestedTransactions) { + if (transactionCounter.level > 1) { + await client.query( + `RELEASE transaction${transactionCounter.level}`, + ); + transactionCounter.decrement(); - if (allowNestedTransactions) { - if (transactionCounter.level > 1) { - await client.query(`RELEASE transaction${transactionCounter.level}`); - transactionCounter.decrement(); + return; + } - return; + transactionCounter.reset(); + } + await client.query('COMMIT'); + } finally { + if (options?.close) await options?.close(client); } + }, + rollback: async function (error?: unknown) { + const client = await getClient; + try { + if (allowNestedTransactions) { + if (transactionCounter.level > 1) { + transactionCounter.decrement(); + return; + } + } - transactionCounter.reset(); - } - await client.query('COMMIT'); - - if (options?.close) await options?.close(client); - }, - rollback: async function (error?: unknown) { - const client = await getClient; - - if (allowNestedTransactions) { - if (transactionCounter.level > 1) { - transactionCounter.decrement(); - return; + await client.query('ROLLBACK'); + } finally { + if (options?.close) await options?.close(client, error); } - } - - await client.query('ROLLBACK'); - - if (options?.close) await options?.close(client, error); - }, - execute: sqlExecutor(sqliteSQLExecutor(connector), { - connect: () => getClient, - }), - }); + }, + execute: sqlExecutor(sqliteSQLExecutor(connector), { + connect: () => getClient, + }), + }; + };