diff --git a/.changeset/includes-aggregates.md b/.changeset/includes-aggregates.md new file mode 100644 index 000000000..c846a6a60 --- /dev/null +++ b/.changeset/includes-aggregates.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': patch +--- + +fix: support aggregates (e.g. count) in child/includes subqueries with per-parent scoping diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 595d277ae..0309816c8 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -80,6 +80,7 @@ export function processGroupBy( havingClauses?: Array, selectClause?: Select, fnHavingClauses?: Array<(row: any) => any>, + mainSource?: string, ): NamespacedAndKeyedStream { // Handle empty GROUP BY (single-group aggregation) if (groupByClause.length === 0) { @@ -110,8 +111,15 @@ export function processGroupBy( } } - // Use a constant key for single group - const keyExtractor = () => ({ __singleGroup: true }) + // Use a constant key for single group. + // When mainSource is set (includes mode), include __correlationKey so that + // rows from different parents aggregate separately. + const keyExtractor = mainSource + ? ([, row]: [string, NamespacedRow]) => ({ + __singleGroup: true, + __correlationKey: (row as any)?.[mainSource]?.__correlationKey, + }) + : () => ({ __singleGroup: true }) // Apply the groupBy operator with single group pipeline = pipeline.pipe( @@ -139,14 +147,24 @@ export function processGroupBy( ) } - // Use a single key for the result and update $selected - return [ - `single_group`, - { - ...aggregatedRow, - $selected: finalResults, - }, - ] as [unknown, Record] + // Use a single key for the result and update $selected. + // When in includes mode, restore the namespaced source structure with + // __correlationKey so output extraction can route results per-parent. + const correlationKey = mainSource + ? 
(aggregatedRow as any).__correlationKey + : undefined + const resultKey = + correlationKey !== undefined + ? `single_group_${serializeValue(correlationKey)}` + : `single_group` + const resultRow: Record = { + ...aggregatedRow, + $selected: finalResults, + } + if (mainSource && correlationKey !== undefined) { + resultRow[mainSource] = { __correlationKey: correlationKey } + } + return [resultKey, resultRow] as [unknown, Record] }), ) @@ -196,7 +214,9 @@ export function processGroupBy( compileExpression(e), ) - // Create a key extractor function using simple __key_X format + // Create a key extractor function using simple __key_X format. + // When mainSource is set (includes mode), include __correlationKey so that + // rows from different parents with the same group key aggregate separately. const keyExtractor = ([, row]: [ string, NamespacedRow & { $selected?: any }, @@ -214,6 +234,10 @@ export function processGroupBy( key[`__key_${i}`] = value } + if (mainSource) { + key.__correlationKey = (row as any)?.[mainSource]?.__correlationKey + } + return key } @@ -278,25 +302,32 @@ export function processGroupBy( } } - // Generate a simple key for the live collection using group values - let finalKey: unknown - if (groupByClause.length === 1) { - finalKey = aggregatedRow[`__key_0`] - } else { - const keyParts: Array = [] - for (let i = 0; i < groupByClause.length; i++) { - keyParts.push(aggregatedRow[`__key_${i}`]) - } - finalKey = serializeValue(keyParts) + // Generate a simple key for the live collection using group values. + // When in includes mode, include the correlation key so that groups + // from different parents don't collide. + const correlationKey = mainSource + ? 
(aggregatedRow as any).__correlationKey + : undefined + const keyParts: Array = [] + for (let i = 0; i < groupByClause.length; i++) { + keyParts.push(aggregatedRow[`__key_${i}`]) } - - return [ - finalKey, - { - ...aggregatedRow, - $selected: finalResults, - }, - ] as [unknown, Record] + if (correlationKey !== undefined) { + keyParts.push(correlationKey) + } + const finalKey = + keyParts.length === 1 ? keyParts[0] : serializeValue(keyParts) + + // When in includes mode, restore the namespaced source structure with + // __correlationKey so output extraction can route results per-parent. + const resultRow: Record = { + ...aggregatedRow, + $selected: finalResults, + } + if (mainSource && correlationKey !== undefined) { + resultRow[mainSource] = { __correlationKey: correlationKey } + } + return [finalKey, resultRow] as [unknown, Record] }), ) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 6834fea0d..99c6fee16 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -372,7 +372,10 @@ export function compileQuery( ) } - // Process the GROUP BY clause if it exists + // Process the GROUP BY clause if it exists. + // When in includes mode (parentKeyStream), pass mainSource so that groupBy + // preserves __correlationKey for per-parent aggregation. + const groupByMainSource = parentKeyStream ? 
mainSource : undefined if (query.groupBy && query.groupBy.length > 0) { pipeline = processGroupBy( pipeline, @@ -380,6 +383,7 @@ export function compileQuery( query.having, query.select, query.fnHaving, + groupByMainSource, ) } else if (query.select) { // Check if SELECT contains aggregates but no GROUP BY (implicit single-group aggregation) @@ -394,6 +398,7 @@ export function compileQuery( query.having, query.select, query.fnHaving, + groupByMainSource, ) } } diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index c3b912242..1e5fc6b07 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it } from 'vitest' import { + count, createLiveQueryCollection, eq, toArray, @@ -1512,4 +1513,218 @@ describe(`includes subqueries`, () => { expect(issue11.comments).toEqual([{ id: 110, body: `Great feature` }]) }) }) + + // Aggregates in child queries: the aggregate (e.g. count) should be computed + // per-parent, not globally across all parents. These tests guard against the + // correlation key being lost after GROUP BY, which would cause all child rows + // to aggregate into a single global result rather than per-parent results. 
+ describe(`aggregates in child queries`, () => { + describe(`single-group aggregate: count issues per project (as Collection)`, () => { + function buildAggregateQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issueCount: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ total: count(i.id) })), + })), + ) + } + + it(`each project gets its own aggregate result`, async () => { + const collection = buildAggregateQuery() + await collection.preload() + + // Alpha has 2 issues + const alpha = collection.get(1) as any + expect(childItems(alpha.issueCount, `total`)).toEqual([{ total: 2 }]) + + // Beta has 1 issue + const beta = collection.get(2) as any + expect(childItems(beta.issueCount, `total`)).toEqual([{ total: 1 }]) + + // Gamma has 0 issues — no matching rows means empty Collection + const gamma = collection.get(3) as any + expect(childItems(gamma.issueCount, `total`)).toEqual([]) + }) + + it(`adding an issue updates the count for that parent`, async () => { + const collection = buildAggregateQuery() + await collection.preload() + + // Gamma starts with 0 issues + expect( + childItems((collection.get(3) as any).issueCount, `total`), + ).toEqual([]) + + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 30, projectId: 3, title: `Gamma issue` }, + }) + issues.utils.commit() + + // Gamma now has 1 issue + expect( + childItems((collection.get(3) as any).issueCount, `total`), + ).toEqual([{ total: 1 }]) + + // Alpha should still have 2 + expect( + childItems((collection.get(1) as any).issueCount, `total`), + ).toEqual([{ total: 2 }]) + }) + + it(`removing an issue updates the count for that parent`, async () => { + const collection = buildAggregateQuery() + await collection.preload() + + // Alpha starts with 2 issues + expect( + childItems((collection.get(1) as any).issueCount, `total`), + ).toEqual([{ total: 2 }]) + + 
issues.utils.begin() + issues.utils.write({ + type: `delete`, + value: sampleIssues.find((i) => i.id === 10)!, + }) + issues.utils.commit() + + // Alpha now has 1 issue + expect( + childItems((collection.get(1) as any).issueCount, `total`), + ).toEqual([{ total: 1 }]) + + // Beta should still have 1 + expect( + childItems((collection.get(2) as any).issueCount, `total`), + ).toEqual([{ total: 1 }]) + }) + }) + + describe(`single-group aggregate: count issues per project (as toArray)`, () => { + function buildAggregateToArrayQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issueCount: toArray( + q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ total: count(i.id) })), + ), + })), + ) + } + + it(`each project gets its own aggregate result as an array`, async () => { + const collection = buildAggregateToArrayQuery() + await collection.preload() + + // Alpha has 2 issues + const alpha = collection.get(1) as any + expect(alpha.issueCount).toEqual([{ total: 2 }]) + + // Beta has 1 issue + const beta = collection.get(2) as any + expect(beta.issueCount).toEqual([{ total: 1 }]) + + // Gamma has 0 issues — empty array + const gamma = collection.get(3) as any + expect(gamma.issueCount).toEqual([]) + }) + }) + + describe(`nested aggregate: count comments per issue (as Collection)`, () => { + function buildNestedAggregateQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + commentCount: q + .from({ c: comments }) + .where(({ c }) => eq(c.issueId, i.id)) + .select(({ c }) => ({ total: count(c.id) })), + })), + })), + ) + } + + it(`each issue gets its own comment count`, async () => { + const collection = buildNestedAggregateQuery() + await 
collection.preload() + + // Alpha's issues + const alpha = collection.get(1) as any + const issue10 = alpha.issues.get(10) + expect(childItems(issue10.commentCount, `total`)).toEqual([ + { total: 2 }, + ]) + + const issue11 = alpha.issues.get(11) + // Issue 11 has 0 comments — empty Collection + expect(childItems(issue11.commentCount, `total`)).toEqual([]) + + // Beta's issue + const beta = collection.get(2) as any + const issue20 = beta.issues.get(20) + expect(childItems(issue20.commentCount, `total`)).toEqual([ + { total: 1 }, + ]) + }) + }) + + describe(`nested aggregate: count comments per issue (as toArray)`, () => { + function buildNestedAggregateToArrayQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + commentCount: toArray( + q + .from({ c: comments }) + .where(({ c }) => eq(c.issueId, i.id)) + .select(({ c }) => ({ total: count(c.id) })), + ), + })), + })), + ) + } + + it(`each issue gets its own comment count as an array`, async () => { + const collection = buildNestedAggregateToArrayQuery() + await collection.preload() + + // Alpha's issues + const alpha = collection.get(1) as any + const issue10 = alpha.issues.get(10) + expect(issue10.commentCount).toEqual([{ total: 2 }]) + + const issue11 = alpha.issues.get(11) + // Issue 11 has 0 comments — empty array + expect(issue11.commentCount).toEqual([]) + + // Beta's issue + const beta = collection.get(2) as any + const issue20 = beta.issues.get(20) + expect(issue20.commentCount).toEqual([{ total: 1 }]) + }) + }) + }) })