Skip to content

Commit 511f6d7

Browse files
authored
Merge pull request #9 from powersync-ja/improve-logging
Improve logging & reduce sync rule update delay
2 parents 97b8b2b + 28d4873 commit 511f6d7

File tree

8 files changed

+45
-18
lines changed

8 files changed

+45
-18
lines changed

packages/rsocket-router/src/router/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {router as micro_router} from '@journeyapps-platform/micro';
1+
import { router as micro_router } from '@journeyapps-platform/micro';
22
import * as t from 'ts-codec';
33
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core';
44
import { SocketRouterObserver } from './SocketRouterListener.js';

packages/service-core/src/metrics/Metrics.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,16 @@ export class Metrics {
146146
}
147147
micro.logger.info('Configuring telemetry.');
148148

149-
micro.logger.info(`
149+
micro.logger.info(
150+
`
150151
Attention:
151152
PowerSync collects completely anonymous telemetry regarding usage.
152153
This information is used to shape our roadmap to better serve our customers.
153154
You can learn more, including how to opt-out if you'd not like to participate in this anonymous program, by visiting the following URL:
154155
https://docs.powersync.com/self-hosting/telemetry
155156
Anonymous telemetry is currently: ${options.disable_telemetry_sharing ? 'disabled' : 'enabled'}
156-
`.trim());
157+
`.trim()
158+
);
157159

158160
const configuredExporters: MetricReader[] = [];
159161

@@ -167,7 +169,7 @@ Anonymous telemetry is currently: ${options.disable_telemetry_sharing ? 'disable
167169
exporter: new OTLPMetricExporter({
168170
url: options.internal_metrics_endpoint
169171
}),
170-
exportIntervalMillis: 1000 * 60 * 5 // 5 minutes
172+
exportIntervalMillis: 1000 * 60 * 5 // 5 minutes
171173
});
172174

173175
configuredExporters.push(periodicExporter);

