Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"@typescript-eslint/member-ordering": [
"error",
{
"default": [
"static-field",
"static-method",
"public-field",
"public-method",
"private-field",
"private-method"
]
}
],
"@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }],
"prettier/prettier": [
"error",
Expand Down
20 changes: 10 additions & 10 deletions packages/core/src/lib/connection_manager/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,6 @@ type CreateKeepAliveManagerOptions = {
};

export class KeepAliveManager {
private readonly relay?: IRelay;
private readonly libp2p: Libp2p;

private readonly options: KeepAliveOptions;

private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>> =
new Map();
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]> =
new Map();

public constructor({
options,
relay,
Expand Down Expand Up @@ -129,6 +119,16 @@ export class KeepAliveManager {
);
}

private readonly relay?: IRelay;
private readonly libp2p: Libp2p;

private readonly options: KeepAliveOptions;

private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>> =
new Map();
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]> =
new Map();

private scheduleRelayPings(
relay: IRelay,
relayPeriodSecs: number,
Expand Down
9 changes: 6 additions & 3 deletions packages/core/src/lib/filter/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ type IncomingMessageHandler = (
) => Promise<void>;

export class FilterCore {
private streamManager: StreamManager;

public readonly multicodec = FilterCodecs.SUBSCRIBE;
public readonly pubsubTopics: PubsubTopic[];

public constructor(
private handleIncomingMessage: IncomingMessageHandler,
public readonly pubsubTopics: PubsubTopic[],
pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
this.pubsubTopics = pubsubTopics;
this.streamManager = new StreamManager(
FilterCodecs.SUBSCRIBE,
libp2p.components
Expand Down Expand Up @@ -282,6 +282,9 @@ export class FilterCore {
};
}

private streamManager: StreamManager;
private handleIncomingMessage: IncomingMessageHandler;

private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
const { remotePeer } = connection;
Expand Down
83 changes: 41 additions & 42 deletions packages/core/src/lib/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,14 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore {
private readonly streamManager: StreamManager;

public readonly multicodec = LightPushCodec;
public readonly pubsubTopics: PubsubTopic[];

public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
public constructor(pubsubTopics: PubsubTopic[], libp2p: Libp2p) {
this.pubsubTopics = pubsubTopics;
this.streamManager = new StreamManager(LightPushCodec, libp2p.components);
}

private async preparePushMessage(
encoder: IEncoder,
message: IMessage
): Promise<PreparePushMessageResult> {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
}

if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
}

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log.error("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: ProtocolError.ENCODE_FAILED
};
}

const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
return { query, error: null };
} catch (error) {
log.error("Failed to prepare push message", error);

return {
query: null,
error: ProtocolError.GENERIC_FAIL
};
}
}

public async send(
encoder: IEncoder,
message: IMessage,
Expand Down Expand Up @@ -188,4 +149,42 @@ export class LightPushCore {

return { success: peerId, failure: null };
}

private readonly streamManager: StreamManager;

private async preparePushMessage(
encoder: IEncoder,
message: IMessage
): Promise<PreparePushMessageResult> {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
}

if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
}

const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log.error("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: ProtocolError.ENCODE_FAILED
};
}

