diff --git a/.changeset/spicy-vans-matter.md b/.changeset/spicy-vans-matter.md
new file mode 100644
index 000000000..edbe9c008
--- /dev/null
+++ b/.changeset/spicy-vans-matter.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-core': minor
+'@powersync/service-types': minor
+'@powersync/service-image': minor
+---
+
+Support gzip and zstd compression in http streams.
diff --git a/packages/service-core/src/api/api-metrics.ts b/packages/service-core/src/api/api-metrics.ts
index 5f57cfb14..ee051b409 100644
--- a/packages/service-core/src/api/api-metrics.ts
+++ b/packages/service-core/src/api/api-metrics.ts
@@ -12,6 +12,12 @@ export function createCoreAPIMetrics(engine: MetricsEngine): void {
     unit: 'bytes'
   });
 
+  engine.createCounter({
+    name: APIMetric.DATA_SENT_BYTES,
+    description: 'Size of data sent to clients, after compression if applicable',
+    unit: 'bytes'
+  });
+
   engine.createCounter({
     name: APIMetric.OPERATIONS_SYNCED,
     description: 'Number of operations synced'
diff --git a/packages/service-core/src/routes/compression.ts b/packages/service-core/src/routes/compression.ts
new file mode 100644
index 000000000..0fe02f9d7
--- /dev/null
+++ b/packages/service-core/src/routes/compression.ts
@@ -0,0 +1,102 @@
+import type Negotiator from 'negotiator';
+import { PassThrough, pipeline, Readable, Transform } from 'node:stream';
+import * as zlib from 'node:zlib';
+import { RequestTracker } from '../sync/RequestTracker.js';
+
+/**
+ * Whether this Node.js runtime provides zstd support in `node:zlib`.
+ *
+ * `zlib.createZstdCompress` is only available since Node v23.8.0 / v22.15.0. On older runtimes zstd
+ * must not be offered during negotiation, otherwise a client that accepts zstd would get a failed
+ * request instead of a gzip or uncompressed response.
+ */
+const ZSTD_SUPPORTED = typeof zlib.createZstdCompress === 'function';
+
+/**
+ * Response encodings this runtime can produce, as offered to the negotiator.
+ */
+const SUPPORTED_ENCODINGS: string[] = ZSTD_SUPPORTED ? ['identity', 'gzip', 'zstd'] : ['identity', 'gzip'];
+
+/**
+ * Compress a streamed response.
+ *
+ * `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
+ * The manual implementation is simple enough, and gives us more control over the low-level details.
+ *
+ * @param negotiator Negotiator from the request, to negotiate response encoding
+ * @param stream plain-text stream
+ * @param tracker request tracker, used to record the negotiated encoding and the compressed bytes sent
+ * @returns the stream to send (compressed if an encoding was negotiated, otherwise the input stream
+ *   unchanged), together with the `content-encoding` header to add to the response, if any
+ */
+export function maybeCompressResponseStream(
+  negotiator: Negotiator,
+  stream: Readable,
+  tracker: RequestTracker
+): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } {
+  // NOTE(review): the `any` cast presumably works around typings that lack the `preferred` option - confirm.
+  const encoding: string | undefined = (negotiator as any).encoding(SUPPORTED_ENCODINGS, { preferred: 'zstd' });
+  const transform = createCompressionTransform(encoding);
+  if (encoding == null || transform == null) {
+    // No matching compression supported - leave stream as-is
+    return {
+      stream,
+      encodingHeaders: {}
+    };
+  }
+
+  tracker.setCompressed(encoding);
+  return {
+    stream: transformStream(stream, transform, tracker),
+    encodingHeaders: { 'content-encoding': encoding }
+  };
+}
+
+/**
+ * Create the compression transform for a negotiated encoding.
+ *
+ * @returns null for `identity`, for no negotiated encoding, and for encodings this runtime cannot produce
+ */
+function createCompressionTransform(encoding: string | undefined): Transform | null {
+  if (encoding === 'zstd' && ZSTD_SUPPORTED) {
+    // Available since Node v23.8.0, v22.15.0
+    // This does the actual compression in a background thread pool.
+    return zlib.createZstdCompress({
+      // We need to flush the frame after every new input chunk, to avoid delaying data
+      // in the output stream.
+      flush: zlib.constants.ZSTD_e_flush,
+      params: {
+        // Default compression level is 3. We reduce this slightly to limit CPU overhead
+        [zlib.constants.ZSTD_c_compressionLevel]: 2
+      }
+    });
+  } else if (encoding === 'gzip') {
+    return zlib.createGzip({
+      // We need to flush the frame after every new input chunk, to avoid delaying data
+      // in the output stream.
+      flush: zlib.constants.Z_SYNC_FLUSH
+    });
+  }
+  return null;
+}
+
+/**
+ * Pipe `source` through the compression transform, recording the compressed bytes on the tracker.
+ *
+ * @returns a readable stream of compressed data, which is destroyed with the error if any stage fails
+ */
+function transformStream(source: Readable, transform: Transform, tracker: RequestTracker): Readable {
+  // A plain `.pipe()` chain does not forward error events, which results in unhandled error events.
+  // `pipeline()` reports a failure of any stage in its callback, where we forward it to the output stream.
+  const out = new PassThrough();
+  const trackingTransform = new Transform({
+    transform(chunk, _encoding, callback) {
+      tracker.addCompressedDataSent(chunk.length);
+      callback(null, chunk);
+    }
+  });
+  pipeline(source, transform, trackingTransform, out, (err) => {
+    if (err) out.destroy(err);
+  });
+  return out;
+}
diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts
index 1c3f8d8b1..21b4064dc 100644
--- a/packages/service-core/src/routes/endpoints/socket-route.ts
+++ b/packages/service-core/src/routes/endpoints/socket-route.ts
@@ -113,7 +113,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
             const serialized = sync.syncLineToBson(data);
             responder.onNext({ data: serialized }, false);
             requestedN--;
-            tracker.addDataSynced(serialized.length);
+            tracker.addPlaintextDataSynced(serialized.length);
           }
 
           if (requestedN <= 0 && !signal.aborted) {
diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts
index 15b16bd33..4b5f94638 100644
--- a/packages/service-core/src/routes/endpoints/sync-stream.ts
+++ b/packages/service-core/src/routes/endpoints/sync-stream.ts
@@ -1,7 +1,6 @@
-import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
-import { RequestParameters } from '@powersync/service-sync-rules';
-import { Readable } from 'stream';
+import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
 import Negotiator from 'negotiator';
+import { Readable } from 'stream';
 
 import * as sync from '../../sync/sync-index.js';
 import * as util from '../../util/util-index.js';
@@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
 import { routeDefinition } from '../router.js';
 
 import { APIMetric } from '@powersync/service-types';
+import { maybeCompressResponseStream } from '../compression.js';
 
 export enum SyncRoutes {
   STREAM = '/sync/stream'
@@ -31,9 +31,10 @@ export const syncStreamed = routeDefinition({
     const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
     const clientId = payload.params.client_id;
     const streamStart = Date.now();
+    const negotiator = new Negotiator(payload.request);
     // This falls back to JSON unless there's preference for the bson-stream in the Accept header.
     const useBson = payload.request.headers.accept
-      ? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
+      ? negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType
       : false;
 
     logger.defaultMeta = {
@@ -81,10 +82,11 @@ export const syncStreamed = routeDefinition({
       });
 
       const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
-      const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
+      const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
         objectMode: false,
         highWaterMark: 16 * 1024
       });
+      const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);
 
       // Best effort guess on why the stream was closed.
       // We use the `??=` operator everywhere, so that we catch the first relevant
@@ -119,7 +121,8 @@ export const syncStreamed = routeDefinition({
       return new router.RouterResponse({
         status: 200,
         headers: {
-          'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
+          'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
+          ...encodingHeaders
         },
         data: stream,
         afterSend: async (details) => {
diff --git a/packages/service-core/src/sync/RequestTracker.ts b/packages/service-core/src/sync/RequestTracker.ts
index 6bda522bb..9e73ea3b5 100644
--- a/packages/service-core/src/sync/RequestTracker.ts
+++ b/packages/service-core/src/sync/RequestTracker.ts
@@ -2,6 +2,7 @@ import { MetricsEngine } from '../metrics/MetricsEngine.js';
 
 import { APIMetric } from '@powersync/service-types';
 import { SyncBucketData } from '../util/protocol-types.js';
+import { ServiceAssertionError } from '@powersync/lib-services-framework';
 
 /**
  * Record sync stats per request stream.
@@ -9,9 +10,12 @@ import { SyncBucketData } from '../util/protocol-types.js';
 export class RequestTracker {
   operationsSynced = 0;
   dataSyncedBytes = 0;
+  dataSentBytes = 0;
   operationCounts: OperationCounts = { put: 0, remove: 0, move: 0, clear: 0 };
   largeBuckets: Record<string, number> = {};
 
+  private encoding: string | undefined = undefined;
+
   constructor(private metrics: MetricsEngine) {
     this.metrics = metrics;
   }
@@ -29,18 +33,55 @@ export class RequestTracker {
     this.metrics.getCounter(APIMetric.OPERATIONS_SYNCED).add(operations.total);
   }
 
-  addDataSynced(bytes: number) {
+  /**
+   * Record that the response is compressed with the given content encoding.
+   *
+   * This must be called before any data is tracked: from then on, sent bytes are only counted
+   * via addCompressedDataSent(), and no longer via addPlaintextDataSynced().
+   */
+  setCompressed(encoding: string) {
+    this.encoding = encoding;
+  }
+
+  /**
+   * Record uncompressed bytes synced to the client.
+   *
+   * If the response is not compressed, the same bytes are also counted as sent bytes.
+   */
+  addPlaintextDataSynced(bytes: number) {
     this.dataSyncedBytes += bytes;
 
     this.metrics.getCounter(APIMetric.DATA_SYNCED_BYTES).add(bytes);
+
+    if (this.encoding == null) {
+      // This avoids having to create a separate stream just to track this
+      this.dataSentBytes += bytes;
+
+      this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
+    }
+  }
+
+  /**
+   * Record bytes sent to the client after compression.
+   *
+   * @throws ServiceAssertionError if setCompressed() has not been called
+   */
+  addCompressedDataSent(bytes: number) {
+    if (this.encoding == null) {
+      throw new ServiceAssertionError('No compression encoding set');
+    }
+    this.dataSentBytes += bytes;
+    this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
   }
 
   getLogMeta() {
     return {
       operations_synced: this.operationsSynced,
       data_synced_bytes: this.dataSyncedBytes,
+      data_sent_bytes: this.dataSentBytes,
       operation_counts: this.operationCounts,
-      large_buckets: this.largeBuckets
+      large_buckets: this.largeBuckets,
+      encoding: this.encoding
     };
   }
 }
diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts
index 6cc0671b0..a458af993 100644
--- a/packages/service-core/src/sync/util.ts
+++ b/packages/service-core/src/sync/util.ts
@@ -125,7 +125,7 @@ export async function* transformToBytesTracked(
       encoded = data;
     }
 
-    tracker.addDataSynced(encoded.length);
+    tracker.addPlaintextDataSynced(encoded.length);
     yield encoded;
   }
 }
diff --git a/packages/service-core/test/src/routes/mocks.ts b/packages/service-core/test/src/routes/mocks.ts
new file mode 100644
index 000000000..f6205bf29
--- /dev/null
+++ b/packages/service-core/test/src/routes/mocks.ts
@@ -0,0 +1,59 @@
+import {
+  BucketStorageFactory,
+  createCoreAPIMetrics,
+  MetricsEngine,
+  OpenTelemetryMetricsFactory,
+  RouteAPI,
+  RouterEngine,
+  ServiceContext,
+  StorageEngine,
+  SyncContext,
+  SyncRulesBucketStorage
+} from '@/index.js';
+import { MeterProvider } from '@opentelemetry/sdk-metrics';
+
+export function mockServiceContext(storage: Partial<SyncRulesBucketStorage> | null) {
+  // This is very incomplete - just enough to get the current tests passing.
+
+  const storageEngine: StorageEngine = {
+    activeBucketStorage: {
+      async getActiveStorage() {
+        return storage;
+      }
+    } as Partial<BucketStorageFactory>
+  } as any;
+
+  const meterProvider = new MeterProvider({
+    readers: []
+  });
+  const meter = meterProvider.getMeter('powersync-tests');
+  const metricsEngine = new MetricsEngine({
+    disable_telemetry_sharing: true,
+    factory: new OpenTelemetryMetricsFactory(meter)
+  });
+  createCoreAPIMetrics(metricsEngine);
+  const service_context: Partial<ServiceContext> = {
+    syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }),
+    routerEngine: {
+      getAPI() {
+        return {
+          getParseSyncRulesOptions() {
+            return { defaultSchema: 'public' };
+          }
+        } as Partial<RouteAPI>;
+      },
+      addStopHandler() {
+        return () => {};
+      }
+    } as Partial<RouterEngine> as any,
+    storageEngine,
+    metricsEngine: metricsEngine,
+    // Not used
+    configuration: null as any,
+    lifeCycleEngine: null as any,
+    migrations: null as any,
+    replicationEngine: null as any,
+    serviceMode: null as any
+  };
+  return service_context as ServiceContext;
+}
diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts
new file mode 100644
index 000000000..52a3e4e9c
--- /dev/null
+++ b/packages/service-core/test/src/routes/stream.test.ts
@@ -0,0 +1,84 @@
+import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
+import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
+import { SqlSyncRules } from '@powersync/service-sync-rules';
+import { Readable, Writable } from 'stream';
+import { pipeline } from 'stream/promises';
+import { describe, expect, it } from 'vitest';
+import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
+import { mockServiceContext } from './mocks.js';
+
+describe('Stream Route', () => {
+  describe('compressed stream', () => {
+    it('handles missing sync rules', async () => {
+      const context: Context = {
+        logger: logger,
+        service_context: mockServiceContext(null)
+      };
+
+      const request: BasicRouterRequest = {
+        headers: {},
+        hostname: '',
+        protocol: 'http'
+      };
+
+      const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>).catch(
+        (e) => e
+      )) as ServiceError;
+
+      expect(error.errorData.status).toEqual(500);
+      expect(error.errorData.code).toEqual('PSYNC_S2302');
+    });
+
+    it('handles a stream error with compression', async () => {
+      // This primarily tests that an underlying storage error doesn't result in an uncaught error
+      // when compressing the stream.
+
+      const storage = {
+        getParsedSyncRules() {
+          return new SqlSyncRules('bucket_definitions: {}');
+        },
+        watchCheckpointChanges: async function* () {
+          throw new Error('Simulated storage error');
+        }
+      } as Partial<SyncRulesBucketStorage>;
+      const serviceContext = mockServiceContext(storage);
+
+      const context: Context = {
+        logger: logger,
+        service_context: serviceContext,
+        token_payload: {
+          exp: new Date().getTime() / 1000 + 10000,
+          iat: new Date().getTime() / 1000 - 10000,
+          sub: 'test-user'
+        }
+      };
+
+      // It may be worth eventually doing this via Fastify to test the full stack
+
+      const request: BasicRouterRequest = {
+        headers: {
+          'accept-encoding': 'gzip'
+        },
+        hostname: '',
+        protocol: 'http'
+      };
+
+      const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>);
+      expect(response.status).toEqual(200);
+      const stream = response.data as Readable;
+      const r = await drainWithTimeout(stream).catch((error) => error);
+      expect(r.message).toContain('Simulated storage error');
+    });
+  });
+});
+
+export async function drainWithTimeout(readable: Readable, ms = 2_000) {
+  const devNull = new Writable({
+    write(_chunk, _enc, cb) {
+      cb();
+    } // discard everything
+  });
+
+  // Throws AbortError if it takes longer than ms, and destroys the stream
+  await pipeline(readable, devNull, { signal: AbortSignal.timeout(ms) });
+}
diff --git a/packages/types/src/metrics.ts b/packages/types/src/metrics.ts
index 3f67ef53a..aa8473119 100644
--- a/packages/types/src/metrics.ts
+++ b/packages/types/src/metrics.ts
@@ -1,6 +1,8 @@
 export enum APIMetric {
   // Uncompressed size of synced data from PowerSync to Clients
   DATA_SYNCED_BYTES = 'powersync_data_synced_bytes_total',
+  // Potentially-compressed size of data sent from PowerSync to Clients
+  DATA_SENT_BYTES = 'powersync_data_sent_bytes_total',
   // Number of operations synced
   OPERATIONS_SYNCED = 'powersync_operations_synced_total',
   // Number of concurrent sync connections