From ef0e1bd42bb41ba3b15b434b00a49971e50209e2 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 18 Aug 2025 15:22:34 +0200 Subject: [PATCH 1/9] Enable gzip, zstd compression for sync streams. --- .../service-core/src/routes/compression.ts | 54 +++++++++++++++++++ .../src/routes/endpoints/sync-stream.ts | 16 +++--- 2 files changed, 63 insertions(+), 7 deletions(-) create mode 100644 packages/service-core/src/routes/compression.ts diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts new file mode 100644 index 000000000..ac62757b4 --- /dev/null +++ b/packages/service-core/src/routes/compression.ts @@ -0,0 +1,54 @@ +import { Readable } from 'node:stream'; +import type Negotiator from 'negotiator'; +import * as zlib from 'node:zlib'; + +/** + * Compress a streamed response. + * + * `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses. + * The manual implementation is simple enough, and gives us more control over the low-level details. + * + * @param negotiator Negotiator from the request, to negotiate response encoding + * @param stream plain-text stream + * @returns + */ +export function maybeCompressResponseStream( + negotiator: Negotiator, + stream: Readable +): { stream: Readable; encodingHeaders: Record } { + const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'identity' }); + if (encoding == 'zstd') { + return { + stream: stream.pipe( + // Available since Node v23.8.0, v22.15.0 + // This does the actual compression in a background thread pool. + zlib.createZstdCompress({ + // We need to flush the frame after every new input chunk, to avoid delaying data + // in the output stream. + flush: zlib.constants.ZSTD_e_flush, + params: { + // Default compression level is 3. We reduce this slightly to limit CPU overhead + [zlib.constants.ZSTD_c_compressionLevel]: 2 + } + }) + ), + encodingHeaders: { 'Content-Encoding': 'zstd' } + }; + } else if (encoding == 'gzip') { + return { + stream: stream.pipe( + zlib.createGzip({ + // We need to flush the frame after every new input chunk, to avoid delaying data + // in the output stream. + flush: zlib.constants.Z_SYNC_FLUSH + }) + ), + encodingHeaders: { 'Content-Encoding': 'gzip' } + }; + } else { + return { + stream: stream, + encodingHeaders: {} + }; + } +} diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 8ce2aadf4..a217c39e0 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -1,7 +1,6 @@ -import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework'; -import { RequestParameters } from '@powersync/service-sync-rules'; -import { Readable } from 'stream'; +import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework'; import Negotiator from 'negotiator'; +import { Readable } from 'stream'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; @@ -10,6 +9,7 @@ import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; import { APIMetric } from '@powersync/service-types'; +import { maybeCompressResponseStream } from '../compression.js'; export enum SyncRoutes { STREAM = '/sync/stream' @@ -31,10 +31,10 @@ export const syncStreamed = routeDefinition({ const userAgent = headers['x-user-agent'] ?? 
headers['user-agent']; const clientId = payload.params.client_id; const streamStart = Date.now(); + const negotiator = new Negotiator(payload.request); // This falls back to JSON unless there's preference for the bson-stream in the Accept header. const useBson = - payload.request.headers.accept && - new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType; + payload.request.headers.accept && negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType; logger.defaultMeta = { ...logger.defaultMeta, @@ -80,10 +80,11 @@ export const syncStreamed = routeDefinition({ }); const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines); - const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), { + const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), { objectMode: false, highWaterMark: 16 * 1024 }); + const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream); // Best effort guess on why the stream was closed. // We use the `??=` operator everywhere, so that we catch the first relevant @@ -118,7 +119,8 @@ export const syncStreamed = routeDefinition({ return new router.RouterResponse({ status: 200, headers: { - 'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType + 'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType, + ...encodingHeaders }, data: stream, afterSend: async (details) => { From e67ae6178e0c1db8be1edc634cfd0d86639cdcf5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 19 Aug 2025 16:40:34 +0200 Subject: [PATCH 2/9] Actually enable compression. --- packages/service-core/src/routes/compression.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts index ac62757b4..6c60a22bb 100644 --- a/packages/service-core/src/routes/compression.ts +++ b/packages/service-core/src/routes/compression.ts @@ -16,7 +16,7 @@ export function maybeCompressResponseStream( negotiator: Negotiator, stream: Readable ): { stream: Readable; encodingHeaders: Record } { - const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'identity' }); + const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' }); if (encoding == 'zstd') { return { stream: stream.pipe( From 6d79a1fad61be524fd1f0e02f4431499725353fe Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 21 Aug 2025 15:55:41 +0200 Subject: [PATCH 3/9] Fix error handling in compressed streams. 
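
Context for this fix: Readable.prototype.pipe() forwards data but not 'error'
events, so an error on the plain-text source stream surfaces as an unhandled
'error' event instead of destroying the compression stream. A minimal
standalone sketch of the failure mode and the manual forwarding used here
(illustrative only, not part of the diff; the error message is made up):

    import { Readable } from 'node:stream';
    import * as zlib from 'node:zlib';

    const source = new Readable({
      read() {
        // Simulate an upstream storage error mid-stream.
        this.destroy(new Error('simulated upstream failure'));
      }
    });
    const gzip = zlib.createGzip();

    // Without this line the error stays on `source` and crashes the process.
    source.on('error', (err) => gzip.destroy(err));

    source.pipe(gzip).on('error', (err) => {
      console.error('consumer sees:', err.message);
    });
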
--- .../service-core/src/routes/compression.ts | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts index 6c60a22bb..1f35d34c5 100644 --- a/packages/service-core/src/routes/compression.ts +++ b/packages/service-core/src/routes/compression.ts @@ -1,4 +1,4 @@ -import { Readable } from 'node:stream'; +import { Readable, Transform } from 'node:stream'; import type Negotiator from 'negotiator'; import * as zlib from 'node:zlib'; @@ -19,7 +19,8 @@ export function maybeCompressResponseStream( const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' }); if (encoding == 'zstd') { return { - stream: stream.pipe( + stream: transform( + stream, // Available since Node v23.8.0, v22.15.0 // This does the actual compression in a background thread pool. zlib.createZstdCompress({ @@ -36,7 +37,8 @@ export function maybeCompressResponseStream( }; } else if (encoding == 'gzip') { return { - stream: stream.pipe( + stream: transform( + stream, zlib.createGzip({ // We need to flush the frame after every new input chunk, to avoid delaying data // in the output stream. @@ -52,3 +54,16 @@ export function maybeCompressResponseStream( }; } } + +function transform(source: Readable, transform: Transform) { + // pipe does not forward error events automatically, resulting in unhandled error + // events. Manually forward them. + source.on('error', (err) => transform.destroy(err)); + return source.pipe(transform); + // This would be roughly equivalent: + // const out = new PassThrough(); + // pipeline(source, transform, out, (err) => { + // if (err) out.destroy(err); // propagate to consumer + // }); + // return out; +} From da3da38f5c4e9e0c41e3d74e96e25f3c8681426e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 21 Aug 2025 15:55:54 +0200 Subject: [PATCH 4/9] [WIP] stream tests. 
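
These tests drive the route handler directly with a hand-built Context instead
of going through Fastify. For the missing-sync-rules case, an equivalent
assertion could also be written with Vitest's rejection matchers (sketch only,
using the same field names as the test below):

    await expect(syncStreamed.handler({ context, params: {}, request })).rejects.toMatchObject({
      errorData: { status: 500, code: 'PSYNC_S2302' }
    });
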
--- .../test/src/routes/stream.test.ts | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 packages/service-core/test/src/routes/stream.test.ts diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts new file mode 100644 index 000000000..942a1fdb6 --- /dev/null +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -0,0 +1,97 @@ +import { BasicRouterRequest, Context, MetricsEngine, storage, StorageEngine, SyncContext } from '@/index.js'; +import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; +import { SqlSyncRules } from '@powersync/service-sync-rules'; + +describe('Stream Route', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe('compressed stream', () => { + it('handles missing sync rules', async () => { + const storageEngine: StorageEngine = { + activeBucketStorage: { + async getActiveStorage() { + return null; + } + } as any + } as any; + const context: Context = { + logger: logger, + service_context: { + syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), + routerEngine: {} as any, + storageEngine, + metricsEngine: new MetricsEngine({ disable_telemetry_sharing: true, factory: null as any }), + // Not used + configuration: null as any, + lifeCycleEngine: null as any, + migrations: null as any, + replicationEngine: null as any, + serviceMode: null as any + } + }; + + const request: BasicRouterRequest = { + headers: {}, + hostname: '', + protocol: 'http' + }; + + const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise).catch( + (e) => e + )) as ServiceError; + + expect(error.errorData.status).toEqual(500); + expect(error.errorData.code).toEqual('PSYNC_S2302'); + }); + + it('handles a stream error', async () => { + const storageEngine: StorageEngine = { + activeBucketStorage: { + async getActiveStorage() { + return { + getParsedSyncRules() { + return new SqlSyncRules('bucket_definitions: {}'); + } + }; + } + } + } as any; + const context: Context = { + logger: logger, + service_context: { + syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), + routerEngine: { + getAPI() { + return { + getParseSyncRulesOptions() { + return { defaultSchema: 'public' }; + } + } as any; + } + } as any, + storageEngine, + metricsEngine: new MetricsEngine({ disable_telemetry_sharing: true, factory: null as any }), + // Not used + configuration: null as any, + lifeCycleEngine: null as any, + migrations: null as any, + replicationEngine: null as any, + serviceMode: null as any + } + }; + + const request: BasicRouterRequest = { + headers: {}, + hostname: '', + protocol: 'http' + }; + + const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise); + expect(response.status).toEqual(200); + }); + }); +}); From bf5885bc768d7471caa3fa3b58b8839ca0a6124b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 25 Aug 2025 12:45:07 +0200 Subject: [PATCH 5/9] Log encoding used. 
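
For reference, the encoding that ends up in this log comes from the
Negotiator.encoding() call in compression.ts. A rough sketch of how a client's
Accept-Encoding header maps to the logged value (illustrative; the `preferred`
option is missing from negotiator's type definitions, hence the `as any` cast
in the route, and its tie-breaking behaviour is assumed here):

    import Negotiator from 'negotiator';

    // Client advertising both encodings with equal weight:
    const negotiator = new Negotiator({ headers: { 'accept-encoding': 'gzip, zstd' } } as any);
    const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' });
    // -> 'zstd' (assumed: `preferred` breaks the tie), logged as encoding: 'zstd'.
    // A client sending `Accept-Encoding: identity` gets an uncompressed stream
    // and no content-encoding header.
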
--- packages/service-core/src/routes/compression.ts | 6 +++--- packages/service-core/src/routes/endpoints/sync-stream.ts | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts index 1f35d34c5..7c5ba815c 100644 --- a/packages/service-core/src/routes/compression.ts +++ b/packages/service-core/src/routes/compression.ts @@ -15,7 +15,7 @@ import * as zlib from 'node:zlib'; export function maybeCompressResponseStream( negotiator: Negotiator, stream: Readable -): { stream: Readable; encodingHeaders: Record } { +): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } { const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' }); if (encoding == 'zstd') { return { @@ -33,7 +33,7 @@ export function maybeCompressResponseStream( } }) ), - encodingHeaders: { 'Content-Encoding': 'zstd' } + encodingHeaders: { 'content-encoding': 'zstd' } }; } else if (encoding == 'gzip') { return { @@ -45,7 +45,7 @@ export function maybeCompressResponseStream( flush: zlib.constants.Z_SYNC_FLUSH }) ), - encodingHeaders: { 'Content-Encoding': 'gzip' } + encodingHeaders: { 'content-encoding': 'gzip' } }; } else { return { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index a217c39e0..5d1a4739f 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -132,7 +132,8 @@ export const syncStreamed = routeDefinition({ logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), stream_ms: Date.now() - streamStart, - close_reason: closeReason ?? 'unknown' + close_reason: closeReason ?? 'unknown', + encoding: encodingHeaders['content-encoding'] }); } }); From 256b95a375b8a78b12926d518fb29e6f77abd045 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 25 Aug 2025 16:58:34 +0200 Subject: [PATCH 6/9] Semi-functional test. --- .../service-core/test/src/routes/mocks.ts | 59 +++++++++++ .../test/src/routes/stream.test.ts | 99 ++++++++----------- 2 files changed, 102 insertions(+), 56 deletions(-) create mode 100644 packages/service-core/test/src/routes/mocks.ts diff --git a/packages/service-core/test/src/routes/mocks.ts b/packages/service-core/test/src/routes/mocks.ts new file mode 100644 index 000000000..f6205bf29 --- /dev/null +++ b/packages/service-core/test/src/routes/mocks.ts @@ -0,0 +1,59 @@ +import { + BucketStorageFactory, + createCoreAPIMetrics, + MetricsEngine, + OpenTelemetryMetricsFactory, + RouteAPI, + RouterEngine, + ServiceContext, + StorageEngine, + SyncContext, + SyncRulesBucketStorage +} from '@/index.js'; +import { MeterProvider } from '@opentelemetry/sdk-metrics'; + +export function mockServiceContext(storage: Partial | null) { + // This is very incomplete - just enough to get the current tests passing. 
+ + const storageEngine: StorageEngine = { + activeBucketStorage: { + async getActiveStorage() { + return storage; + } + } as Partial + } as any; + + const meterProvider = new MeterProvider({ + readers: [] + }); + const meter = meterProvider.getMeter('powersync-tests'); + const metricsEngine = new MetricsEngine({ + disable_telemetry_sharing: true, + factory: new OpenTelemetryMetricsFactory(meter) + }); + createCoreAPIMetrics(metricsEngine); + const service_context: Partial = { + syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), + routerEngine: { + getAPI() { + return { + getParseSyncRulesOptions() { + return { defaultSchema: 'public' }; + } + } as Partial; + }, + addStopHandler() { + return () => {}; + } + } as Partial as any, + storageEngine, + metricsEngine: metricsEngine, + // Not used + configuration: null as any, + lifeCycleEngine: null as any, + migrations: null as any, + replicationEngine: null as any, + serviceMode: null as any + }; + return service_context as ServiceContext; +} diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 942a1fdb6..52a3e4e9c 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -1,37 +1,18 @@ -import { BasicRouterRequest, Context, MetricsEngine, storage, StorageEngine, SyncContext } from '@/index.js'; +import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js'; import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework'; +import { SqlSyncRules } from '@powersync/service-sync-rules'; +import { Readable, Writable } from 'stream'; +import { pipeline } from 'stream/promises'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; -import { SqlSyncRules } from '@powersync/service-sync-rules'; +import { mockServiceContext } from './mocks.js'; describe('Stream Route', () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - describe('compressed stream', () => { it('handles missing sync rules', async () => { - const storageEngine: StorageEngine = { - activeBucketStorage: { - async getActiveStorage() { - return null; - } - } as any - } as any; const context: Context = { logger: logger, - service_context: { - syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), - routerEngine: {} as any, - storageEngine, - metricsEngine: new MetricsEngine({ disable_telemetry_sharing: true, factory: null as any }), - // Not used - configuration: null as any, - lifeCycleEngine: null as any, - migrations: null as any, - replicationEngine: null as any, - serviceMode: null as any - } + service_context: mockServiceContext(null) }; const request: BasicRouterRequest = { @@ -48,50 +29,56 @@ describe('Stream Route', () => { expect(error.errorData.code).toEqual('PSYNC_S2302'); }); - it('handles a stream error', async () => { - const storageEngine: StorageEngine = { - activeBucketStorage: { - async getActiveStorage() { - return { - getParsedSyncRules() { - return new SqlSyncRules('bucket_definitions: {}'); - } - }; - } + it('handles a stream error with compression', async () => { + // This primarily tests that an underlying storage error doesn't result in an uncaught error + // when compressing the stream. 
+ + const storage = { + getParsedSyncRules() { + return new SqlSyncRules('bucket_definitions: {}'); + }, + watchCheckpointChanges: async function* (options) { + throw new Error('Simulated storage error'); } - } as any; + } as Partial; + const serviceContext = mockServiceContext(storage); + const context: Context = { logger: logger, - service_context: { - syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), - routerEngine: { - getAPI() { - return { - getParseSyncRulesOptions() { - return { defaultSchema: 'public' }; - } - } as any; - } - } as any, - storageEngine, - metricsEngine: new MetricsEngine({ disable_telemetry_sharing: true, factory: null as any }), - // Not used - configuration: null as any, - lifeCycleEngine: null as any, - migrations: null as any, - replicationEngine: null as any, - serviceMode: null as any + service_context: serviceContext, + token_payload: { + exp: new Date().getTime() / 1000 + 10000, + iat: new Date().getTime() / 1000 - 10000, + sub: 'test-user' } }; + // It may be worth eventually doing this via Fastify to test the full stack + const request: BasicRouterRequest = { - headers: {}, + headers: { + 'accept-encoding': 'gzip' + }, hostname: '', protocol: 'http' }; const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise); expect(response.status).toEqual(200); + const stream = response.data as Readable; + const r = await drainWithTimeout(stream).catch((error) => error); + expect(r.message).toContain('Simulated storage error'); }); }); }); + +export async function drainWithTimeout(readable: Readable, ms = 2_000) { + const devNull = new Writable({ + write(_chunk, _enc, cb) { + cb(); + } // discard everything + }); + + // Throws AbortError if it takes longer than ms, and destroys the stream + await pipeline(readable, devNull, { signal: AbortSignal.timeout(ms) }); +} From 1df8769bd48e50789a4c22300b8561f7e0ebbbe8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 25 Aug 2025 17:23:14 +0200 Subject: [PATCH 7/9] Add metric for actual bytes sent, powersync_data_sent_bytes_total. 
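
The two counters are related as follows (the ratio calculation is illustrative
only, not part of this patch):

    // powersync_data_synced_bytes_total: plaintext NDJSON/BSON bytes, before compression.
    // powersync_data_sent_bytes_total:   bytes written to the response after compression
    //                                    (identical to the above for identity encoding).
    // A per-stream compression ratio could be derived from the tracker fields:
    const ratio = tracker.dataSyncedBytes > 0 ? tracker.dataSentBytes / tracker.dataSyncedBytes : 1;
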
--- packages/service-core/src/api/api-metrics.ts | 6 +++ .../service-core/src/routes/compression.ts | 37 ++++++++++++------- .../src/routes/endpoints/socket-route.ts | 2 +- .../src/routes/endpoints/sync-stream.ts | 5 +-- .../service-core/src/sync/RequestTracker.ts | 29 ++++++++++++++- packages/service-core/src/sync/util.ts | 2 +- packages/types/src/metrics.ts | 2 + 7 files changed, 62 insertions(+), 21 deletions(-) diff --git a/packages/service-core/src/api/api-metrics.ts b/packages/service-core/src/api/api-metrics.ts index 5f57cfb14..ee051b409 100644 --- a/packages/service-core/src/api/api-metrics.ts +++ b/packages/service-core/src/api/api-metrics.ts @@ -12,6 +12,12 @@ export function createCoreAPIMetrics(engine: MetricsEngine): void { unit: 'bytes' }); + engine.createCounter({ + name: APIMetric.DATA_SENT_BYTES, + description: 'Size of data sent to clients, after compression if applicable', + unit: 'bytes' + }); + engine.createCounter({ name: APIMetric.OPERATIONS_SYNCED, description: 'Number of operations synced' diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts index 7c5ba815c..d8b473d03 100644 --- a/packages/service-core/src/routes/compression.ts +++ b/packages/service-core/src/routes/compression.ts @@ -1,6 +1,7 @@ -import { Readable, Transform } from 'node:stream'; +import { PassThrough, pipeline, Readable, Transform } from 'node:stream'; import type Negotiator from 'negotiator'; import * as zlib from 'node:zlib'; +import { RequestTracker } from '../sync/RequestTracker.js'; /** * Compress a streamed response. @@ -14,10 +15,12 @@ import * as zlib from 'node:zlib'; */ export function maybeCompressResponseStream( negotiator: Negotiator, - stream: Readable + stream: Readable, + tracker: RequestTracker ): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } { const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' }); if (encoding == 'zstd') { + tracker.setCompressed(encoding); return { stream: transform( stream, @@ -31,11 +34,13 @@ export function maybeCompressResponseStream( // Default compression level is 3. We reduce this slightly to limit CPU overhead [zlib.constants.ZSTD_c_compressionLevel]: 2 } - }) + }), + tracker ), encodingHeaders: { 'content-encoding': 'zstd' } }; } else if (encoding == 'gzip') { + tracker.setCompressed(encoding); return { stream: transform( stream, @@ -43,7 +48,8 @@ export function maybeCompressResponseStream( // We need to flush the frame after every new input chunk, to avoid delaying data // in the output stream. flush: zlib.constants.Z_SYNC_FLUSH - }) + }), + tracker ), encodingHeaders: { 'content-encoding': 'gzip' } }; @@ -55,15 +61,18 @@ export function maybeCompressResponseStream( } } -function transform(source: Readable, transform: Transform) { +function transform(source: Readable, transform: Transform, tracker: RequestTracker) { // pipe does not forward error events automatically, resulting in unhandled error - // events. Manually forward them. - source.on('error', (err) => transform.destroy(err)); - return source.pipe(transform); - // This would be roughly equivalent: - // const out = new PassThrough(); - // pipeline(source, transform, out, (err) => { - // if (err) out.destroy(err); // propagate to consumer - // }); - // return out; + // events. This forwards it. 
+ const out = new PassThrough(); + const trackingTransform = new Transform({ + transform(chunk, encoding, callback) { + tracker.addCompressedDataSent(chunk.length); + callback(null, chunk); + } + }); + pipeline(source, transform, trackingTransform, out, (err) => { + if (err) out.destroy(err); + }); + return out; } diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index b3badfb0e..53c5dc0ec 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -113,7 +113,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => const serialized = sync.syncLineToBson(data); responder.onNext({ data: serialized }, false); requestedN--; - tracker.addDataSynced(serialized.length); + tracker.addPlaintextDataSynced(serialized.length); } if (requestedN <= 0 && !signal.aborted) { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 5d1a4739f..3e2adbd50 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -84,7 +84,7 @@ export const syncStreamed = routeDefinition({ objectMode: false, highWaterMark: 16 * 1024 }); - const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream); + const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker); // Best effort guess on why the stream was closed. // We use the `??=` operator everywhere, so that we catch the first relevant @@ -132,8 +132,7 @@ export const syncStreamed = routeDefinition({ logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), stream_ms: Date.now() - streamStart, - close_reason: closeReason ?? 'unknown', - encoding: encodingHeaders['content-encoding'] + close_reason: closeReason ?? 'unknown' }); } }); diff --git a/packages/service-core/src/sync/RequestTracker.ts b/packages/service-core/src/sync/RequestTracker.ts index 6bda522bb..9e73ea3b5 100644 --- a/packages/service-core/src/sync/RequestTracker.ts +++ b/packages/service-core/src/sync/RequestTracker.ts @@ -2,6 +2,7 @@ import { MetricsEngine } from '../metrics/MetricsEngine.js'; import { APIMetric } from '@powersync/service-types'; import { SyncBucketData } from '../util/protocol-types.js'; +import { ServiceAssertionError } from '@powersync/lib-services-framework'; /** * Record sync stats per request stream. 
@@ -9,9 +10,12 @@ import { SyncBucketData } from '../util/protocol-types.js'; export class RequestTracker { operationsSynced = 0; dataSyncedBytes = 0; + dataSentBytes = 0; operationCounts: OperationCounts = { put: 0, remove: 0, move: 0, clear: 0 }; largeBuckets: Record = {}; + private encoding: string | undefined = undefined; + constructor(private metrics: MetricsEngine) { this.metrics = metrics; } @@ -29,18 +33,39 @@ export class RequestTracker { this.metrics.getCounter(APIMetric.OPERATIONS_SYNCED).add(operations.total); } - addDataSynced(bytes: number) { + setCompressed(encoding: string) { + this.encoding = encoding; + } + + addPlaintextDataSynced(bytes: number) { this.dataSyncedBytes += bytes; this.metrics.getCounter(APIMetric.DATA_SYNCED_BYTES).add(bytes); + + if (this.encoding == null) { + // This avoids having to create a separate stream just to track this + this.dataSentBytes += bytes; + + this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes); + } + } + + addCompressedDataSent(bytes: number) { + if (this.encoding == null) { + throw new ServiceAssertionError('No compression encoding set'); + } + this.dataSentBytes += bytes; + this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes); } getLogMeta() { return { operations_synced: this.operationsSynced, data_synced_bytes: this.dataSyncedBytes, + data_sent_bytes: this.dataSentBytes, operation_counts: this.operationCounts, - large_buckets: this.largeBuckets + large_buckets: this.largeBuckets, + encoding: this.encoding }; } } diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts index 6cc0671b0..a458af993 100644 --- a/packages/service-core/src/sync/util.ts +++ b/packages/service-core/src/sync/util.ts @@ -125,7 +125,7 @@ export async function* transformToBytesTracked( encoded = data; } - tracker.addDataSynced(encoded.length); + tracker.addPlaintextDataSynced(encoded.length); yield encoded; } } diff --git a/packages/types/src/metrics.ts b/packages/types/src/metrics.ts index 3f67ef53a..aa8473119 100644 --- a/packages/types/src/metrics.ts +++ b/packages/types/src/metrics.ts @@ -1,6 +1,8 @@ export enum APIMetric { // Uncompressed size of synced data from PowerSync to Clients DATA_SYNCED_BYTES = 'powersync_data_synced_bytes_total', + // Potentially-compressed size of data sent from PowerSync to Clients + DATA_SENT_BYTES = 'powersync_data_sent_bytes_total', // Number of operations synced OPERATIONS_SYNCED = 'powersync_operations_synced_total', // Number of concurrent sync connections From e4605a16e95b4cabb5649fe522f96411c145d9ed Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 26 Aug 2025 08:41:38 +0200 Subject: [PATCH 8/9] Changeset. --- .changeset/spicy-vans-matter.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/spicy-vans-matter.md diff --git a/.changeset/spicy-vans-matter.md b/.changeset/spicy-vans-matter.md new file mode 100644 index 000000000..edbe9c008 --- /dev/null +++ b/.changeset/spicy-vans-matter.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-core': minor +'@powersync/service-types': minor +'@powersync/service-image': minor +--- + +Support gzip and zstd compression in http streams. From a133076f159f1149a274c3fc7a3e7a4906698915 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 27 Aug 2025 10:18:54 +0200 Subject: [PATCH 9/9] Minor refactoring. 
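
No behaviour change intended: the encoding-specific branches move into
createCompressionTransform() and the pipeline wiring into transformStream().
The call site keeps the same shape (sketch mirroring sync-stream.ts):

    const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);
    return new router.RouterResponse({
      status: 200,
      headers: {
        'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
        ...encodingHeaders
      },
      data: stream
    });
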
--- .../service-core/src/routes/compression.ts | 71 +++++++++---------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts index d8b473d03..0fe02f9d7 100644 --- a/packages/service-core/src/routes/compression.ts +++ b/packages/service-core/src/routes/compression.ts @@ -1,5 +1,5 @@ -import { PassThrough, pipeline, Readable, Transform } from 'node:stream'; import type Negotiator from 'negotiator'; +import { PassThrough, pipeline, Readable, Transform } from 'node:stream'; import * as zlib from 'node:zlib'; import { RequestTracker } from '../sync/RequestTracker.js'; @@ -19,54 +19,51 @@ export function maybeCompressResponseStream( tracker: RequestTracker ): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } { const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' }); - if (encoding == 'zstd') { - tracker.setCompressed(encoding); - return { - stream: transform( - stream, - // Available since Node v23.8.0, v22.15.0 - // This does the actual compression in a background thread pool. - zlib.createZstdCompress({ - // We need to flush the frame after every new input chunk, to avoid delaying data - // in the output stream. - flush: zlib.constants.ZSTD_e_flush, - params: { - // Default compression level is 3. We reduce this slightly to limit CPU overhead - [zlib.constants.ZSTD_c_compressionLevel]: 2 - } - }), - tracker - ), - encodingHeaders: { 'content-encoding': 'zstd' } - }; - } else if (encoding == 'gzip') { - tracker.setCompressed(encoding); + const transform = createCompressionTransform(encoding); + if (transform == null) { + // No matching compression supported - leave stream as-is return { - stream: transform( - stream, - zlib.createGzip({ - // We need to flush the frame after every new input chunk, to avoid delaying data - // in the output stream. - flush: zlib.constants.Z_SYNC_FLUSH - }), - tracker - ), - encodingHeaders: { 'content-encoding': 'gzip' } + stream, + encodingHeaders: {} }; } else { + tracker.setCompressed(encoding); return { - stream: stream, - encodingHeaders: {} + stream: transformStream(stream, transform, tracker), + encodingHeaders: { 'content-encoding': encoding } }; } } -function transform(source: Readable, transform: Transform, tracker: RequestTracker) { +function createCompressionTransform(encoding: string | undefined): Transform | null { + if (encoding == 'zstd') { + // Available since Node v23.8.0, v22.15.0 + // This does the actual compression in a background thread pool. + return zlib.createZstdCompress({ + // We need to flush the frame after every new input chunk, to avoid delaying data + // in the output stream. + flush: zlib.constants.ZSTD_e_flush, + params: { + // Default compression level is 3. We reduce this slightly to limit CPU overhead + [zlib.constants.ZSTD_c_compressionLevel]: 2 + } + }); + } else if (encoding == 'gzip') { + return zlib.createGzip({ + // We need to flush the frame after every new input chunk, to avoid delaying data + // in the output stream. + flush: zlib.constants.Z_SYNC_FLUSH + }); + } + return null; +} + +function transformStream(source: Readable, transform: Transform, tracker: RequestTracker) { // pipe does not forward error events automatically, resulting in unhandled error // events. This forwards it. 
const out = new PassThrough(); const trackingTransform = new Transform({ - transform(chunk, encoding, callback) { + transform(chunk, _encoding, callback) { tracker.addCompressedDataSent(chunk.length); callback(null, chunk); }