Skip to content

Commit ee3c3f2

Browse files
committed
BucketChecksumState test: Use real sync streams
1 parent b49acf0 commit ee3c3f2

File tree

1 file changed

+33
-92
lines changed

1 file changed

+33
-92
lines changed

packages/service-core/test/src/sync/BucketChecksumState.test.ts

Lines changed: 33 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -607,11 +607,20 @@ bucket_definitions:
607607
describe('streams', () => {
608608
let source: { -readonly [P in keyof BucketSource]: BucketSource[P] };
609609
let storage: MockBucketChecksumStateStorage;
610-
let staticBucketIds = ['stream|0[]'];
611610

612-
function checksumState(options?: Partial<BucketChecksumStateOptions>) {
613-
const rules = new SqlSyncRules('');
614-
rules.bucketSources.push(source);
611+
function checksumState(source: string | boolean, options?: Partial<BucketChecksumStateOptions>) {
612+
if (typeof source == 'boolean') {
613+
source = `
614+
streams:
615+
stream:
616+
auto_subscribe: ${source}
617+
query: SELECT * FROM assets WHERE id IN ifnull(subscription.parameter('ids'), '["default"]');
618+
`;
619+
}
620+
621+
const rules = SqlSyncRules.fromYaml(source, {
622+
defaultSchema: 'public'
623+
});
615624

616625
return new BucketChecksumState({
617626
syncContext,
@@ -623,77 +632,15 @@ bucket_definitions:
623632
});
624633
}
625634

626-
function createQuerier(ids: string[], subscription: number | null): BucketParameterQuerier {
627-
return {
628-
staticBuckets: ids.map((bucket) => ({
629-
definition: 'stream',
630-
inclusion_reasons: subscription == null ? ['default'] : [{ subscription }],
631-
bucket,
632-
priority: 3
633-
})),
634-
hasDynamicBuckets: false,
635-
parameterQueryLookups: [],
636-
queryDynamicBucketDescriptions: function (): never {
637-
throw new Error('no dynamic buckets.');
638-
}
639-
};
640-
}
641-
642635
beforeEach(() => {
643-
// Currently using mocked streams before streams are actually implemented as parsable rules.
644-
source = {
645-
name: 'stream',
646-
type: BucketSourceType.SYNC_STREAM,
647-
subscribedToByDefault: false,
648-
pushBucketParameterQueriers(result, options) {
649-
// Create a fake querier that resolves the global stream["default"] bucket by default and allows extracting
650-
// additional buckets from parameters.
651-
const subscriptions = options.streams['stream'] ?? [];
652-
if (!this.subscribedToByDefault && !subscriptions.length) {
653-
return;
654-
}
655-
656-
let hasExplicitDefaultSubscription = false;
657-
for (const subscription of subscriptions) {
658-
try {
659-
let subscriptionParameters = [];
660-
661-
if (subscription.parameters != null) {
662-
subscriptionParameters = JSON.parse(subscription.parameters['ids'] as string).map(
663-
(e: string) => `stream["${e}"]`
664-
);
665-
} else {
666-
hasExplicitDefaultSubscription = true;
667-
}
668-
669-
result.queriers.push(createQuerier([...subscriptionParameters], subscription.opaque_id));
670-
} catch (e) {
671-
result.errors.push({
672-
descriptor: 'stream',
673-
subscription,
674-
message: `Error evaluating bucket ids: ${e.message}`
675-
});
676-
}
677-
}
678-
679-
// If the stream is subscribed to by default and there is no explicit subscription that would match the default
680-
// subscription, also include the default querier.
681-
if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) {
682-
result.queriers.push(createQuerier(['stream["default"]'], null));
683-
}
684-
}
685-
} satisfies Partial<BucketSource> as any;
686-
687636
storage = new MockBucketChecksumStateStorage();
688-
storage.updateTestChecksum({ bucket: 'stream["default"]', checksum: 1, count: 1 });
689-
storage.updateTestChecksum({ bucket: 'stream["a"]', checksum: 1, count: 1 });
690-
storage.updateTestChecksum({ bucket: 'stream["b"]', checksum: 1, count: 1 });
637+
storage.updateTestChecksum({ bucket: 'stream|0["default"]', checksum: 1, count: 1 });
638+
storage.updateTestChecksum({ bucket: 'stream|0["a"]', checksum: 1, count: 1 });
639+
storage.updateTestChecksum({ bucket: 'stream|0["b"]', checksum: 1, count: 1 });
691640
});
692641

693642
test('includes defaults', async () => {
694-
source.subscribedToByDefault = true;
695-
const state = checksumState();
696-
643+
const state = checksumState(true);
697644
const line = await state.buildNextCheckpointLine({
698645
base: storage.makeCheckpoint(1n),
699646
writeCheckpoint: null,
@@ -703,7 +650,7 @@ bucket_definitions:
703650
expect(line?.checkpointLine).toEqual({
704651
checkpoint: {
705652
buckets: [
706-
{ bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
653+
{ bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
707654
],
708655
last_op_id: '1',
709656
write_checkpoint: undefined,
@@ -713,8 +660,7 @@ bucket_definitions:
713660
});
714661

715662
test('can exclude defaults', async () => {
716-
source.subscribedToByDefault = true;
717-
const state = checksumState({ syncRequest: { streams: { include_defaults: false, subscriptions: [] } } });
663+
const state = checksumState(true, { syncRequest: { streams: { include_defaults: false, subscriptions: [] } } });
718664

719665
const line = await state.buildNextCheckpointLine({
720666
base: storage.makeCheckpoint(1n),
@@ -733,9 +679,7 @@ bucket_definitions:
733679
});
734680

735681
test('custom subscriptions', async () => {
736-
source.subscribedToByDefault = true;
737-
738-
const state = checksumState({
682+
const state = checksumState(true, {
739683
syncRequest: {
740684
streams: {
741685
subscriptions: [
@@ -755,9 +699,9 @@ bucket_definitions:
755699
expect(line?.checkpointLine).toEqual({
756700
checkpoint: {
757701
buckets: [
758-
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
759-
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] },
760-
{ bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
702+
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
703+
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] },
704+
{ bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
761705
],
762706
last_op_id: '1',
763707
write_checkpoint: undefined,
@@ -767,7 +711,7 @@ bucket_definitions:
767711
});
768712

769713
test('overlap between custom subscriptions', async () => {
770-
const state = checksumState({
714+
const state = checksumState(false, {
771715
syncRequest: {
772716
streams: {
773717
subscriptions: [
@@ -787,8 +731,8 @@ bucket_definitions:
787731
expect(line?.checkpointLine).toEqual({
788732
checkpoint: {
789733
buckets: [
790-
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
791-
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] }
734+
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
735+
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] }
792736
],
793737
last_op_id: '1',
794738
write_checkpoint: undefined,
@@ -798,8 +742,7 @@ bucket_definitions:
798742
});
799743

800744
test('overlap between default and custom subscription', async () => {
801-
source.subscribedToByDefault = true;
802-
const state = checksumState({
745+
const state = checksumState(true, {
803746
syncRequest: {
804747
streams: {
805748
subscriptions: [{ stream: 'stream', parameters: { ids: '["a", "default"]' }, override_priority: 1 }]
@@ -816,9 +759,9 @@ bucket_definitions:
816759
expect(line?.checkpointLine).toEqual({
817760
checkpoint: {
818761
buckets: [
819-
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
762+
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
820763
{
821-
bucket: 'stream["default"]',
764+
bucket: 'stream|0["default"]',
822765
checksum: 1,
823766
count: 1,
824767
priority: 1,
@@ -833,9 +776,7 @@ bucket_definitions:
833776
});
834777

835778
test('reports errors', async () => {
836-
source.subscribedToByDefault = true;
837-
838-
const state = checksumState({
779+
const state = checksumState(true, {
839780
syncRequest: {
840781
streams: {
841782
subscriptions: [
@@ -855,10 +796,10 @@ bucket_definitions:
855796
expect(line?.checkpointLine).toEqual({
856797
checkpoint: {
857798
buckets: [
858-
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
859-
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
799+
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
800+
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
860801
{
861-
bucket: 'stream["default"]',
802+
bucket: 'stream|0["default"]',
862803
checksum: 1,
863804
count: 1,
864805
priority: 3,

0 commit comments

Comments
 (0)