Skip to content

Commit 2321234

Browse files
committed
Ensured that there's a single active transaction within the connection
1 parent 2f1754b commit 2321234

File tree

5 files changed

+94
-76
lines changed

5 files changed

+94
-76
lines changed

src/packages/dumbo/src/core/connections/connection.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ export type CreateConnectionOptions<
4545
close: (client: DbClient) => Promise<void>;
4646
initTransaction: (
4747
connection: () => ConnectionType,
48-
) => (client: Promise<DbClient>) => DatabaseTransaction<Connector, DbClient>;
48+
) => (
49+
client: Promise<DbClient>,
50+
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
51+
) => DatabaseTransaction<Connector, DbClient>;
4952
executor: () => Executor;
5053
};
5154

src/packages/dumbo/src/core/connections/transaction.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,27 @@ export const transactionFactoryWithDbClient = <
7070
connect: () => Promise<DbClient>,
7171
initTransaction: (
7272
client: Promise<DbClient>,
73+
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
7374
) => DatabaseTransaction<Connector, DbClient>,
74-
): DatabaseTransactionFactory<Connector, DbClient> => ({
75-
transaction: () => initTransaction(connect()),
76-
withTransaction: (handle) =>
77-
executeInTransaction(initTransaction(connect()), handle),
78-
});
75+
): DatabaseTransactionFactory<Connector, DbClient> => {
76+
let currentTransaction: DatabaseTransaction<Connector, DbClient> | undefined =
77+
undefined;
78+
79+
const getOrInitCurrentTransaction = () =>
80+
currentTransaction ??
81+
(currentTransaction = initTransaction(connect(), {
82+
close: () => {
83+
currentTransaction = undefined;
84+
return Promise.resolve();
85+
},
86+
}));
87+
88+
return {
89+
transaction: getOrInitCurrentTransaction,
90+
withTransaction: (handle) =>
91+
executeInTransaction(getOrInitCurrentTransaction(), handle),
92+
};
93+
};
7994

8095
const wrapInConnectionClosure = async <
8196
ConnectionType extends Connection = Connection,

