Skip to content

Commit 92391ab

Browse files
committed
Introduce BucketSource interface
1 parent 21200fc commit 92391ab

File tree

9 files changed

+189
-392
lines changed

9 files changed

+189
-392
lines changed

packages/service-core/src/routes/endpoints/sync-rules.ts

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -202,34 +202,7 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
202202

203203
return {
204204
valid: true,
205-
bucket_definitions: rules.bucketDescriptors.map((d) => {
206-
let all_parameter_queries = [...d.parameterQueries.values()].flat();
207-
let all_data_queries = [...d.dataQueries.values()].flat();
208-
return {
209-
name: d.name,
210-
bucket_parameters: d.bucketParameters,
211-
global_parameter_queries: d.globalParameterQueries.map((q) => {
212-
return {
213-
sql: q.sql
214-
};
215-
}),
216-
parameter_queries: all_parameter_queries.map((q) => {
217-
return {
218-
sql: q.sql,
219-
table: q.sourceTable,
220-
input_parameters: q.inputParameters
221-
};
222-
}),
223-
224-
data_queries: all_data_queries.map((q) => {
225-
return {
226-
sql: q.sql,
227-
table: q.sourceTable,
228-
columns: q.columnOutputNames()
229-
};
230-
})
231-
};
232-
}),
205+
bucket_definitions: rules.bucketSources.map((source) => source.debugRepresentation()),
233206
source_tables: resolved_tables,
234207
data_tables: rules.debugGetOutputTables()
235208
};

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import {
22
BucketDescription,
33
BucketPriority,
4-
isValidPriority,
4+
BucketSource,
5+
BucketSourceType,
56
RequestedStream,
67
RequestParameters,
78
ResolvedBucket,
@@ -22,7 +23,6 @@ import { JSONBig } from '@powersync/service-jsonbig';
2223
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
2324
import { SyncContext } from './SyncContext.js';
2425
import { getIntersection, hasIntersection } from './util.js';
25-
import { SqlBucketDescriptor, SqlBucketDescriptorType } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js';
2626

2727
export interface BucketChecksumStateOptions {
2828
syncContext: SyncContext;
@@ -234,14 +234,13 @@ export class BucketChecksumState {
234234
this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
235235
};
236236
bucketsToFetch = allBuckets;
237-
this.parameterState.syncRules.bucketDescriptors;
238237

239238
const subscriptions: util.StreamDescription[] = [];
240-
for (const desc of this.parameterState.syncRules.bucketDescriptors) {
241-
if (desc.type == SqlBucketDescriptorType.STREAM && this.parameterState.isSubscribedToStream(desc)) {
239+
for (const source of this.parameterState.syncRules.bucketSources) {
240+
if (source.type == BucketSourceType.SYNC_STREAM && this.parameterState.isSubscribedToStream(source)) {
242241
subscriptions.push({
243-
name: desc.name,
244-
is_default: desc.subscribedToByDefault
242+
name: source.name,
243+
is_default: source.subscribedToByDefault
245244
});
246245
}
247246
}
@@ -455,7 +454,7 @@ export class BucketParameterState {
455454
};
456455
}
457456

458-
isSubscribedToStream(desc: SqlBucketDescriptor): boolean {
457+
isSubscribedToStream(desc: BucketSource): boolean {
459458
return (desc.subscribedToByDefault && this.includeDefaultStreams) || this.subscribedStreamNames.has(desc.name);
460459
}
461460

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { BucketParameterQuerier, ParameterLookup } from './BucketParameterQuerier.js';
2+
import { ColumnDefinition } from './ExpressionType.js';
3+
import { SourceTableInterface } from './SourceTableInterface.js';
4+
import { GetQuerierOptions } from './SqlSyncRules.js';
5+
import { TablePattern } from './TablePattern.js';
6+
import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js';
7+
8+
/**
9+
* An interface declaring
10+
*
11+
* - which buckets the sync service should create when processing change streams from the database.
12+
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
13+
* - which buckets a given connection has access to.
14+
*
15+
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
16+
* definitions that only consist of a single query.
17+
*/
18+
export interface BucketSource {
19+
name: string;
20+
type: BucketSourceType;
21+
22+
subscribedToByDefault: boolean;
23+
24+
/**
25+
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
26+
* be associated with.
27+
*
28+
* The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage
29+
* system to find buckets.
30+
*/
31+
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];
32+
33+
/**
34+
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
35+
* data for rows to add to buckets.
36+
*/
37+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
38+
39+
/**
40+
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
41+
*
42+
* @param result The target array to insert queriers into.
43+
* @param options Options, including parameters that may affect the buckets loaded by this source.
44+
*/
45+
pushBucketParameterQueriers(result: BucketParameterQuerier[], options: GetQuerierOptions): void;
46+
47+
/**
48+
* Whether {@link pushBucketParameterQueriers} may include a querier where
49+
* {@link BucketParameterQuerier.hasDynamicBuckets} is true.
50+
*
51+
* This is mostly used for testing.
52+
*/
53+
hasDynamicBucketQueries(): boolean;
54+
55+
getSourceTables(): Set<TablePattern>;
56+
57+
/** Whether the table possibly affects the buckets resolved by this source. */
58+
tableSyncsParameters(table: SourceTableInterface): boolean;
59+
60+
/** Whether the table possibly affects the contents of buckets resolved by this source. */
61+
tableSyncsData(table: SourceTableInterface): boolean;
62+
63+
/**
64+
* Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
65+
* source.
66+
*
67+
* This is use to generate the client-side schema.
68+
*/
69+
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>): void;
70+
71+
debugWriteOutputTables(result: Record<string, { query: string }[]>): void;
72+
73+
debugRepresentation(): any;
74+
}
75+
76+
export enum BucketSourceType {
77+
SYNC_RULE,
78+
SYNC_STREAM
79+
}
80+
81+
export type ResultSetDescription = { name: string; columns: ColumnDefinition[] };

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 78 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import { BucketDescription, BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
1+
import { BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
22
import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js';
3+
import { BucketSource, BucketSourceType, ResultSetDescription } from './BucketSource.js';
4+
import { ColumnDefinition } from './ExpressionType.js';
35
import { IdSequence } from './IdSequence.js';
46
import { SourceTableInterface } from './SourceTableInterface.js';
57
import { SqlDataQuery } from './SqlDataQuery.js';
68
import { SqlParameterQuery } from './SqlParameterQuery.js';
79
import { GetQuerierOptions, SyncRulesOptions } from './SqlSyncRules.js';
810
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
9-
import { StreamQuery } from './StreamQuery.js';
1011
import { TablePattern } from './TablePattern.js';
1112
import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js';
1213
import { SqlRuleError } from './errors.js';
@@ -16,8 +17,8 @@ import {
1617
EvaluationResult,
1718
QueryParseOptions,
1819
RequestParameters,
19-
SqliteRow,
20-
StreamParseOptions
20+
SourceSchema,
21+
SqliteRow
2122
} from './types.js';
2223

2324
export interface QueryParseResult {
@@ -29,23 +30,20 @@ export interface QueryParseResult {
2930
errors: SqlRuleError[];
3031
}
3132

32-
export enum SqlBucketDescriptorType {
33-
SYNC_RULE,
34-
STREAM
35-
}
36-
37-
export class SqlBucketDescriptor {
33+
export class SqlBucketDescriptor implements BucketSource {
3834
name: string;
3935
bucketParameters?: string[];
40-
type: SqlBucketDescriptorType;
41-
subscribedToByDefault: boolean;
4236

43-
constructor(name: string, type: SqlBucketDescriptorType) {
37+
constructor(name: string) {
4438
this.name = name;
45-
this.type = type;
39+
}
4640

47-
// Sync-rule style buckets are subscribed to by default, streams are opt-in unless their definition says otherwise.
48-
this.subscribedToByDefault = type == SqlBucketDescriptorType.SYNC_RULE;
41+
get type(): BucketSourceType {
42+
return BucketSourceType.SYNC_RULE;
43+
}
44+
45+
public get subscribedToByDefault(): boolean {
46+
return true;
4947
}
5048

5149
/**
@@ -94,24 +92,6 @@ export class SqlBucketDescriptor {
9492
};
9593
}
9694

97-
addUnifiedStreamQuery(sql: string, options: StreamParseOptions): QueryParseResult {
98-
const [query, errors] = StreamQuery.fromSql(this.name, sql, options);
99-
for (const parameterQuery of query.inferredParameters) {
100-
if (parameterQuery instanceof StaticSqlParameterQuery) {
101-
this.globalParameterQueries.push(parameterQuery);
102-
} else {
103-
this.parameterQueries.push(parameterQuery);
104-
}
105-
}
106-
this.dataQueries.push(query.data);
107-
this.subscribedToByDefault = options.default ?? false;
108-
109-
return {
110-
parsed: true,
111-
errors
112-
};
113-
}
114-
11595
evaluateRow(options: EvaluateRowOptions): EvaluationResult[] {
11696
let results: EvaluationResult[] = [];
11797
for (let query of this.dataQueries) {
@@ -137,20 +117,16 @@ export class SqlBucketDescriptor {
137117
/**
138118
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
139119
*/
140-
getBucketParameterQuerier(options: GetQuerierOptions, parameters: RequestParameters): BucketParameterQuerier {
120+
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier {
141121
const queriers: BucketParameterQuerier[] = [];
142-
this.pushBucketParameterQueriers(queriers, options, parameters);
122+
this.pushBucketParameterQueriers(queriers, options);
143123

144124
return mergeBucketParameterQueriers(queriers);
145125
}
146126

147-
pushBucketParameterQueriers(
148-
result: BucketParameterQuerier[],
149-
options: GetQuerierOptions,
150-
parameters: RequestParameters
151-
) {
152-
const reasons = [this.bucketInclusionReason(options)];
153-
const staticBuckets = this.getStaticBucketDescriptions(parameters, reasons);
127+
pushBucketParameterQueriers(result: BucketParameterQuerier[], options: GetQuerierOptions) {
128+
const reasons = [this.bucketInclusionReason()];
129+
const staticBuckets = this.getStaticBucketDescriptions(options.globalParameters, reasons);
154130
const staticQuerier = {
155131
staticBuckets,
156132
hasDynamicBuckets: false,
@@ -163,7 +139,9 @@ export class SqlBucketDescriptor {
163139
return;
164140
}
165141

166-
const dynamicQueriers = this.parameterQueries.map((query) => query.getBucketParameterQuerier(parameters, reasons));
142+
const dynamicQueriers = this.parameterQueries.map((query) =>
143+
query.getBucketParameterQuerier(options.globalParameters, reasons)
144+
);
167145
result.push(...dynamicQueriers);
168146
}
169147

@@ -198,12 +176,8 @@ export class SqlBucketDescriptor {
198176
return result;
199177
}
200178

201-
private bucketInclusionReason(parameters: GetQuerierOptions): BucketInclusionReason {
202-
if (this.type == SqlBucketDescriptorType.STREAM && !this.subscribedToByDefault) {
203-
return { subscription: this.name };
204-
} else {
205-
return 'default';
206-
}
179+
private bucketInclusionReason(): BucketInclusionReason {
180+
return 'default';
207181
}
208182

209183
tableSyncsData(table: SourceTableInterface): boolean {
@@ -223,4 +197,58 @@ export class SqlBucketDescriptor {
223197
}
224198
return false;
225199
}
200+
201+
resolveResultSets(schema: SourceSchema, tables: Record<string, Record<string, ColumnDefinition>>) {
202+
for (let query of this.dataQueries) {
203+
const outTables = query.getColumnOutputs(schema);
204+
for (let table of outTables) {
205+
tables[table.name] ??= {};
206+
for (let column of table.columns) {
207+
if (column.name != 'id') {
208+
tables[table.name][column.name] ??= column;
209+
}
210+
}
211+
}
212+
}
213+
}
214+
215+
debugWriteOutputTables(result: Record<string, { query: string }[]>): void {
216+
for (let q of this.dataQueries) {
217+
result[q.table!] ??= [];
218+
const r = {
219+
query: q.sql
220+
};
221+
222+
result[q.table!].push(r);
223+
}
224+
}
225+
226+
debugRepresentation() {
227+
let all_parameter_queries = [...this.parameterQueries.values()].flat();
228+
let all_data_queries = [...this.dataQueries.values()].flat();
229+
return {
230+
name: this.name,
231+
type: this.type.toString(),
232+
bucket_parameters: this.bucketParameters,
233+
global_parameter_queries: this.globalParameterQueries.map((q) => {
234+
return {
235+
sql: q.sql
236+
};
237+
}),
238+
parameter_queries: all_parameter_queries.map((q) => {
239+
return {
240+
sql: q.sql,
241+
table: q.sourceTable,
242+
input_parameters: q.inputParameters
243+
};
244+
}),
245+
data_queries: all_data_queries.map((q) => {
246+
return {
247+
sql: q.sql,
248+
table: q.sourceTable,
249+
columns: q.columnOutputNames()
250+
};
251+
})
252+
};
253+
}
226254
}

0 commit comments

Comments
 (0)