Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions backend/src/mastra/agents/investigate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ RULES:
- You have at most 6 tool calls total. Budget them: 1 fetch + 1 search + 1 fetch + 1 insert = done.
- ALWAYS insert a row, even if some fields are incomplete. Use "" for unknown fields. Partial real data is better than no row.
- Never fabricate values. Use "" for anything you cannot verify.
- For every field value you extract and fill in "data", you MUST record the cell-level provenance (the source URL, the search query used to find it, and the exact text snippet context showing the value) in the "provenance" parameter of insert_row/update_row.
- insert_row rejects duplicates based on primary key columns. If you get a "Duplicate" error, do NOT retry — report INSERTED: false and move on.

TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces:
search_web: {"query": "your search terms"}
fetch_page: {"url": "https://example.com"}
insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"}
insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "provenance": {${columnNames.map((n) => `"${n}": {"url": "https://url-you-fetched.com", "query": "search query used", "snippet": "exact context snippet from page"}`).join(", ")}}, "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"}

WORKFLOW:
1. Fetch 1-2 of the provided URLs to get real data (if URLs were given).
2. If you need more, run ONE search and fetch the best result.
3. Call insert_row with whatever real data you have. Use "" for missing fields.
Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "<insert url>", 2. Look for the pricing field, and title name field, 3. etc...)
Include "sources" (URLs you fetched), "provenance" (mapping of column names to their detailed source details), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "<insert url>", 2. Look for the pricing field, and title name field, 3. etc...)
4. Write your final response:
INSERTED: true/false
SUMMARY: one line
Expand Down
28 changes: 26 additions & 2 deletions backend/src/mastra/tools/dataset-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ export function buildPopulateTools(
.array(z.string())
.optional()
.describe("URLs you visited or used to gather data for this row"),
provenance: z
.record(
z.string(),
z.object({
url: z.string(),
query: z.string().optional(),
snippet: z.string().optional(),
})
)
.optional()
.describe("Mapping of column names to their detailed source provenance (url, query, snippet)"),
row_summary: z
.string()
.optional()
Expand All @@ -141,7 +152,7 @@ export function buildPopulateTools(
.describe("Brief description of how you found and verified this data"),
}),
outputSchema: writeResultSchema,
execute: async ({ data, sources, row_summary, how_found }) => {
execute: async ({ data, sources, provenance, row_summary, how_found }) => {
if (!data || Object.keys(data).length === 0)
return {
success: false,
Expand All @@ -158,6 +169,7 @@ export function buildPopulateTools(
datasetId: authorizedDatasetId,
data: cleanedData,
...(sources !== undefined ? { sources } : {}),
...(provenance !== undefined ? { provenance } : {}),
...(row_summary !== undefined ? { rowSummary: row_summary } : {}),
...(how_found !== undefined ? { howFound: how_found } : {}),
});
Expand Down Expand Up @@ -265,6 +277,17 @@ export function buildPopulateTools(
.array(z.string())
.optional()
.describe("Updated source URLs where this data was verified"),
provenance: z
.record(
z.string(),
z.object({
url: z.string(),
query: z.string().optional(),
snippet: z.string().optional(),
})
)
.optional()
.describe("Updated mapping of column names to their detailed source provenance (url, query, snippet)"),
row_summary: z
.string()
.optional()
Expand All @@ -275,7 +298,7 @@ export function buildPopulateTools(
.describe("Brief description of how the updated data was found"),
}),
outputSchema: writeResultSchema,
execute: async ({ rowId, data, sources, row_summary, how_found }) => {
execute: async ({ rowId, data, sources, provenance, row_summary, how_found }) => {
if (!rowId) return { success: false, error: "rowId is required." };
if (!data || Object.keys(data).length === 0)
return {
Expand All @@ -293,6 +316,7 @@ export function buildPopulateTools(
expectedDatasetId: authorizedDatasetId,
data: cleanedData,
...(sources !== undefined ? { sources } : {}),
...(provenance !== undefined ? { provenance } : {}),
...(row_summary !== undefined ? { rowSummary: row_summary } : {}),
...(how_found !== undefined ? { howFound: how_found } : {}),
});
Expand Down
13 changes: 12 additions & 1 deletion frontend/app/dataset/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ export default function DatasetPage() {
column: DatasetColumn;
value: unknown;
sources?: string[];
provenance?: {
url: string;
query?: string;
snippet?: string;
};
} | null>(null);

const datasetId = params.id as Id<"datasets">;
Expand Down Expand Up @@ -95,7 +100,12 @@ export default function DatasetPage() {
const col = dataset.columns.find((c) => c.name === columnName);
if (!col) return;
const row = rows.find((r) => r._id === rowId);
setCellDetail({ column: col, value, sources: row?.sources });
setCellDetail({
column: col,
value,
sources: row?.sources,
provenance: row?.provenance?.[columnName],
});
}, [dataset, rows]);

const openedFired = useRef<string | null>(null);
Expand Down Expand Up @@ -409,6 +419,7 @@ export default function DatasetPage() {
column={cellDetail.column}
value={cellDetail.value}
sources={cellDetail.sources}
provenance={cellDetail.provenance}
/>
)}
</SideSheet>
Expand Down
67 changes: 66 additions & 1 deletion frontend/components/SideSheet.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ interface CellDetailProps {
value: unknown;
/** Row-level sources stored by the populate agent. */
sources?: string[];
/** Cell-level provenance metadata. */
provenance?: {
url: string;
query?: string;
snippet?: string;
};
}

function isValidHttpUrl(src: string): boolean {
Expand All @@ -130,7 +136,7 @@ function isValidHttpUrl(src: string): boolean {
}
}

export function CellDetail({ column, value, sources }: CellDetailProps) {
export function CellDetail({ column, value, sources, provenance }: CellDetailProps) {
const [copied, setCopied] = useState(false);
const copyTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const displayValue = value == null || value === "" ? "—" : String(value);
Expand Down Expand Up @@ -192,6 +198,65 @@ export function CellDetail({ column, value, sources }: CellDetailProps) {
</div>
</div>

{/* Cell Provenance */}
{provenance && (
<div className="rounded-xl border border-emerald-500/15 bg-emerald-500/[0.02] p-4 space-y-3.5">
<div className="flex items-center gap-2 text-emerald-700 dark:text-emerald-400 font-medium text-xs">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 22s8-4 8-10V5l-8-3-8 3v7c0 6 8 10 8 10z"/>
</svg>
<span>Verified Source Origin</span>
</div>

<div className="space-y-3">
{/* Source URL */}
<div>
<p className="text-[10px] font-semibold text-muted uppercase tracking-wider">Source URL</p>
{isValidHttpUrl(provenance.url) ? (
<a
href={provenance.url}
target="_blank"
rel="noopener noreferrer"
className="inline-flex items-center gap-1 text-xs text-link hover:underline break-all mt-0.5"
data-ph-mask-text="true"
>
<IconExternalLink />
{provenance.url}
</a>
) : (
<p className="text-xs text-foreground break-all mt-0.5" data-ph-mask-text="true">
{provenance.url}
</p>
)}
</div>

{/* Search Query */}
{provenance.query && (
<div>
<p className="text-[10px] font-semibold text-muted uppercase tracking-wider">Search Query Used</p>
<div className="inline-flex items-center gap-1 px-1.5 py-0.5 rounded bg-foreground/[0.04] border border-border/60 text-xs text-foreground/80 mt-1" data-ph-mask-text="true">
<svg width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round" className="opacity-60">
<circle cx="11" cy="11" r="8"/><path d="m21 21-4.3-4.3"/>
</svg>
<span>{provenance.query}</span>
</div>
</div>
)}

{/* Text Snippet */}
{provenance.snippet && (
<div>
<p className="text-[10px] font-semibold text-muted uppercase tracking-wider mb-1">Snippet Context</p>
<div className="relative rounded-lg border border-border bg-background px-3 py-2 text-xs italic text-foreground/80 leading-relaxed" data-ph-mask-text="true">
<span className="absolute left-2.5 top-1.5 text-foreground/10 text-2xl font-serif leading-none">&ldquo;</span>
<p className="pl-4 pr-1">{provenance.snippet}</p>
</div>
</div>
)}
</div>
</div>
)}

{/* Sources */}
{sources && sources.length > 0 && (
<div>
Expand Down
13 changes: 12 additions & 1 deletion frontend/components/table/DataRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ function DataRowImpl({
const value = row.original.data[col.name];
const isPending = pendingRowIds.has(row.original._id);
const isFlashing = flashingCells.has(`${row.original._id}:${col.name}`);
const hasProvenance = !!row.original.provenance?.[col.name];
return (
<div
key={col.name}
Expand All @@ -131,7 +132,17 @@ function DataRowImpl({
padding: "var(--table-cell-py) var(--table-cell-px)",
}}
>
<CellValue value={value} type={col.type} />
<div className="flex items-center gap-1.5 min-w-0 pr-6">
<div className="truncate">
<CellValue value={value} type={col.type} />
</div>
{hasProvenance && (
<span
title="Source provenance available"
className="inline-flex shrink-0 w-1.5 h-1.5 rounded-full bg-emerald-500/70"
/>
)}
</div>
<button
type="button"
onClick={(e) => {
Expand Down
8 changes: 8 additions & 0 deletions frontend/components/table/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,13 @@ export interface DatasetRow {
_creationTime: number;
data: Record<string, unknown>;
sources?: string[];
provenance?: Record<
string,
{
url: string;
query?: string;
snippet?: string;
}
>;
updateStatus?: "pending";
}
14 changes: 9 additions & 5 deletions frontend/components/table/use-row-change-detection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ export function useRowChangeDetection(rows: DatasetRow[]) {
prevRowsRef.current = nextMap;

if (newFlashes.size > 0) {
setFlashingCells((prev) => {
const merged = new Set(prev);
for (const key of newFlashes) merged.add(key);
return merged;
});
const updateTimer = setTimeout(() => {
setFlashingCells((prev) => {
const merged = new Set(prev);
for (const key of newFlashes) merged.add(key);
return merged;
});
flashTimersRef.current.delete(updateTimer);
}, 0);
flashTimersRef.current.add(updateTimer);

const timer = setTimeout(() => {
setFlashingCells((prev) => {
Expand Down
21 changes: 21 additions & 0 deletions frontend/convex/datasetRows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ export const insert = internalMutation({
datasetId: v.id("datasets"),
data: v.record(v.string(), v.any()),
sources: v.optional(v.array(v.string())),
provenance: v.optional(
v.record(
v.string(),
v.object({
url: v.string(),
query: v.optional(v.string()),
snippet: v.optional(v.string()),
})
)
),
rowSummary: v.optional(v.string()),
howFound: v.optional(v.string()),
},
Expand Down Expand Up @@ -150,6 +160,16 @@ export const update = internalMutation({
expectedDatasetId: v.id("datasets"),
data: v.record(v.string(), v.any()),
sources: v.optional(v.array(v.string())),
provenance: v.optional(
v.record(
v.string(),
v.object({
url: v.string(),
query: v.optional(v.string()),
snippet: v.optional(v.string()),
})
)
),
rowSummary: v.optional(v.string()),
howFound: v.optional(v.string()),
},
Expand Down Expand Up @@ -182,6 +202,7 @@ export const update = internalMutation({
updateStatus: undefined,
};
if (args.sources !== undefined) patch.sources = args.sources;
if (args.provenance !== undefined) patch.provenance = args.provenance;
if (args.rowSummary !== undefined) patch.rowSummary = args.rowSummary;
if (args.howFound !== undefined) patch.howFound = args.howFound;
await ctx.db.patch(args.id, patch);
Expand Down
54 changes: 38 additions & 16 deletions frontend/convex/runStats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { internalMutation, internalQuery } from "./_generated/server.js";
import { v } from "convex/values";

const DEFAULT_PAGE_SIZE = 50;
const MAX_PAGE_SIZE = 200;

/**
* Insert a populate-run metrics record.
*
Expand Down Expand Up @@ -68,33 +71,52 @@ export const getByWorkflowRunId = internalQuery({
});

/**
* List all runs for a dataset, newest first.
* TODO: paginate — .collect() loads all docs into memory and will degrade
* as run history grows. Add cursor-based pagination when this is exposed
* to the frontend or run counts become large.
* List runs for a dataset, newest first.
* Cursor-based pagination keeps memory bounded as run history grows.
*/
export const listByDataset = internalQuery({
args: { datasetId: v.string() },
args: {
datasetId: v.string(),
cursor: v.optional(v.string()),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const runs = await ctx.db
const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add validation to ensure limit is positive.

The current limit calculation allows negative or zero values to pass through if explicitly provided in args.limit. This could cause unexpected pagination behavior or errors in the underlying paginate() call.

🛡️ Suggested fix to enforce positive limit
-    const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);
+    const limit = Math.min(
+      Math.max(args.limit ?? DEFAULT_PAGE_SIZE, 1),
+      MAX_PAGE_SIZE
+    );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);
const limit = Math.min(
Math.max(args.limit ?? DEFAULT_PAGE_SIZE, 1),
MAX_PAGE_SIZE
);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@frontend/convex/runStats.ts` at line 84, The limit calculation currently
allows non-positive values from args.limit to pass through; update the code
around the variable limit (the line using args.limit, DEFAULT_PAGE_SIZE,
MAX_PAGE_SIZE) to validate and clamp args.limit to a positive integer before
applying Math.min—e.g., coerce args.limit to a number, ensure it's at least 1
(or fall back to DEFAULT_PAGE_SIZE) and then cap with MAX_PAGE_SIZE so
paginate() always receives a positive limit; modify the expression that computes
limit to perform this validation.

const { page, isDone, continueCursor } = await ctx.db
.query("runStats")
.withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId))
.collect();
return runs.sort((a, b) => b.startedAt - a.startedAt);
.withIndex("by_dataset_started_at", (q) =>
q.eq("datasetId", args.datasetId),
)
.order("desc")
.paginate({
cursor: args.cursor ?? null,
numItems: limit,
});

return { runs: page, isDone, continueCursor };
},
});

/**
* List all runs for a user, newest first.
* TODO: paginate — same concern as listByDataset above.
* List runs for a user, newest first.
*/
export const listByUser = internalQuery({
args: { userId: v.string() },
args: {
userId: v.string(),
cursor: v.optional(v.string()),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const runs = await ctx.db
const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add validation to ensure limit is positive.

Same issue as in listByDataset: the limit calculation allows negative or zero values to pass through, which could cause unexpected behavior.

🛡️ Suggested fix to enforce positive limit
-    const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);
+    const limit = Math.min(
+      Math.max(args.limit ?? DEFAULT_PAGE_SIZE, 1),
+      MAX_PAGE_SIZE
+    );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE);
const limit = Math.min(
Math.max(args.limit ?? DEFAULT_PAGE_SIZE, 1),
MAX_PAGE_SIZE
);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@frontend/convex/runStats.ts` at line 110, The current limit computation in
runStats.ts (the const limit = Math.min(args.limit ?? DEFAULT_PAGE_SIZE,
MAX_PAGE_SIZE);) doesn't prevent zero or negative values; update the logic that
computes limit (using args.limit, DEFAULT_PAGE_SIZE and MAX_PAGE_SIZE) to
enforce a positive integer (e.g., coerce to a number and clamp with a lower
bound of 1) before using it, and validate or sanitize args.limit so limit is
always >= 1 and <= MAX_PAGE_SIZE.

const { page, isDone, continueCursor } = await ctx.db
.query("runStats")
.withIndex("by_user", (q) => q.eq("userId", args.userId))
.collect();
return runs.sort((a, b) => b.startedAt - a.startedAt);
.withIndex("by_user_started_at", (q) => q.eq("userId", args.userId))
.order("desc")
.paginate({
cursor: args.cursor ?? null,
numItems: limit,
});

return { runs: page, isDone, continueCursor };
},
});
Loading