Skip to content
Merged
59 changes: 54 additions & 5 deletions src/packages/dumbo/src/storage/sqlite/core/connections/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export type SQLitePoolConnectionOptions<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
> = {
connector: ConnectorType;
transactionCounter: TransactionNestingCounter;
allowNestedTransactions: boolean;
type: 'PoolClient';
connect: Promise<SQLitePoolClient>;
close: (client: SQLitePoolClient) => Promise<void>;
Expand All @@ -55,6 +57,8 @@ export type SQLiteClientConnectionOptions<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
> = {
connector: ConnectorType;
transactionCounter: TransactionNestingCounter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have transactionCounter as part of options? Shouldn’t be that created internally in the connection and kept there?🤔

allowNestedTransactions: boolean;
type: 'Client';
connect: Promise<SQLiteClient>;
close: (client: SQLiteClient) => Promise<void>;
Expand All @@ -79,31 +83,74 @@ export const sqliteClientConnection = <
>(
options: SQLiteClientConnectionOptions<ConnectorType>,
): SQLiteClientConnection<ConnectorType> => {
const { connect, close } = options;
const { connect, close, allowNestedTransactions, transactionCounter } =
options;

return createConnection({
connector: options.connector,
connect,
close,
initTransaction: (connection) =>
sqliteTransaction(options.connector, connection),
initTransaction: (connection) => {
return sqliteTransaction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SUGGESTION: I think that we can simplify that by keeping transaction a singleton inside the client as SQLIte cannot have more than one open transaction. Then we could setup counter internally inside transaction without the need to pass it from the connection.

options.connector,
connection,
transactionCounter,
allowNestedTransactions,
);
},
executor: () => sqliteSQLExecutor(options.connector),
});
};

export type TransactionNestingCounter = {
increment: () => void;
decrement: () => void;
reset: () => void;
level: number;
};

export const transactionNestingCounter = (): TransactionNestingCounter => {
let transactionLevel = 0;

return {
reset: () => {
transactionLevel = 0;
},
increment: () => {
transactionLevel++;
},
decrement: () => {
transactionLevel--;

if (transactionLevel < 0) {
throw new Error('Transaction level is out of bounds');
}
},
get level() {
return transactionLevel;
},
};
};

export const sqlitePoolClientConnection = <
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
>(
options: SQLitePoolConnectionOptions<ConnectorType>,
): SQLitePoolClientConnection<ConnectorType> => {
const { connect, close } = options;
const { connect, close, transactionCounter, allowNestedTransactions } =
options;

return createConnection({
connector: options.connector,
connect,
close,
initTransaction: (connection) =>
sqliteTransaction(options.connector, connection),
sqliteTransaction(
options.connector,
connection,
transactionCounter,
allowNestedTransactions ?? false,
),
executor: () => sqliteSQLExecutor(options.connector),
});
};
Expand All @@ -113,11 +160,13 @@ export function sqliteConnection<
>(
options: SQLitePoolConnectionOptions<ConnectorType>,
): SQLitePoolClientConnection;

export function sqliteConnection<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
>(
options: SQLiteClientConnectionOptions<ConnectorType>,
): SQLiteClientConnection;

export function sqliteConnection<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
>(
Expand Down
25 changes: 23 additions & 2 deletions src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
sqliteClientProvider,
sqliteConnection,
SQLiteConnectionString,
transactionNestingCounter,
type SQLiteClient,
type SQLiteClientConnection,
type SQLiteConnectorType,
Expand Down Expand Up @@ -58,12 +59,14 @@ export const sqliteSingletonClientPool = <
options: {
connector: ConnectorType;
database?: string | undefined;
allowNestedTransactions?: boolean;
} & SQLiteFileNameOrConnectionString,
): SQLiteAmbientClientPool<ConnectorType> => {
const { connector } = options;
let connection: SQLiteClientConnection | undefined = undefined;

const getConnection = () => {
const transactionCounter = transactionNestingCounter();
if (connection) return connection;

const connect = sqliteClientProvider(connector).then(
Expand All @@ -80,6 +83,8 @@ export const sqliteSingletonClientPool = <
connector,
type: 'Client',
connect,
transactionCounter,
allowNestedTransactions: options.allowNestedTransactions ?? false,
close: () => Promise.resolve(),
}));
};
Expand Down Expand Up @@ -110,6 +115,7 @@ export const sqliteAlwaysNewClientPool = <
return createConnectionPool({
connector: connector,
getConnection: () => {
const transactionCounter = transactionNestingCounter();
const connect = sqliteClientProvider(connector).then(
async (sqliteClient) => {
const client = sqliteClient(options);
Expand All @@ -124,6 +130,8 @@ export const sqliteAlwaysNewClientPool = <
connector,
type: 'Client',
connect,
transactionCounter,
allowNestedTransactions: false,
close: (client) => client.close(),
});
},
Expand All @@ -135,16 +143,20 @@ export const sqliteAmbientClientPool = <
>(options: {
connector: ConnectorType;
client: SQLiteClient;
allowNestedTransactions: boolean;
}): SQLiteAmbientClientPool<ConnectorType> => {
const { client, connector } = options;
const { client, connector, allowNestedTransactions } = options;

const transactionCounter = transactionNestingCounter();
const getConnection = () => {
const connect = Promise.resolve(client);

return sqliteConnection({
connector,
type: 'Client',
connect,
transactionCounter,
allowNestedTransactions,
close: () => Promise.resolve(),
});
};
Expand Down Expand Up @@ -178,6 +190,7 @@ export type SQLitePoolPooledOptions<
connector: ConnectorType;
pooled?: true;
singleton?: boolean;
allowNestedTransactions?: boolean;
};

export type SQLitePoolNotPooledOptions<
Expand All @@ -188,11 +201,13 @@ export type SQLitePoolNotPooledOptions<
pooled?: false;
client: SQLiteClient;
singleton?: true;
allowNestedTransactions?: boolean;
}
| {
connector: ConnectorType;
pooled?: boolean;
singleton?: boolean;
allowNestedTransactions?: boolean;
}
| {
connector: ConnectorType;
Expand All @@ -201,6 +216,7 @@ export type SQLitePoolNotPooledOptions<
| SQLiteClientConnection<ConnectorType>;
pooled?: false;
singleton?: true;
allowNestedTransactions?: boolean;
};

export type SQLitePoolOptions<
Expand All @@ -218,6 +234,7 @@ export function sqlitePool<
options: SQLitePoolNotPooledOptions<ConnectorType> &
SQLiteFileNameOrConnectionString,
): SQLiteAmbientClientPool<ConnectorType>;

