Skip to content

Commit 121a12b

Browse files
committed
refact(rpc): pubsub improvements
1 parent 35f0e0b commit 121a12b

File tree

1 file changed

+23
-6
lines changed

1 file changed

+23
-6
lines changed

lib/rpc.js

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,29 @@ class RpcServer {
5858
this.socket = socket;
5959
this.socket.pipe(new JSONFrameStream(this._processFrame.bind(this)));
6060
this.methods = methods;
61+
this.channelUnsubscribe = new Map();
62+
this.socket.on('close', () => {
63+
this.channelUnsubscribe.forEach(unsubscribe => unsubscribe());
64+
this.channelUnsubscribe.clear();
65+
});
6166
}
6267

6368
_processFrame(obj) {
6469
let args = obj.args || [];
6570
if(obj.method == 'subscribe') {
66-
this.methods.subscribe(obj.channel, (pubsub) => {
67-
this.socket.write(JSONFrameStream.makeFrame({pubsub}));
68-
});
71+
if(this.channelUnsubscribe.has('*')) {
72+
return;
73+
}
74+
if(obj.channel === '*') {
75+
this.channelUnsubscribe.forEach(unsubscribe => unsubscribe());
76+
this.channelUnsubscribe.clear();
77+
}
78+
if(!this.channelUnsubscribe.has(obj.channel)) {
79+
let unsubscribe = this.methods.subscribe(obj.channel, (pubsub) => {
80+
this.socket.write(JSONFrameStream.makeFrame({pubsub}));
81+
});
82+
this.channelUnsubscribe.set(obj.channel, unsubscribe);
83+
}
6984
return;
7085
}
7186
this.methods[obj.method].apply(null, args.concat([(error, result) => {
@@ -119,13 +134,15 @@ class RpcClient {
119134
this.defers.set(this.requestId, defer);
120135
return defer.promise;
121136
}
122-
subscribe(channel, cb) {
137+
subscribe(channelToSubscribe, callback) {
123138
let request = {
124139
method: 'subscribe',
125-
channel
140+
channel: channelToSubscribe
126141
};
127142
this.socket.write(JSONFrameStream.makeFrame(request));
128-
this.pubsub.addListener(channel, cb);
143+
this.pubsub.addListener(channelToSubscribe, (channel, ...args) => {
144+
callback.apply({channel}, args);
145+
});
129146
}
130147
}
131148

0 commit comments

Comments
 (0)