-
Notifications
You must be signed in to change notification settings - Fork 109
feat: add pg task store #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: add pg task store #269
Conversation
Summary of ChangesHello @aryaraj132, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the application's task management capabilities by introducing a robust PostgreSQL-backed task store. This new store provides persistent storage for tasks, addressing the need for data durability and advanced querying. Concurrently, the core Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new PostgreSQL-based task store, which is a great feature for providing persistent storage. The implementation is a good start, but I've identified several critical security and correctness issues, primarily in the list method of the PostgresTaskStore. These include SQL injection vulnerabilities and incorrect query construction that could lead to runtime errors. I've also provided suggestions to improve robustness and maintainability, such as using transactions for migrations and avoiding redundant data storage. Please address the critical issues before merging.
| let jsonbQuery = ''; | ||
| if (metadataSearch) { | ||
| jsonbQuery = `task_data ->'metadata' @> jsonb_build_object(${Object.entries(metadataSearch) | ||
| .map(([key, _value], index) => `'${key}', $${index + 1}::text`) | ||
| .join(',')})`; | ||
| } | ||
|
|
||
| let additionalQuery = ''; | ||
| if (status) { | ||
| additionalQuery = `AND (task_data ->'status' ->> 'state') IN (${status?.map((status) => `'${status}'`).join(',')})`; | ||
| } | ||
|
|
||
| const params = [...Object.values(metadataSearch)]; | ||
|
|
||
| const query = `WITH filtered_tasks AS ( | ||
| SELECT * | ||
| FROM ${A2A_DB_NAMES.TASKS} WHERE ${jsonbQuery} ${additionalQuery} | ||
| ), | ||
| paginated_tasks AS ( | ||
| SELECT * | ||
| FROM filtered_tasks | ||
| ORDER BY created_at DESC | ||
| LIMIT ${parseInt(pageSize)} OFFSET ${(parseInt(page) - 1) * parseInt(pageSize)} | ||
| ), | ||
| total_filtered_count AS ( | ||
| SELECT COUNT(*) AS count FROM filtered_tasks | ||
| ) | ||
| SELECT | ||
| ( | ||
| SELECT json_agg(pt ORDER BY pt.created_at DESC) | ||
| FROM paginated_tasks pt | ||
| ) AS tasks, | ||
| total_filtered_count.count | ||
| FROM total_filtered_count;`; | ||
|
|
||
| const res = await this.pool.query(query, params); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query construction in the list method has several critical issues:
- SQL Injection Vulnerability: Both
metadataSearchkeys (line 136) andstatusvalues (line 142) are directly interpolated into the SQL query string. This makes the application vulnerable to SQL injection attacks. All external input should be treated as untrusted and passed as query parameters. - Incorrect
WHEREclause construction: TheWHEREclause is built in a way that can lead to SQL syntax errors. For instance, ifmetadataSearchis not provided butstatusis, the query will containWHERE AND ..., which is invalid.
I suggest refactoring the query and parameter building logic to use parameterized queries correctly for all dynamic values and to construct the WHERE clause safely. This addresses both the security vulnerabilities and the logic bug.
const params: unknown[] = [];
const whereClauses: string[] = [];
if (metadataSearch && Object.keys(metadataSearch).length > 0) {
params.push(JSON.stringify(metadataSearch));
whereClauses.push(`task_data->'metadata' @> $${params.length}::jsonb`);
}
if (status && status.length > 0) {
params.push(status);
whereClauses.push(`(task_data->'status'->>'state') = ANY($${params.length}::text[])`);
}
const whereClause = whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : '';
const pageNum = parseInt(page, 10);
const pageSizeNum = parseInt(pageSize, 10);
params.push(pageSizeNum);
params.push((pageNum - 1) * pageSizeNum);
const query = `WITH filtered_tasks AS (
SELECT *
FROM ${A2A_DB_NAMES.TASKS} ${whereClause}
),
paginated_tasks AS (
SELECT *
FROM filtered_tasks
ORDER BY created_at DESC
LIMIT $${params.length - 1} OFFSET $${params.length}
),
total_filtered_count AS (
SELECT COUNT(*) AS count FROM filtered_tasks
)
SELECT
(
SELECT json_agg(pt ORDER BY pt.created_at DESC)
FROM paginated_tasks pt
) AS tasks,
total_filtered_count.count
FROM total_filtered_count;`;
const res = await this.pool.query(query, params);| return { | ||
| result: res.rows[0].tasks | ||
| ? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => { | ||
| const reconstructedTask: Task = { | ||
| id: row.id, | ||
| ...row.task_data, | ||
| }; | ||
| return reconstructedTask; | ||
| }) | ||
| : [], | ||
| page, | ||
| pageSize, | ||
| totalNumberOfTasks: res.rows[0].count, | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The list method's return type does not match the TaskStore interface it implements. The interface expects Promise<{ result: Task[]; totalNumberOfTasks: number }>, but this implementation returns page and pageSize as well. This violates the interface contract and will likely cause type errors in consuming code.
return {
result: res.rows[0].tasks
? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => {
const reconstructedTask: Task = {
id: row.id,
...row.task_data,
};
return reconstructedTask;
})
: [],
totalNumberOfTasks: res.rows[0].count,
};| for (let v = version + 1; v < MIGRATIONS.length; v += 1) { | ||
| await client.query(MIGRATIONS[v]); | ||
| await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The migration loop does not use a transaction. If a migration script runs successfully but the subsequent INSERT into the migrations table fails, the database will be in an inconsistent state. On the next run, the setup will not re-run the successful migration, but the version will not be recorded. It's a best practice to wrap each migration and its version update in a transaction to ensure atomicity.
for (let v = version + 1; v < MIGRATIONS.length; v += 1) {
await client.query('BEGIN');
try {
await client.query(MIGRATIONS[v]);
await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]);
await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
}
}| const taskId = task.id; | ||
|
|
||
| if (!taskId) { | ||
| throw new Error('Task object must contain task id properties for persistence.'); | ||
| } | ||
|
|
||
| await this.pool.query( | ||
| `INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data) | ||
| VALUES ($1, $2) | ||
| ON CONFLICT (id) DO UPDATE SET | ||
| task_data = $2, | ||
| updated_at = NOW()`, | ||
| [taskId, task] | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The save method stores the entire task object in the task_data JSONB column, which includes the id property. Since the id is already stored in a dedicated id column, this is redundant. To avoid data duplication and keep the task_data column clean, I suggest destructuring the id from the task object and storing only the remaining data.
const { id: taskId, ...taskData } = task;
if (!taskId) {
throw new Error('Task object must contain task id properties for persistence.');
}
await this.pool.query(
`INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data)
VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET
task_data = $2,
updated_at = NOW()`,
[taskId, taskData]
);|
Related to #213 |
Description
Thank you for opening a Pull Request!
Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
CONTRIBUTINGGuide.fix:which represents bug fixes, and correlates to a SemVer patch.feat:represents a new feature, and correlates to a SemVer minor.feat!:, orfix!:,refactor!:, etc., which represent a breaking change (indicated by the!) and will result in a SemVer major.Fixes #114 🦕