Skip to content

Commit efbd000

Browse files
committed
Support new sync stream syntax
1 parent 3c0a98b commit efbd000

22 files changed

+2706
-169
lines changed

packages/service-core/src/routes/endpoints/sync-rules.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
2-
import { SqlSyncRules, SyncRulesErrors } from '@powersync/service-sync-rules';
2+
import { SqlBucketDescriptor, SqlSyncRules, SyncRulesErrors } from '@powersync/service-sync-rules';
33
import type { FastifyPluginAsync } from 'fastify';
44
import * as t from 'ts-codec';
55

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,29 @@ import { ColumnDefinition } from './ExpressionType.js';
44
import { SourceTableInterface } from './SourceTableInterface.js';
55
import { SqlTools } from './sql_filters.js';
66
import { TablePattern } from './TablePattern.js';
7-
import { QueryParameters, QuerySchema, SourceSchema, SourceSchemaTable, SqliteJsonRow, SqliteRow } from './types.js';
7+
import {
8+
EvaluationResult,
9+
QueryParameters,
10+
QuerySchema,
11+
SourceSchema,
12+
SourceSchemaTable,
13+
SqliteJsonRow,
14+
SqliteRow
15+
} from './types.js';
816
import { filterJsonRow } from './utils.js';
17+
import { castAsText } from './sql_functions.js';
918

1019
export interface RowValueExtractor {
1120
extract(tables: QueryParameters, into: SqliteRow): void;
1221
getTypes(schema: QuerySchema, into: Record<string, ColumnDefinition>): void;
1322
}
1423