export function sqlitePool<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
>(
Expand All @@ -231,7 +248,11 @@ export function sqlitePool<
// setSQLiteTypeParser(serializer ?? JSONSerializer);

if ('client' in options && options.client)
return sqliteAmbientClientPool({ connector, client: options.client });
return sqliteAmbientClientPool({
connector,
client: options.client,
allowNestedTransactions: options.allowNestedTransactions ?? false,
});

if ('connection' in options && options.connection)
return sqliteAmbientConnectionPool({
Expand Down
44 changes: 40 additions & 4 deletions src/packages/dumbo/src/storage/sqlite/core/transactions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
type DatabaseTransaction,
} from '../../../../core';
import { sqliteSQLExecutor } from '../../core/execute';
import type { SQLiteClientOrPoolClient } from '../connections';
import type {
SQLiteClientOrPoolClient,
TransactionNestingCounter,
} from '../connections';

export type SQLiteTransaction<
ConnectorType extends SQLiteConnectorType = SQLiteConnectorType,
Expand All @@ -18,26 +21,59 @@ export const sqliteTransaction =
>(
connector: ConnectorType,
connection: () => Connection<ConnectorType, DbClient>,
transactionCounter: TransactionNestingCounter,
allowNestedTransactions: boolean,
) =>
(
getClient: Promise<DbClient>,
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
): DatabaseTransaction<ConnectorType, DbClient> => ({
connection: connection(),
connector,
begin: async () => {
begin: async function () {
const client = await getClient;

if (allowNestedTransactions) {
if (transactionCounter.level >= 1) {
transactionCounter.increment();
await client.query(
`SAVEPOINT transaction${transactionCounter.level}`,
);
return;
}

transactionCounter.increment();
}

await client.query('BEGIN TRANSACTION');
},
commit: async () => {
commit: async function () {
const client = await getClient;

if (allowNestedTransactions) {
if (transactionCounter.level > 1) {
await client.query(`RELEASE transaction${transactionCounter.level}`);
transactionCounter.decrement();

return;
}

transactionCounter.reset();
}
await client.query('COMMIT');

if (options?.close) await options?.close(client);
},
rollback: async (error?: unknown) => {
rollback: async function (error?: unknown) {
const client = await getClient;

if (allowNestedTransactions) {
if (transactionCounter.level > 1) {
transactionCounter.decrement();
return;
}
}

await client.query('ROLLBACK');

if (options?.close) await options?.close(client, error);
Expand Down
Loading