src/packages/dumbo/src/storage/postgresql/pg/connections/transaction.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,19 @@ export const nodePostgresTransaction =
3030
commit: async () => {
3131
const client = await getClient;
3232

33-
await client.query('COMMIT');
34-
35-
if (options?.close) await options?.close(client);
33+
try {
34+
await client.query('COMMIT');
35+
} finally {
36+
if (options?.close) await options?.close(client);
37+
}
3638
},
3739
rollback: async (error?: unknown) => {
3840
const client = await getClient;
39-
await client.query('ROLLBACK');
40-
41-
if (options?.close) await options?.close(client, error);
41+
try {
42+
await client.query('ROLLBACK');
43+
} finally {
44+
if (options?.close) await options?.close(client, error);
45+
}
4246
},
4347
execute: sqlExecutor(nodePostgresSQLExecutor(), {
4448
connect: () => getClient,

src/packages/dumbo/src/storage/sqlite/core/connections/index.ts

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export type SQLiteClient = {
1818
};
1919

2020
export type SQLitePoolClient = {
21-
close(): Promise<void>;
2221
release: () => void;
2322
command: (sql: string, values?: Parameters[]) => Promise<void>;
2423
query: <T>(sql: string, values?: Parameters[]) => Promise<T[]>;
@@ -47,20 +46,20 @@ export type SQLitePoolConnectionOptions<
4746
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
4847
> = {
4948
connector: ConnectorType;
50-
allowNestedTransactions: boolean;
5149
type: 'PoolClient';
5250
connect: Promise<SQLitePoolClient>;
5351
close: (client: SQLitePoolClient) => Promise<void>;
52+
allowNestedTransactions: boolean;
5453
};
5554

5655
export type SQLiteClientConnectionOptions<
5756
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
5857
> = {
5958
connector: ConnectorType;
60-
allowNestedTransactions: boolean;
6159
type: 'Client';
6260
connect: Promise<SQLiteClient>;
6361
close: (client: SQLiteClient) => Promise<void>;
62+
allowNestedTransactions: boolean;
6463
};
6564

6665
export type SQLiteClientConnection<
@@ -84,20 +83,12 @@ export const sqliteClientConnection = <
8483
): SQLiteClientConnection<ConnectorType> => {
8584
const { connect, close, allowNestedTransactions } = options;
8685

87-
const transactionCounter = transactionNestingCounter();
88-
8986
return createConnection({
9087
connector: options.connector,
9188
connect,
9289
close,
93-
initTransaction: (connection) => {
94-
return sqliteTransaction(
95-
options.connector,
96-
connection,
97-
transactionCounter,
98-
allowNestedTransactions,
99-
);
100-
},
90+
initTransaction: (connection) =>
91+
sqliteTransaction(options.connector, connection, allowNestedTransactions),
10192
executor: () => sqliteSQLExecutor(options.connector),
10293
});
10394
};
@@ -139,7 +130,6 @@ export const sqlitePoolClientConnection = <
139130
): SQLitePoolClientConnection<ConnectorType> => {
140131
const { connect, close, allowNestedTransactions } = options;
141132

142-
const transactionCounter = transactionNestingCounter();
143133
return createConnection({
144134
connector: options.connector,
145135
connect,
@@ -148,7 +138,6 @@ export const sqlitePoolClientConnection = <
148138
sqliteTransaction(
149139
options.connector,
150140
connection,
151-
transactionCounter,
152141
allowNestedTransactions ?? false,
153142
),
154143
executor: () => sqliteSQLExecutor(options.connector),

src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import {
55
type DatabaseTransaction,
66
} from '../../../../core';
77
import { sqliteSQLExecutor } from '../../core/execute';
8-
import type {
9-
SQLiteClientOrPoolClient,
10-
TransactionNestingCounter,
8+
import {
9+
transactionNestingCounter,
10+
type SQLiteClientOrPoolClient,
1111
} from '../connections';
1212

1313
export type SQLiteTransaction<
@@ -21,64 +21,71 @@ export const sqliteTransaction =
2121
>(
2222
connector: ConnectorType,
2323
connection: () => Connection<ConnectorType, DbClient>,
24-
transactionCounter: TransactionNestingCounter,
2524
allowNestedTransactions: boolean,
2625
) =>
2726
(
2827
getClient: Promise<DbClient>,
2928
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
30-
): DatabaseTransaction<ConnectorType, DbClient> => ({
31-
connection: connection(),
32-
connector,
33-
begin: async function () {
34-
const client = await getClient;
29+
): DatabaseTransaction<ConnectorType, DbClient> => {
30+
const transactionCounter = transactionNestingCounter();
31+
return {
32+
connection: connection(),
33+
connector,
34+
begin: async function () {
35+
const client = await getClient;
36+
37+
if (allowNestedTransactions) {
38+
if (transactionCounter.level >= 1) {
39+
transactionCounter.increment();
40+
await client.query(
41+
`SAVEPOINT transaction${transactionCounter.level}`,
42+
);
43+
return;
44+
}
3545

36-
if (allowNestedTransactions) {
37-
if (transactionCounter.level >= 1) {
3846
transactionCounter.increment();
39-
await client.query(
40-
`SAVEPOINT transaction${transactionCounter.level}`,
41-
);
42-
return;
4347
}
4448

45-
transactionCounter.increment();
46-
}
49+
await client.query('BEGIN TRANSACTION');
50+
},
51+
commit: async function () {
52+
const client = await getClient;
4753

48-
await client.query('BEGIN TRANSACTION');
49-
},
50-
commit: async function () {
51-
const client = await getClient;
54+
try {
55+
if (allowNestedTransactions) {
56+
if (transactionCounter.level > 1) {
57+
await client.query(
58+
`RELEASE transaction${transactionCounter.level}`,
59+
);
60+
transactionCounter.decrement();
5261

53-
if (allowNestedTransactions) {
54-
if (transactionCounter.level > 1) {
55-
await client.query(`RELEASE transaction${transactionCounter.level}`);
56-
transactionCounter.decrement();
62+
return;
63+
}
5764

58-
return;
65+
transactionCounter.reset();
66+
}
67+
await client.query('COMMIT');
68+
} finally {
69+
if (options?.close) await options?.close(client);
5970
}
71+
},
72+
rollback: async function (error?: unknown) {
73+
const client = await getClient;
74+
try {
75+
if (allowNestedTransactions) {
76+
if (transactionCounter.level > 1) {
77+
transactionCounter.decrement();
78+
return;
79+
}
80+
}
6081

61-
transactionCounter.reset();
62-
}
63-
await client.query('COMMIT');
64-
65-
if (options?.close) await options?.close(client);
66-
},
67-
rollback: async function (error?: unknown) {
68-
const client = await getClient;
69-
70-
if (allowNestedTransactions) {
71-
if (transactionCounter.level > 1) {
72-
transactionCounter.decrement();
73-
return;
82+
await client.query('ROLLBACK');
83+
} finally {
84+
if (options?.close) await options?.close(client, error);
7485
}
75-
}
76-
77-
await client.query('ROLLBACK');
78-
79-
if (options?.close) await options?.close(client, error);
80-
},
81-
execute: sqlExecutor(sqliteSQLExecutor(connector), {
82-
connect: () => getClient,
83-
}),
84-
});
86+
},
87+
execute: sqlExecutor(sqliteSQLExecutor(connector), {
88+
connect: () => getClient,
89+
}),
90+
};
91+
};

0 commit comments

Comments
 (0)