packages/service-core/src/replication/ErrorRateLimiter.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ export class DefaultErrorRateLimiter implements ErrorRateLimiter {
1212

1313
async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise<void> {
1414
const delay = Math.max(0, this.nextAllowed - Date.now());
15-
this.setDelay(5_000);
15+
// Minimum delay between connections, even without errors
16+
this.setDelay(500);
1617
await setTimeout(delay, undefined, { signal: options?.signal });
1718
}
1819

packages/service-core/src/storage/MongoBucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,12 @@ export class MongoBucketStorage implements BucketStorageFactory {
350350

351351
if (!instance) {
352352
const manager = locks.createMongoLockManager(this.db.locks, {
353-
name: `instance-id-insertion-lock`
353+
name: `instance-id-insertion-lock`
354354
});
355355

356356
await manager.lock(async () => {
357357
await this.db.instance.insertOne({
358-
_id: uuid(),
358+
_id: uuid()
359359
});
360360
});
361361

packages/service-core/src/storage/mongo/MongoBucketBatch.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,8 @@ export class MongoBucketBatch implements BucketStorageBatch {
463463

464464
await this.withTransaction(async () => {
465465
flushTry += 1;
466-
if (flushTry == 1) {
467-
micro.logger.info(`${this.slot_name} ${description}`);
468-
} else if (flushTry % 10 == 0) {
469-
micro.logger.info(`${this.slot_name} ${description} ops - try ${flushTry}`);
466+
if (flushTry % 10 == 0) {
467+
micro.logger.info(`${this.slot_name} ${description} - try ${flushTry}`);
470468
}
471469
if (flushTry > 20 && Date.now() > lastTry) {
472470
throw new Error('Max transaction tries exceeded');

packages/service-core/src/storage/mongo/PersistedBatch.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { JSONBig } from '@powersync/service-jsonbig';
22
import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules';
33
import * as bson from 'bson';
44
import * as mongo from 'mongodb';
5+
import * as micro from '@journeyapps-platform/micro';
56

67
import * as util from '@/util/util-index.js';
78
import { SourceTable } from '../SourceTable.js';
@@ -42,6 +43,11 @@ export class PersistedBatch {
4243
bucketParameters: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
4344
currentData: mongo.AnyBulkWriteOperation<CurrentDataDocument>[] = [];
4445

46+
/**
47+
* For debug logging only.
48+
*/
49+
debugLastOpId: bigint | null = null;
50+
4551
/**
4652
* Very rough estimate of transaction size.
4753
*/
@@ -75,13 +81,16 @@ export class PersistedBatch {
7581
const checksum = util.hashData(k.table, k.id, recordData);
7682
this.currentSize += recordData.length + 200;
7783

84+
const op_id = options.op_seq.next();
85+
this.debugLastOpId = op_id;
86+
7887
this.bucketData.push({
7988
insertOne: {
8089
document: {
8190
_id: {
8291
g: this.group_id,
8392
b: k.bucket,
84-
o: options.op_seq.next()
93+
o: op_id
8594
},
8695
op: 'PUT',
8796
source_table: options.table.id,
@@ -97,13 +106,17 @@ export class PersistedBatch {
97106

98107
for (let bd of remaining_buckets.values()) {
99108
// REMOVE
109+
110+
const op_id = options.op_seq.next();
111+
this.debugLastOpId = op_id;
112+
100113
this.bucketData.push({
101114
insertOne: {
102115
document: {
103116
_id: {
104117
g: this.group_id,
105118
b: bd.bucket,
106-
o: options.op_seq.next()
119+
o: op_id
107120
},
108121
op: 'REMOVE',
109122
source_table: options.table.id,
@@ -145,7 +158,9 @@ export class PersistedBatch {
145158
const binLookup = serializeLookup(result.lookup);
146159
const hex = binLookup.toString('base64');
147160
remaining_lookups.delete(hex);
161+
148162
const op_id = data.op_seq.next();
163+
this.debugLastOpId = op_id;
149164
this.bucketParameters.push({
150165
insertOne: {
151166
document: {
@@ -167,6 +182,7 @@ export class PersistedBatch {
167182
// 2. "REMOVE" entries for any lookup not touched.
168183
for (let lookup of remaining_lookups.values()) {
169184
const op_id = data.op_seq.next();
185+
this.debugLastOpId = op_id;
170186
this.bucketParameters.push({
171187
insertOne: {
172188
document: {
@@ -237,9 +253,16 @@ export class PersistedBatch {
237253
});
238254
}
239255

256+
micro.logger.info(
257+
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
258+
this.currentData.length
259+
} updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
260+
);
261+
240262
this.bucketData = [];
241263
this.bucketParameters = [];
242264
this.currentData = [];
243265
this.currentSize = 0;
266+
this.debugLastOpId = null;
244267
}
245268
}

packages/service-core/src/util/config/compound-config-collector.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ export class CompoundConfigCollector {
121121
migrations: baseConfig.migrations,
122122
telemetry: {
123123
disable_telemetry_sharing: baseConfig.telemetry?.disable_telemetry_sharing ?? false,
124-
internal_service_endpoint: baseConfig.telemetry?.internal_service_endpoint ?? 'https://pulse.journeyapps.com/v1/metrics'
124+
internal_service_endpoint:
125+
baseConfig.telemetry?.internal_service_endpoint ?? 'https://pulse.journeyapps.com/v1/metrics'
125126
},
126127
slot_name_prefix: connections[0]?.slot_name_prefix ?? 'powersync_'
127128
};

packages/types/src/config/PowerSyncConfig.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,12 @@ export const powerSyncConfig = t.object({
142142
})
143143
.optional(),
144144

145-
telemetry: t.object({
146-
disable_telemetry_sharing: t.boolean,
147-
internal_service_endpoint: t.string.optional()
148-
}).optional()
145+
telemetry: t
146+
.object({
147+
disable_telemetry_sharing: t.boolean,
148+
internal_service_endpoint: t.string.optional()
149+
})
150+
.optional()
149151
});
150152

151153
export type PowerSyncConfig = t.Decoded<typeof powerSyncConfig>;

0 commit comments

Comments
 (0)