24+
export interface EvaluateRowOptions {
25+
table: SourceTableInterface;
26+
row: SqliteRow;
27+
bucketIds: (params: QueryParameters) => string[];
28+
}
29+
1530
export interface BaseSqlDataQueryOptions {
1631
sourceTable: TablePattern;
1732
table: string;
@@ -21,7 +36,6 @@ export interface BaseSqlDataQueryOptions {
2136
descriptorName: string;
2237
bucketParameters: string[];
2338
tools: SqlTools;
24-
2539
errors?: SqlRuleError[];
2640
}
2741

@@ -149,6 +163,39 @@ export class BaseSqlDataQuery {
149163
return result;
150164
}
151165

166+
evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] {
167+
try {
168+
const { table, row, bucketIds } = options;
169+
170+
const tables = { [this.table]: this.addSpecialParameters(table, row) };
171+
const resolvedBucketIds = bucketIds(tables);
172+
173+
const data = this.transformRow(tables);
174+
let id = data.id;
175+
if (typeof id != 'string') {
176+
// While an explicit cast would be better, this covers against very common
177+
// issues when initially testing out sync, for example when the id column is an
178+
// auto-incrementing integer.
179+
// If there is no id column, we use a blank id. This will result in the user syncing
180+
// a single arbitrary row for this table - better than just not being able to sync
181+
// anything.
182+
id = castAsText(id) ?? '';
183+
}
184+
const outputTable = this.getOutputName(table.name);
185+
186+
return resolvedBucketIds.map((bucketId) => {
187+
return {
188+
bucket: bucketId,
189+
table: outputTable,
190+
id: id,
191+
data
192+
} as EvaluationResult;
193+
});
194+
} catch (e) {
195+
return [{ error: e.message ?? `Evaluating data query failed` }];
196+
}
197+
}
198+
152199
protected transformRow(tables: QueryParameters): SqliteJsonRow {
153200
let result: SqliteRow = {};
154201
for (let extractor of this.extractors) {

packages/sync-rules/src/SqlDataQuery.ts

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -186,36 +186,15 @@ export class SqlDataQuery extends BaseSqlDataQuery {
186186
}
187187

188188
evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] {
189-
try {
190-
const tables = { [this.table]: this.addSpecialParameters(table, row) };
191-
const bucketParameters = this.filter.filterRow(tables);
192-
const bucketIds = bucketParameters.map((params) =>
193-
getBucketId(this.descriptorName, this.bucketParameters, params)
194-
);
195-
196-
const data = this.transformRow(tables);
197-
let id = data.id;
198-
if (typeof id != 'string') {
199-
// While an explicit cast would be better, this covers against very common
200-
// issues when initially testing out sync, for example when the id column is an
201-
// auto-incrementing integer.
202-
// If there is no id column, we use a blank id. This will result in the user syncing
203-
// a single arbitrary row for this table - better than just not being able to sync
204-
// anything.
205-
id = castAsText(id) ?? '';
189+
const query = this;
190+
191+
return this.evaluateRowWithOptions({
192+
table,
193+
row,
194+
bucketIds(tables) {
195+
const bucketParameters = query.filter.filterRow(tables);
196+
return bucketParameters.map((params) => getBucketId(query.descriptorName, query.bucketParameters, params));
206197
}
207-
const outputTable = this.getOutputName(table.name);
208-
209-
return bucketIds.map((bucketId) => {
210-
return {
211-
bucket: bucketId,
212-
table: outputTable,
213-
id: id,
214-
data
215-
} as EvaluationResult;
216-
});
217-
} catch (e) {
218-
return [{ error: e.message ?? `Evaluating data query failed` }];
219-
}
198+
});
220199
}
221200
}

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ import {
2222
SourceSchema,
2323
SqliteJsonRow,
2424
SqliteRow,
25+
StreamParseOptions,
2526
SyncRules
2627
} from './types.js';
2728
import { BucketSource } from './BucketSource.js';
29+
import { SyncStream } from './streams/stream.js';
30+
import { syncStreamFromSql } from './streams/from_sql.js';
2831

2932
const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES');
3033

@@ -137,6 +140,7 @@ export class SqlSyncRules implements SyncRules {
137140

138141
// Bucket definitions using explicit parameter and data queries.
139142
const bucketMap = parsed.get('bucket_definitions') as YAMLMap;
143+
const streamMap = parsed.get('streams') as YAMLMap | null;
140144
const definitionNames = new Set<string>();
141145
const checkUniqueName = (name: string, literal: Scalar) => {
142146
if (definitionNames.has(name)) {
@@ -209,6 +213,39 @@ export class SqlSyncRules implements SyncRules {
209213
rules.bucketSources.push(descriptor);
210214
}
211215

216+
for (const entry of streamMap?.items ?? []) {
217+
const { key: keyScalar, value } = entry as { key: Scalar; value: YAMLMap };
218+
const key = keyScalar.toString();
219+
if (!checkUniqueName(key, keyScalar)) {
220+
continue;
221+
}
222+
223+
const accept_potentially_dangerous_queries =
224+
value.get('accept_potentially_dangerous_queries', true)?.value == true;
225+
226+
const queryOptions: StreamParseOptions = {
227+
...options,
228+
accept_potentially_dangerous_queries,
229+
priority: rules.parsePriority(value),
230+
default: value.get('default', true)?.value == true
231+
};
232+
233+
const data = value.get('query', true) as unknown;
234+
if (data instanceof Scalar) {
235+
rules.withScalar(data, (q) => {
236+
const [parsed, errors] = syncStreamFromSql(key, q, options);
237+
rules.bucketSources.push(parsed);
238+
return {
239+
parsed: true,
240+
errors
241+
};
242+
});
243+
} else {
244+
rules.errors.push(this.tokenError(data, 'Must be a string.'));
245+
continue;
246+
}
247+
}
248+
212249
const eventMap = parsed.get('event_definitions') as YAMLMap;
213250
for (const event of eventMap?.items ?? []) {
214251
const { key, value } = event as { key: Scalar; value: YAMLSeq };

packages/sync-rules/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ export * from './schema-generators/schema-generators.js';
1212
export * from './SourceTableInterface.js';
1313
export * from './sql_filters.js';
1414
export * from './sql_functions.js';
15+
export { SqlBucketDescriptor } from './SqlBucketDescriptor.js';
1516
export * from './SqlDataQuery.js';
1617
export * from './SqlParameterQuery.js';
1718
export * from './SqlSyncRules.js';
1819
export * from './StaticSchema.js';
20+
export { SyncStream } from './streams/stream.js';
21+
export { syncStreamFromSql } from './streams/from_sql.js';
1922
export * from './TablePattern.js';
2023
export * from './types.js';
2124
export * from './utils.js';

packages/sync-rules/src/request_functions.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const request_parameters: SqlParameterFunction = {
2828
usesUnauthenticatedRequestParameters: true
2929
};
3030

31-
const request_jwt: SqlParameterFunction = {
31+
export const request_jwt: SqlParameterFunction = {
3232
debugName: 'request.jwt',
3333
call(parameters: ParameterValueSet) {
3434
return parameters.rawTokenPayload;
@@ -42,7 +42,7 @@ const request_jwt: SqlParameterFunction = {
4242
usesUnauthenticatedRequestParameters: false
4343
};
4444

45-
const request_user_id: SqlParameterFunction = {
45+
export const request_user_id: SqlParameterFunction = {
4646
debugName: 'request.user_id',
4747
call(parameters: ParameterValueSet) {
4848
return parameters.userId;

0 commit comments

Comments
 (0)