Skip to content

Commit 1f5a456

Browse files
committed
feat: consolidate sharding utils
1 parent c9eb170 commit 1f5a456

File tree

16 files changed

+201
-610
lines changed

16 files changed

+201
-610
lines changed

packages/core/src/lib/connection_manager/shard_reader.spec.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
PubsubTopic,
77
ShardInfo
88
} from "@waku/interfaces";
9-
import { contentTopicToShardIndex, encodeRelayShard } from "@waku/utils";
9+
import { AutoShardingRoutingInfo, encodeRelayShard } from "@waku/utils";
1010
import { expect } from "chai";
1111
import { Libp2p } from "libp2p";
1212
import sinon from "sinon";
@@ -29,16 +29,17 @@ describe("ShardReader", function () {
2929

3030
const testContentTopic = "/test/1/waku-light-push/utf8";
3131
const testClusterId = 3;
32-
const testShardIndex = contentTopicToShardIndex(
33-
testContentTopic,
34-
DEFAULT_NUM_SHARDS
35-
);
3632

3733
const testNetworkConfig: AutoSharding = {
3834
clusterId: testClusterId,
3935
numShardsInCluster: DEFAULT_NUM_SHARDS
4036
};
4137

38+
const testShardIndex = AutoShardingRoutingInfo.fromContentTopic(
39+
testContentTopic,
40+
testNetworkConfig
41+
).shardId;
42+
4243
const testShardInfo: ShardInfo = {
4344
clusterId: testClusterId,
4445
shards: [testShardIndex]

packages/core/src/lib/connection_manager/shard_reader.ts

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,7 @@ import type {
66
ShardId,
77
ShardInfo
88
} from "@waku/interfaces";
9-
import {
10-
decodeRelayShard,
11-
Logger,
12-
pubsubTopicToSingleShardInfo
13-
} from "@waku/utils";
9+
import { decodeRelayShard, Logger } from "@waku/utils";
1410
import { Libp2p } from "libp2p";
1511

1612
const log = new Logger("shard-reader");
@@ -65,7 +61,7 @@ export class ShardReader implements IShardReader {
6561
pubsubTopic: PubsubTopic
6662
): Promise<boolean> {
6763
try {
68-
const { clusterId, shard } = pubsubTopicToSingleShardInfo(pubsubTopic);
64+
const { clusterId, shard } = this.parsePubsubTopic(pubsubTopic);
6965
if (clusterId !== this.clusterId) return false;
7066
return await this.isPeerOnShard(id, shard);
7167
} catch (error) {
@@ -93,6 +89,34 @@ export class ShardReader implements IShardReader {
9389
);
9490
}
9591

92+
private parsePubsubTopic(pubsubTopic: PubsubTopic): {
93+
clusterId: ClusterId;
94+
shard: ShardId;
95+
} {
96+
const parts = pubsubTopic.split("/");
97+
98+
if (
99+
parts.length !== 6 ||
100+
parts[1] !== "waku" ||
101+
parts[2] !== "2" ||
102+
parts[3] !== "rs"
103+
) {
104+
throw new Error("Invalid pubsub topic");
105+
}
106+
107+
const clusterId = parseInt(parts[4], 10);
108+
const shard = parseInt(parts[5], 10);
109+
110+
if (isNaN(clusterId) || isNaN(shard)) {
111+
throw new Error("Invalid clusterId or shard");
112+
}
113+
114+
return {
115+
clusterId,
116+
shard
117+
};
118+
}
119+
96120
private async getRelayShards(id: PeerId): Promise<ShardInfo | undefined> {
97121
try {
98122
const peer = await this.libp2p.peerStore.get(id);

packages/reliability-tests/tests/high-throughput.spec.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { LightNode, Protocols } from "@waku/interfaces";
22
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
3-
import {
4-
contentTopicToPubsubTopic,
5-
createRoutingInfo,
6-
delay
7-
} from "@waku/utils";
3+
import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils";
84
import { expect } from "chai";
95

106
import {
@@ -63,11 +59,8 @@ describe("High Throughput Messaging", function () {
6359
await delay(1000);
6460

6561
await nwaku.ensureSubscriptions([
66-
contentTopicToPubsubTopic(
67-
ContentTopic,
68-
networkConfig.clusterId,
69-
networkConfig.numShardsInCluster
70-
)
62+
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
63+
.pubsubTopic
7164
]);
7265

7366
waku = await createLightNode({ networkConfig });

packages/reliability-tests/tests/longevity.spec.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { LightNode, Protocols } from "@waku/interfaces";
22
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
3-
import {
4-
contentTopicToPubsubTopic,
5-
createRoutingInfo,
6-
delay
7-
} from "@waku/utils";
3+
import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils";
84
import { expect } from "chai";
95

106
import {
@@ -62,11 +58,8 @@ describe("Longevity", function () {
6258
);
6359

6460
await nwaku.ensureSubscriptions([
65-
contentTopicToPubsubTopic(
66-
ContentTopic,
67-
networkConfig.clusterId,
68-
networkConfig.numShardsInCluster
69-
)
61+
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
62+
.pubsubTopic
7063
]);
7164

7265
waku = await createLightNode({ networkConfig });

packages/reliability-tests/tests/throughput-sizes.spec.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { LightNode, Protocols } from "@waku/interfaces";
22
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
3-
import {
4-
contentTopicToPubsubTopic,
5-
createRoutingInfo,
6-
delay
7-
} from "@waku/utils";
3+
import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils";
84
import { expect } from "chai";
95

106
import {
@@ -68,11 +64,8 @@ describe("Throughput Sanity Checks - Different Message Sizes", function () {
6864
await delay(1000);
6965

7066
await nwaku.ensureSubscriptions([
71-
contentTopicToPubsubTopic(
72-
ContentTopic,
73-
networkConfig.clusterId,
74-
networkConfig.numShardsInCluster
75-
)
67+
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
68+
.pubsubTopic
7669
]);
7770

7871
waku = await createLightNode({ networkConfig });

packages/tests/src/lib/service_node.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import { peerIdFromString } from "@libp2p/peer-id";
33
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
44
import { ContentTopic, PubsubTopic } from "@waku/interfaces";
55
import {
6-
formatPubsubTopic,
76
isAutoSharding,
87
isDefined,
98
isStaticSharding,
10-
RoutingInfo
9+
RoutingInfo,
10+
StaticShardingRoutingInfo
1111
} from "@waku/utils";
1212
import { Logger } from "@waku/utils";
1313
import pRetry from "p-retry";
@@ -279,10 +279,10 @@ export class ServiceNode {
279279
if (this.args?.shard) {
280280
if (this.args?.shard.length > 1)
281281
throw "More that one shard passed, not supported";
282-
const pubsubTopic = formatPubsubTopic(
283-
this.args.clusterId ?? DefaultTestNetworkConfig.clusterId,
284-
this.args?.shard[0]
285-
);
282+
const pubsubTopic = StaticShardingRoutingInfo.fromShard(
283+
this.args?.shard[0],
284+
{ clusterId: this.args.clusterId ?? DefaultTestNetworkConfig.clusterId }
285+
).pubsubTopic;
286286
return this.pubsubTopicMessages(pubsubTopic);
287287
}
288288

packages/tests/src/utils/nodes.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import {
66
} from "@waku/interfaces";
77
import { createLightNode } from "@waku/sdk";
88
import {
9-
contentTopicToPubsubTopic,
10-
formatPubsubTopic,
9+
AutoShardingRoutingInfo,
1110
isAutoShardingRoutingInfo,
12-
RoutingInfo
11+
RoutingInfo,
12+
StaticShardingRoutingInfo
1313
} from "@waku/utils";
1414
import { Context } from "mocha";
1515
import pRetry from "p-retry";
@@ -75,19 +75,20 @@ export async function runMultipleNodes(
7575
if (customArgs?.shard) {
7676
const shards = customArgs?.shard ?? [];
7777
for (const s of shards) {
78-
pubsubTopics.push(formatPubsubTopic(routingInfo.clusterId, s));
78+
pubsubTopics.push(
79+
StaticShardingRoutingInfo.fromShard(s, {
80+
clusterId: routingInfo.clusterId
81+
}).pubsubTopic
82+
);
7983
}
8084
}
8185

8286
if (customArgs?.contentTopic && isAutoShardingRoutingInfo(routingInfo)) {
8387
const contentTopics = customArgs?.contentTopic ?? [];
8488
for (const ct of contentTopics) {
8589
pubsubTopics.push(
86-
contentTopicToPubsubTopic(
87-
ct,
88-
routingInfo.clusterId,
89-
routingInfo.networkConfig.numShardsInCluster
90-
)
90+
AutoShardingRoutingInfo.fromContentTopic(ct, routingInfo.networkConfig)
91+
.pubsubTopic
9192
);
9293
}
9394
}

packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createDecoder, createEncoder } from "@waku/core";
22
import { LightNode } from "@waku/interfaces";
33
import { Protocols, utf8ToBytes } from "@waku/sdk";
4-
import { createRoutingInfo, formatPubsubTopic } from "@waku/utils";
4+
import { createRoutingInfo, StaticShardingRoutingInfo } from "@waku/utils";
55

66
import {
77
afterEachCustom,
@@ -78,7 +78,9 @@ const runTests = (strictCheckNodes: boolean): void => {
7878
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
7979

8080
await nwaku2.ensureSubscriptions([
81-
formatPubsubTopic(TestClusterId, shardId)
81+
StaticShardingRoutingInfo.fromShard(shardId, {
82+
clusterId: TestClusterId
83+
}).pubsubTopic
8284
]);
8385

8486
const messageCollector2 = new MessageCollector();

packages/tests/tests/filter/utils.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createDecoder, createEncoder } from "@waku/core";
22
import {
3-
contentTopicToShardIndex,
3+
AutoShardingRoutingInfo,
44
createRoutingInfo,
55
Logger
66
} from "@waku/utils";
@@ -11,14 +11,14 @@ export const log = new Logger("test:filter");
1111
export const TestContentTopic = "/test/1/waku-filter/default";
1212
export const TestClusterId = 2;
1313
export const TestNumShardsInCluster = 8;
14-
export const TestShardIndex = contentTopicToShardIndex(
15-
TestContentTopic,
16-
TestNumShardsInCluster
17-
);
1814
export const TestNetworkConfig = {
1915
clusterId: TestClusterId,
2016
numShardsInCluster: TestNumShardsInCluster
2117
};
18+
export const TestShardIndex = AutoShardingRoutingInfo.fromContentTopic(
19+
TestContentTopic,
20+
TestNetworkConfig
21+
).shardId;
2222
export const TestRoutingInfo = createRoutingInfo(TestNetworkConfig, {
2323
contentTopic: TestContentTopic
2424
});

packages/tests/tests/sharding/auto_sharding.spec.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { AutoSharding, LightNode } from "@waku/interfaces";
22
import { createEncoder, utf8ToBytes } from "@waku/sdk";
3-
import { contentTopicToPubsubTopic, createRoutingInfo } from "@waku/utils";
3+
import { AutoShardingRoutingInfo, createRoutingInfo } from "@waku/utils";
44
import { expect } from "chai";
55

66
import {
@@ -136,7 +136,10 @@ describe("Autosharding: Running Nodes", function () {
136136
it("Wrong topic", async function () {
137137
const wrongTopic = "wrong_format";
138138
try {
139-
contentTopicToPubsubTopic(wrongTopic, clusterId, 8);
139+
AutoShardingRoutingInfo.fromContentTopic(wrongTopic, {
140+
clusterId,
141+
numShardsInCluster: 8
142+
});
140143
throw new Error("Wrong topic should've thrown an error");
141144
} catch (err) {
142145
if (

0 commit comments

Comments
 (0)