6 changes: 6 additions & 0 deletions packages/service-core/src/api/api-metrics.ts
@@ -12,6 +12,12 @@ export function createCoreAPIMetrics(engine: MetricsEngine): void {
unit: 'bytes'
});

engine.createCounter({
name: APIMetric.DATA_SENT_BYTES,
description: 'Size of data sent to clients, after compression if applicable',
unit: 'bytes'
});

engine.createCounter({
name: APIMetric.OPERATIONS_SYNCED,
description: 'Number of operations synced'
78 changes: 78 additions & 0 deletions packages/service-core/src/routes/compression.ts
@@ -0,0 +1,78 @@
import { PassThrough, pipeline, Readable, Transform } from 'node:stream';
import type Negotiator from 'negotiator';
import * as zlib from 'node:zlib';
import { RequestTracker } from '../sync/RequestTracker.js';

/**
* Compress a streamed response.
*
* `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
* The manual implementation is simple enough, and gives us more control over the low-level details.
*
* @param negotiator Negotiator from the request, used to negotiate the response encoding
* @param stream plain-text stream
* @param tracker request tracker used to record the compressed bytes sent
* @returns the possibly-compressed stream, and the content-encoding headers to set on the response
*/
export function maybeCompressResponseStream(
negotiator: Negotiator,
stream: Readable,
tracker: RequestTracker
): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } {
const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' });
if (encoding == 'zstd') {
tracker.setCompressed(encoding);
return {
stream: transform(
stream,
// Available since Node v23.8.0, v22.15.0
// This does the actual compression in a background thread pool.
zlib.createZstdCompress({
// We need to flush the frame after every new input chunk, to avoid delaying data
// in the output stream.
flush: zlib.constants.ZSTD_e_flush,
params: {
// Default compression level is 3. We reduce this slightly to limit CPU overhead
[zlib.constants.ZSTD_c_compressionLevel]: 2
}
}),
tracker
),
encodingHeaders: { 'content-encoding': 'zstd' }
};
} else if (encoding == 'gzip') {
tracker.setCompressed(encoding);
return {
stream: transform(
stream,
zlib.createGzip({
// We need to flush the frame after every new input chunk, to avoid delaying data
// in the output stream.
flush: zlib.constants.Z_SYNC_FLUSH
}),
tracker
),
encodingHeaders: { 'content-encoding': 'gzip' }
};
} else {
return {
stream: stream,
encodingHeaders: {}
};
}
}

function transform(source: Readable, transform: Transform, tracker: RequestTracker) {
// pipe does not forward error events automatically, which can result in unhandled error
// events. The pipeline callback below forwards errors to the returned stream instead.
const out = new PassThrough();
const trackingTransform = new Transform({
transform(chunk, encoding, callback) {
tracker.addCompressedDataSent(chunk.length);
callback(null, chunk);
}
});
pipeline(source, transform, trackingTransform, out, (err) => {
if (err) out.destroy(err);
});
return out;
}
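
For orientation, a minimal sketch of how this helper could be wired into a route handler (illustrative only: `request`, `metricsEngine` and the sample payload are assumptions, not part of this diff):

```ts
import { Readable } from 'node:stream';
import Negotiator from 'negotiator';
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { RequestTracker } from '../sync/RequestTracker.js';
import { maybeCompressResponseStream } from './compression.js';

// Assumed to be available in the surrounding route handler.
declare const request: { headers: Record<string, string> };
declare const metricsEngine: MetricsEngine;

const negotiator = new Negotiator(request);
const tracker = new RequestTracker(metricsEngine);

// Plaintext response body as a byte stream.
const plainStream = Readable.from([Buffer.from('{"token_expires_in":3600}\n')], { objectMode: false });

// Negotiates zstd/gzip/identity from the Accept-Encoding header and wraps the stream when needed.
const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);

// The response then uses `stream` as its body and spreads `encodingHeaders` into its headers,
// e.g. { 'Content-Type': ndJsonContentType, ...encodingHeaders }.
```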
2 changes: 1 addition & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
@@ -113,7 +113,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
const serialized = sync.syncLineToBson(data);
responder.onNext({ data: serialized }, false);
requestedN--;
tracker.addDataSynced(serialized.length);
tracker.addPlaintextDataSynced(serialized.length);
}

if (requestedN <= 0 && !signal.aborted) {
15 changes: 9 additions & 6 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
@@ -1,7 +1,6 @@
import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { Readable } from 'stream';
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
import Negotiator from 'negotiator';
import { Readable } from 'stream';

import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';
@@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';

import { APIMetric } from '@powersync/service-types';
import { maybeCompressResponseStream } from '../compression.js';

export enum SyncRoutes {
STREAM = '/sync/stream'
@@ -31,9 +31,10 @@ export const syncStreamed = routeDefinition({
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
const clientId = payload.params.client_id;
const streamStart = Date.now();
const negotiator = new Negotiator(payload.request);
// This falls back to JSON unless there's a preference for the bson-stream in the Accept header.
const useBson = payload.request.headers.accept
? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
? negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType
: false;

logger.defaultMeta = {
@@ -81,10 +82,11 @@
});

const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
objectMode: false,
highWaterMark: 16 * 1024
});
const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);

// Best effort guess on why the stream was closed.
// We use the `??=` operator everywhere, so that we catch the first relevant
@@ -119,7 +121,8 @@
return new router.RouterResponse({
status: 200,
headers: {
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
...encodingHeaders
},
data: stream,
afterSend: async (details) => {
29 changes: 27 additions & 2 deletions packages/service-core/src/sync/RequestTracker.ts
@@ -2,16 +2,20 @@ import { MetricsEngine } from '../metrics/MetricsEngine.js';

import { APIMetric } from '@powersync/service-types';
import { SyncBucketData } from '../util/protocol-types.js';
import { ServiceAssertionError } from '@powersync/lib-services-framework';

/**
* Record sync stats per request stream.
*/
export class RequestTracker {
operationsSynced = 0;
dataSyncedBytes = 0;
dataSentBytes = 0;
operationCounts: OperationCounts = { put: 0, remove: 0, move: 0, clear: 0 };
largeBuckets: Record<string, number> = {};

private encoding: string | undefined = undefined;

constructor(private metrics: MetricsEngine) {
this.metrics = metrics;
}
@@ -29,18 +33,39 @@ export class RequestTracker {
this.metrics.getCounter(APIMetric.OPERATIONS_SYNCED).add(operations.total);
}

addDataSynced(bytes: number) {
setCompressed(encoding: string) {
this.encoding = encoding;
}

addPlaintextDataSynced(bytes: number) {
this.dataSyncedBytes += bytes;

this.metrics.getCounter(APIMetric.DATA_SYNCED_BYTES).add(bytes);

if (this.encoding == null) {
// Without compression, the bytes sent equal the plaintext bytes; counting them here avoids creating a separate stream just to track this
this.dataSentBytes += bytes;

this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
}
}

addCompressedDataSent(bytes: number) {
if (this.encoding == null) {
throw new ServiceAssertionError('No compression encoding set');
}
this.dataSentBytes += bytes;
this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
}

getLogMeta() {
return {
operations_synced: this.operationsSynced,
data_synced_bytes: this.dataSyncedBytes,
data_sent_bytes: this.dataSentBytes,
operation_counts: this.operationCounts,
large_buckets: this.largeBuckets
large_buckets: this.largeBuckets,
encoding: this.encoding
};
}
}
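
To make the accounting concrete, a short sketch of how the two counters are expected to move (assuming an existing `metricsEngine`; the byte counts are illustrative): `DATA_SYNCED_BYTES` always counts plaintext bytes, while `DATA_SENT_BYTES` mirrors the plaintext size for uncompressed responses and counts post-compression bytes once `setCompressed()` has been called.

```ts
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { RequestTracker } from './RequestTracker.js';

declare const metricsEngine: MetricsEngine; // assumed to exist, e.g. from the service context

// Uncompressed response: both counters advance together.
const plain = new RequestTracker(metricsEngine);
plain.addPlaintextDataSynced(1024);
// plain.dataSyncedBytes === 1024, plain.dataSentBytes === 1024

// Compressed response: the compression transform reports the post-compression size separately.
const compressed = new RequestTracker(metricsEngine);
compressed.setCompressed('zstd');
compressed.addPlaintextDataSynced(1024); // only dataSyncedBytes advances
compressed.addCompressedDataSent(300); // dataSentBytes advances by the compressed size
// compressed.dataSyncedBytes === 1024, compressed.dataSentBytes === 300
```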
2 changes: 1 addition & 1 deletion packages/service-core/src/sync/util.ts
@@ -125,7 +125,7 @@ export async function* transformToBytesTracked(
encoded = data;
}

tracker.addDataSynced(encoded.length);
tracker.addPlaintextDataSynced(encoded.length);
yield encoded;
}
}
59 changes: 59 additions & 0 deletions packages/service-core/test/src/routes/mocks.ts
@@ -0,0 +1,59 @@
import {
BucketStorageFactory,
createCoreAPIMetrics,
MetricsEngine,
OpenTelemetryMetricsFactory,
RouteAPI,
RouterEngine,
ServiceContext,
StorageEngine,
SyncContext,
SyncRulesBucketStorage
} from '@/index.js';
import { MeterProvider } from '@opentelemetry/sdk-metrics';

export function mockServiceContext(storage: Partial<SyncRulesBucketStorage> | null) {
// This is very incomplete - just enough to get the current tests passing.

const storageEngine: StorageEngine = {
activeBucketStorage: {
async getActiveStorage() {
return storage;
}
} as Partial<BucketStorageFactory>
} as any;

const meterProvider = new MeterProvider({
readers: []
});
const meter = meterProvider.getMeter('powersync-tests');
const metricsEngine = new MetricsEngine({
disable_telemetry_sharing: true,
factory: new OpenTelemetryMetricsFactory(meter)
});
createCoreAPIMetrics(metricsEngine);
const service_context: Partial<ServiceContext> = {
syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }),
routerEngine: {
getAPI() {
return {
getParseSyncRulesOptions() {
return { defaultSchema: 'public' };
}
} as Partial<RouteAPI>;
},
addStopHandler() {
return () => {};
}
} as Partial<RouterEngine> as any,
storageEngine,
metricsEngine: metricsEngine,
// Not used
configuration: null as any,
lifeCycleEngine: null as any,
migrations: null as any,
replicationEngine: null as any,
serviceMode: null as any
};
return service_context as ServiceContext;
}
84 changes: 84 additions & 0 deletions packages/service-core/test/src/routes/stream.test.ts
@@ -0,0 +1,84 @@
import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
import { mockServiceContext } from './mocks.js';

describe('Stream Route', () => {
describe('compressed stream', () => {
it('handles missing sync rules', async () => {
const context: Context = {
logger: logger,
service_context: mockServiceContext(null)
};

const request: BasicRouterRequest = {
headers: {},
hostname: '',
protocol: 'http'
};

const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>).catch(
(e) => e
)) as ServiceError;

expect(error.errorData.status).toEqual(500);
expect(error.errorData.code).toEqual('PSYNC_S2302');
});

it('handles a stream error with compression', async () => {
// This primarily tests that an underlying storage error doesn't result in an uncaught error
// when compressing the stream.

const storage = {
getParsedSyncRules() {
return new SqlSyncRules('bucket_definitions: {}');
},
watchCheckpointChanges: async function* (options) {
throw new Error('Simulated storage error');
}
} as Partial<SyncRulesBucketStorage>;
const serviceContext = mockServiceContext(storage);

const context: Context = {
logger: logger,
service_context: serviceContext,
token_payload: {
exp: new Date().getTime() / 1000 + 10000,
iat: new Date().getTime() / 1000 - 10000,
sub: 'test-user'
}
};

// It may be worth eventually doing this via Fastify to test the full stack

const request: BasicRouterRequest = {
headers: {
'accept-encoding': 'gzip'
},
hostname: '',
protocol: 'http'
};

const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>);
expect(response.status).toEqual(200);
const stream = response.data as Readable;
const r = await drainWithTimeout(stream).catch((error) => error);
expect(r.message).toContain('Simulated storage error');
});
});
});

export async function drainWithTimeout(readable: Readable, ms = 2_000) {
const devNull = new Writable({
write(_chunk, _enc, cb) {
cb();
} // discard everything
});

// Throws AbortError if it takes longer than ms, and destroys the stream
await pipeline(readable, devNull, { signal: AbortSignal.timeout(ms) });
}
2 changes: 2 additions & 0 deletions packages/types/src/metrics.ts
@@ -1,6 +1,8 @@
export enum APIMetric {
// Uncompressed size of synced data from PowerSync to Clients
DATA_SYNCED_BYTES = 'powersync_data_synced_bytes_total',
// Potentially-compressed size of data sent from PowerSync to Clients
DATA_SENT_BYTES = 'powersync_data_sent_bytes_total',
// Number of operations synced
OPERATIONS_SYNCED = 'powersync_operations_synced_total',
// Number of concurrent sync connections