const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
return { query, error: null };
} catch (error) {
log.error("Failed to prepare push message", error);

return {
query: null,
error: ProtocolError.GENERIC_FAIL
};
}
}
}
8 changes: 4 additions & 4 deletions packages/core/src/lib/metadata/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ const log = new Logger("metadata");
export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata implements IMetadata {
private readonly streamManager: StreamManager;
private readonly libp2pComponents: Libp2pComponents;
protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();

public readonly multicodec = MetadataCodec;

public constructor(
Expand Down Expand Up @@ -106,6 +102,10 @@ class Metadata implements IMetadata {
return await this.query(peerId);
}

private readonly streamManager: StreamManager;
private readonly libp2pComponents: Libp2pComponents;
protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();

/**
* Handle an incoming metadata request
*/
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const log = new Logger("store");
export const StoreCodec = "/vac/waku/store-query/3.0.0";

export class StoreCore {
private readonly streamManager: StreamManager;

public readonly multicodec = StoreCodec;

public constructor(libp2p: Libp2p) {
Expand Down Expand Up @@ -144,4 +142,6 @@ export class StoreCore {
}
}
}

private readonly streamManager: StreamManager;
}
10 changes: 5 additions & 5 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import { selectOpenConnection } from "./utils.js";
const STREAM_LOCK_KEY = "consumed";

export class StreamManager {
private readonly log: Logger;

private ongoingCreation: Set<string> = new Set();
private streamPool: Map<string, Promise<void>> = new Map();

public constructor(
private multicodec: string,
private readonly libp2p: Libp2pComponents
Expand Down Expand Up @@ -48,6 +43,11 @@ export class StreamManager {
return stream;
}

private readonly log: Logger;

private ongoingCreation: Set<string> = new Set();
private streamPool: Map<string, Promise<void>> = new Map();

private async createStream(peerId: PeerId, retries = 0): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peerId);
const connection = selectOpenConnection(connections);
Expand Down
6 changes: 3 additions & 3 deletions packages/discovery/src/dns/dns.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ const errorBranchB = `enrtree-branch:${branchDomainD}`;
* Mocks DNS resolution.
*/
class MockDNS implements DnsClient {
private fqdnRes: Map<string, string[]>;
private fqdnThrows: string[];

public constructor() {
this.fqdnRes = new Map();
this.fqdnThrows = [];
Expand All @@ -57,6 +54,9 @@ class MockDNS implements DnsClient {

return Promise.resolve(res);
}

private fqdnRes: Map<string, string[]>;
private fqdnThrows: string[];
}

describe("DNS Node Discovery", () => {
Expand Down
18 changes: 9 additions & 9 deletions packages/discovery/src/dns/dns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import {
const log = new Logger("discovery:dns");

export class DnsNodeDiscovery {
private readonly dns: DnsClient;
private readonly _DNSTreeCache: { [key: string]: string };
private readonly _errorTolerance: number = 10;

public static async dnsOverHttp(
dnsClient?: DnsClient
): Promise<DnsNodeDiscovery> {
Expand All @@ -30,6 +26,11 @@ export class DnsNodeDiscovery {
return new DnsNodeDiscovery(dnsClient);
}

public constructor(dns: DnsClient) {
this.dns = dns;
this._DNSTreeCache = {};
}

/**
* Returns a list of verified peers listed in an EIP-1459 DNS tree. Method may
* return fewer peers than requested if @link wantedNodeCapabilityCount requires
Expand Down Expand Up @@ -66,11 +67,6 @@ export class DnsNodeDiscovery {
return peers;
}

public constructor(dns: DnsClient) {
this._DNSTreeCache = {};
this.dns = dns;
}

/**
* {@inheritDoc getPeers}
*/
Expand All @@ -95,6 +91,10 @@ export class DnsNodeDiscovery {
}
}

private readonly dns: DnsClient;
private readonly _DNSTreeCache: { [key: string]: string };
private readonly _errorTolerance: number = 10;

/**
* Runs a recursive, randomized descent of the DNS tree to retrieve a single
* ENR record as an ENR. Returns null if parsing or DNS resolution fails.
Expand Down
10 changes: 5 additions & 5 deletions packages/discovery/src/dns/dns_discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ export class PeerDiscoveryDns
extends TypedEventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery, DiscoveryTrigger
{
private nextPeer: (() => AsyncGenerator<IEnr>) | undefined;
private _started: boolean;
private _components: DnsDiscoveryComponents;
private _options: DnsDiscOptions;

public constructor(
components: DnsDiscoveryComponents,
options: DnsDiscOptions
Expand Down Expand Up @@ -140,6 +135,11 @@ export class PeerDiscoveryDns
public get [Symbol.toStringTag](): string {
return DNS_DISCOVERY_TAG;
}

private nextPeer: (() => AsyncGenerator<IEnr>) | undefined;
private _started: boolean;
private _components: DnsDiscoveryComponents;
private _options: DnsDiscOptions;
}

export function wakuDnsDiscovery(
Expand Down
6 changes: 3 additions & 3 deletions packages/discovery/src/local-peer-cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ export class LocalPeerCacheDiscovery
extends TypedEventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery, Startable
{
private isStarted: boolean;
private peers: LocalStoragePeerInfo[] = [];

public constructor(
private readonly components: Libp2pComponents,
private readonly options?: LocalPeerCacheDiscoveryOptions
Expand Down Expand Up @@ -122,6 +119,9 @@ export class LocalPeerCacheDiscovery
this.savePeersToLocalStorage();
};

private isStarted: boolean;
private peers: LocalStoragePeerInfo[] = [];

private getPeersFromLocalStorage(): LocalStoragePeerInfo[] {
try {
const storedPeersData = localStorage.getItem("waku:peers");
Expand Down
14 changes: 7 additions & 7 deletions packages/discovery/src/peer-exchange/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ export class PeerExchangeRPC {
}

/**
* Encode the current PeerExchangeRPC request to bytes
* Decode the current PeerExchangeRPC request to bytes
* @returns Uint8Array
*/
public encode(): Uint8Array {
return proto.PeerExchangeRPC.encode(this.proto);
public static decode(bytes: Uint8ArrayList): PeerExchangeRPC {
const res = proto.PeerExchangeRPC.decode(bytes);
return new PeerExchangeRPC(res);
}

/**
* Decode the current PeerExchangeRPC request to bytes
* Encode the current PeerExchangeRPC request to bytes
* @returns Uint8Array
*/
public static decode(bytes: Uint8ArrayList): PeerExchangeRPC {
const res = proto.PeerExchangeRPC.decode(bytes);
return new PeerExchangeRPC(res);
public encode(): Uint8Array {
return proto.PeerExchangeRPC.encode(this.proto);
}

public get query(): proto.PeerExchangeQuery | undefined {
Expand Down
Loading