Skip to content

Commit ee1efc0

Browse files
committed
Support IN operator on parameter data
1 parent 54bfaa1 commit ee1efc0

File tree

6 files changed

+98
-23
lines changed

6 files changed

+98
-23
lines changed

packages/sync-rules/src/streams/filter.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,10 @@ export class Subquery {
271271
const lookups: ParameterLookup[] = [];
272272

273273
for (const [variant, id] of innerVariants) {
274-
const instantiation = variant.findStaticInstantiation(parameters);
275-
if (instantiation == null) {
276-
continue;
274+
const instantiations = variant.findStaticInstantiations(parameters);
275+
for (const instantiation of instantiations) {
276+
lookups.push(ParameterLookup.normalized(context.streamName, id, instantiation));
277277
}
278-
279-
lookups.push(ParameterLookup.normalized(context.streamName, id, instantiation));
280278
}
281279

282280
return lookups;
@@ -438,7 +436,20 @@ export class CompareRowValueWithStreamParameter extends FilterOperator {
438436
lookup: {
439437
type: 'static',
440438
fromRequest(parameters) {
441-
return filters.parametersToLookupValue(parameters);
439+
const value = filters.parametersToLookupValue(parameters);
440+
if (filters.expands) {
441+
if (typeof value != 'string') {
442+
return [];
443+
}
444+
let values: SqliteJsonValue[] = JSON.parse(value);
445+
if (!Array.isArray(values)) {
446+
return [];
447+
}
448+
449+
return values;
450+
} else {
451+
return [value];
452+
}
442453
}
443454
},
444455
filterRow(options) {

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,12 @@ class SyncStreamCompiler {
7676

7777
const tools = new SqlTools({
7878
table: alias,
79-
parameterTables: ['subscription_parameters', 'token_parameters'],
79+
parameterTables: [],
8080
valueTables: [alias],
8181
sql: this.sql,
8282
schema: querySchema,
8383
supportsParameterExpressions: true,
84+
supportsExpandingParameters: true, // needed for table.column IN (subscription.parameters() -> ...)
8485
isStream: true
8586
});
8687
tools.checkSpecificNameCase(tableRef);
@@ -307,7 +308,7 @@ class SyncStreamCompiler {
307308

308309
const right = tools.compileClause(clause.right);
309310

310-
// For cases 3-5, we can actually uses SqlTools.compileClause. Case 3 and 4 are handled specially in there and return
311+
// For cases 3-5, we can actually use SqlTools.compileClause. Case 3 and 4 are handled specially in there and return
311312
// a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
312313
// or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
313314
const combined = tools.compileInClause(clause.left, left, clause.right, right);
@@ -366,7 +367,7 @@ class SyncStreamCompiler {
366367
// Create a new tools instance for this - the subquery does not have access to the outer one.
367368
const tools = new SqlTools({
368369
table: alias,
369-
parameterTables: ['subscription_parameters', 'token_parameters'],
370+
parameterTables: [],
370371
valueTables: [alias],
371372
sql: this.sql,
372373
schema: querySchema,

packages/sync-rules/src/streams/functions.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,29 @@ const subscription_parameters: SqlParameterFunction = {
1717
usesUnauthenticatedRequestParameters: true
1818
};
1919

20+
const connection_parameters: SqlParameterFunction = {
21+
debugName: 'connection.parameters',
22+
call(parameters: ParameterValueSet) {
23+
return parameters.rawUserParameters;
24+
},
25+
getReturnType() {
26+
return ExpressionType.TEXT;
27+
},
28+
detail: 'Unauthenticated connection parameters as JSON',
29+
documentation:
30+
'Returns parameters passed by the client as a JSON string. These parameters are not authenticated - any value can be passed in by the client.',
31+
usesAuthenticatedRequestParameters: false,
32+
usesUnauthenticatedRequestParameters: true
33+
};
34+
2035
export const STREAM_FUNCTIONS: Record<string, Record<string, SqlParameterFunction>> = {
2136
subscription: {
22-
parameter: subscription_parameters
37+
parameters: subscription_parameters
38+
},
39+
connection: {
40+
parameters: connection_parameters
2341
},
24-
request: {
42+
token: {
2543
user_id: request_user_id,
2644
jwt: request_jwt
2745
}

packages/sync-rules/src/streams/parameter.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,14 @@ export interface SubqueryLookups {
5555
*/
5656
export interface StaticLookup {
5757
type: 'static';
58-
fromRequest(parameters: ParameterValueSet): SqliteValue | null;
58+
59+
/**
60+
* The value this lookup evaluates to for a specific request.
61+
*
62+
* This is typically a singleton array, e.g. the user's id in `WHERE owner_id = token.user_id()`. To desugar `IN`
63+
* queries on parameter data, this can also return multiple values though: `WHERE owner_id IN subscription.parameters() -> 'user_ids'`.
64+
*/
65+
fromRequest(parameters: ParameterValueSet): SqliteValue[];
5966
}
6067

6168
/**

packages/sync-rules/src/streams/variant.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ export class StreamVariant {
146146
const staticBuckets: ResolvedBucket[] = [];
147147
if (dynamicParameters.length == 0 && dynamicRequestFilters.length == 0) {
148148
// When we have no dynamic parameters, the partial evaluation is a full instantiation.
149-
staticBuckets.push(this.resolveBucket(stream, instantiation as SqliteJsonValue[], reason));
149+
const instantiations = this.cartesianProductOfParameterInstantiations(instantiation as SqliteJsonValue[][]);
150+
for (const instantiation of instantiations) {
151+
staticBuckets.push(this.resolveBucket(stream, instantiation, reason));
152+
}
150153
}
151154

152155
const variant = this;
@@ -173,7 +176,13 @@ export class StreamVariant {
173176

174177
const perParameterInstantiation: (SqliteJsonValue | BucketParameter)[][] = [];
175178
for (const parameter of instantiation) {
176-
perParameterInstantiation.push([parameter]);
179+
if (Array.isArray(parameter)) {
180+
// Statically-resolved values
181+
perParameterInstantiation.push(parameter);
182+
} else {
183+
// to be instantiated with dynamic lookup
184+
perParameterInstantiation.push([parameter as BucketParameter]);
185+
}
177186
}
178187

179188
for (const lookup of dynamicParameters) {
@@ -189,13 +198,13 @@ export class StreamVariant {
189198
};
190199
}
191200

192-
findStaticInstantiation(params: RequestParameters): SqliteJsonValue[] | null {
201+
findStaticInstantiations(params: RequestParameters): SqliteJsonValue[][] {
193202
if (this.subqueries.length) {
194-
return null;
203+
return [];
195204
}
196205

197206
// This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters.
198-
return this.partiallyEvaluateParameters(params) as SqliteJsonValue[];
207+
return this.partiallyEvaluateParameters(params) as SqliteJsonValue[][];
199208
}
200209

201210
/**
@@ -239,23 +248,25 @@ export class StreamVariant {
239248
* Dynamic parameters that depend on subquery results are not replaced.
240249
* This returns null if there's a {@link StaticRequestFilter} that doesn't match the request.
241250
*/
242-
private partiallyEvaluateParameters(params: RequestParameters): (SqliteJsonValue | BucketParameter)[] | null {
251+
private partiallyEvaluateParameters(params: RequestParameters): (SqliteJsonValue[] | BucketParameter)[] | null {
243252
for (const filter of this.requestFilters) {
244253
if (filter.type == 'static' && !filter.matches(params)) {
245254
return null;
246255
}
247256
}
248257

249-
const instantiation: (SqliteJsonValue | BucketParameter)[] = [];
258+
const instantiation: (SqliteJsonValue[] | BucketParameter)[] = [];
250259
for (const parameter of this.parameters) {
251260
const lookup = parameter.lookup;
252261
if (lookup.type == 'static') {
253-
const value = lookup.fromRequest(params);
254-
if (isJsonValue(value)) {
255-
instantiation.push(value);
256-
} else {
262+
const values = lookup.fromRequest(params)?.filter(isJsonValue);
263+
if (values.length == 0) {
264+
// Parameter not instantiable for this request. Since parameters in a single variant form a conjunction, that
265+
// means the whole request won't find anything here.
257266
return null;
258267
}
268+
269+
instantiation.push(values);
259270
} else {
260271
instantiation.push(parameter);
261272
}

packages/sync-rules/test/src/streams.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,18 @@ describe('streams', () => {
318318
})
319319
).toStrictEqual(['stream|1["b"]']);
320320
});
321+
322+
test('on parameter data', async () => {
323+
const desc = parseStream("SELECT * FROM comments WHERE issue_id IN (subscription.parameters() -> 'issue_id')");
324+
325+
expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['stream|0["i"]']);
326+
expect(
327+
await queryBucketIds(desc, {
328+
token_parameters: { user_id: 'a' },
329+
parameters: { issue_id: ['i1', 'i2'] }
330+
})
331+
).toStrictEqual(['stream|0["i1"]', 'stream|0["i2"]']);
332+
});
321333
});
322334

323335
describe('overlap', () => {
@@ -400,6 +412,21 @@ describe('streams', () => {
400412
)
401413
]);
402414
});
415+
416+
test('subquery with two columns', () => {
417+
const [_, errors] = syncStreamFromSql(
418+
's',
419+
'select * from comments where issue_id in (select id, owner_id from issues where owner_id = request.user_id())',
420+
options
421+
);
422+
423+
expect(errors).toMatchObject([
424+
expect.toBeSqlRuleError(
425+
'This subquery must return exactly one column',
426+
'select id, owner_id from issues where owner_id = request.user_id()'
427+
)
428+
]);
429+
});
403430
});
404431

405432
describe('normalization', () => {

0 commit comments

Comments
 (0)