Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.

Commit d46f0e2

Browse files
authored
Improve RPC peer error handling (#7)
Define a new error hierarchy for being unable to complete a method call. Reject method call Promises with instances of the `MethodCallError` subclasses `MethodCallTimeout` or `RPCStreamClosed` in response to the respective events. Define a new error class `UnexpectedResponse`, and emit that instead of a plain `Error` when receiving an unexpected JSON-RPC response object. Reject the method call Promise instead of throwing when calling `callMethod` while the RPC stream is already closed, for consistency with the RPC stream closing while the response is pending. No longer assert that the Writable side of the Peer is open in the `sendNotification` and `pushError` methods. This is consistent with the behaviour of the `Readable.push` method, and allows Peer to be operated in a simplex mode for sending notifications. This change also fixes a bug with handling of a request handler's error response when the Writable side is closed. Before, success responses were pushed to the Readable side but errors resulted in an unhandled Promise rejection within the library and no response being pushed. Now all responses are pushed to the Readable side irrespective of the response type or state of the Peer's Writable side.
1 parent 74cf015 commit d46f0e2

File tree

3 files changed

+176
-54
lines changed

3 files changed

+176
-54
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@fitbit/jsonrpc-ts",
3-
"version": "1.0.2",
3+
"version": "2.0.0",
44
"description": "A very flexible library for building JSON-RPC 2.0 endpoints.",
55
"files": [
66
"lib",
@@ -37,6 +37,6 @@
3737
"error-subclass": "^2.2.0"
3838
},
3939
"peerDependencies": {
40-
"io-ts": "1.4.1"
40+
"io-ts": "^1.4.2"
4141
}
4242
}

src/peer.test.ts

Lines changed: 107 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,61 @@ describe('InvalidParams', () => {
8383
});
8484
});
8585

86+
describe('MethodCallTimeout', () => {
87+
it('constructs objects which are instances of MethodCallTimeout', () => {
88+
const obj = new peer.MethodCallTimeout('foo');
89+
expect(obj).toBeInstanceOf(Error);
90+
expect(obj).toBeInstanceOf(peer.MethodCallError);
91+
expect(obj).toBeInstanceOf(peer.MethodCallTimeout);
92+
});
93+
94+
it('sets the message and method', () => {
95+
const method = 'some.method.name';
96+
expect(new peer.MethodCallTimeout(method)).toMatchObject({
97+
method,
98+
message: `No response received for RPC call to '${method}'`,
99+
});
100+
});
101+
});
102+
103+
describe('RPCStreamClosed', () => {
104+
it('constructs objects which are instances of RPCStreamClosed', () => {
105+
const obj = new peer.RPCStreamClosed('foo');
106+
expect(obj).toBeInstanceOf(Error);
107+
expect(obj).toBeInstanceOf(peer.MethodCallError);
108+
expect(obj).toBeInstanceOf(peer.RPCStreamClosed);
109+
});
110+
111+
it('sets the error message and method', () => {
112+
const method = 'some.method.name';
113+
expect(new peer.RPCStreamClosed(method)).toMatchObject({
114+
method,
115+
message: `RPC call to '${method}' could not be completed as the RPC stream is closed`,
116+
});
117+
});
118+
});
119+
120+
describe('UnexpectedResponse', () => {
121+
it('constructs objects which are instances of UnexpectedResponse', () => {
122+
const obj = new peer.UnexpectedResponse(0);
123+
expect(obj).toBeInstanceOf(Error);
124+
expect(obj).toBeInstanceOf(peer.UnexpectedResponse);
125+
});
126+
127+
it('sets the error message, kind and id', () => {
128+
expect(new peer.UnexpectedResponse('eye dee', 'error')).toMatchObject({
129+
id: 'eye dee',
130+
kind: 'error',
131+
// tslint:disable-next-line:max-line-length
132+
message: 'Received error with id \'"eye dee"\', which does not correspond to any outstanding RPC call',
133+
});
134+
});
135+
136+
it('defaults to kind "response"', () => {
137+
expect(new peer.UnexpectedResponse(0)).toHaveProperty('kind', 'response');
138+
});
139+
});
140+
86141
describe('numeric request id iterator', () => {
87142
it('does not repeat values', () => {
88143
// Not quite true; values will repeat when it wraps around.
@@ -145,9 +200,14 @@ describe('Peer', () => {
145200
});
146201
});
147202

148-
it('handles an unexpected response by emitting an error', (done) => {
203+
it('handles an unexpected response by emitting an UnexpectedResponse error', (done) => {
149204
uut.once('error', (err: Error) => {
150-
expect(err.message).toMatch(/Received response with id '55'/);
205+
expect(err).toMatchObject({
206+
message: expect.stringContaining("Received response with id \'55\'"),
207+
kind: 'response',
208+
id: 55,
209+
});
210+
expect(err).toBeInstanceOf(peer.UnexpectedResponse);
151211
done();
152212
});
153213
uut.write(jrpc.response(55));
@@ -168,11 +228,16 @@ describe('Peer', () => {
168228
});
169229

170230
it(
171-
'handles an error with an id not matching any outstanding request ' +
172-
'by emitting an error',
231+
// tslint:disable-next-line:max-line-length
232+
'handles an error with an id not matching any outstanding request by emitting an UnexpectedResponse error',
173233
(done) => {
174234
uut.once('error', (err: Error) => {
175-
expect(err.message).toMatch(/Received error with id 'yellow'/);
235+
expect(err).toMatchObject({
236+
message: expect.stringContaining('Received error with id \'"yellow"\''),
237+
kind: 'error',
238+
id: 'yellow',
239+
});
240+
expect(err).toBeInstanceOf(peer.UnexpectedResponse);
176241
done();
177242
});
178243
uut.write(jrpc.error({ id: 'yellow', message: '', code: 1 }));
@@ -361,6 +426,26 @@ describe('Peer', () => {
361426
uut.write(jrpc.request('foo', '', [5, 4, 3]));
362427
});
363428

429+
it('sends a response after the Writable side is closed', (done) => {
430+
uut.onRequest = () => {
431+
uut.end();
432+
return new Promise(resolve => setImmediate(resolve));
433+
};
434+
uut.on('data', (value) => {
435+
try {
436+
const message = jrpc.parse(value);
437+
expect(message).toMatchObject({
438+
kind: 'response',
439+
id: 'bar',
440+
});
441+
done();
442+
} catch (e) {
443+
done.fail(e);
444+
}
445+
});
446+
uut.write(jrpc.request('bar', ''));
447+
});
448+
364449
describe('sends an internal error response', () => {
365450
function testInternalError(onRequest: peer.RequestHandler) {
366451
uut.onRequest = onRequest;
@@ -433,6 +518,15 @@ describe('Peer', () => {
433518
return Promise.reject(new peer.RPCError('You dun goofed', 5555));
434519
});
435520
});
521+
522+
test('after the Writable side is closed', (done) => {
523+
testErrorResponse(done, () => {
524+
uut.end();
525+
return new Promise((resolve, reject) => {
526+
setImmediate(() => reject(new peer.RPCError('You dun goofed', 5555)));
527+
});
528+
});
529+
});
436530
});
437531

438532
it('forwards a parse error from the deserializer to the remote peer', (done) => {
@@ -480,28 +574,23 @@ describe('Peer', () => {
480574
const methodCalls = [uut.callMethod('foo'), uut.callMethod('bar')];
481575
uut.end();
482576
return Promise.all(methodCalls.map((call) => {
483-
return expect(call).rejects.toEqual(
484-
expect.objectContaining({
485-
message: expect.stringMatching(/RPC stream closed/),
486-
}),
487-
);
577+
return expect(call).rejects.toThrow(peer.RPCStreamClosed);
488578
}));
489579
});
490580

491581
describe('after the stream ends', () => {
492582
beforeEach(() => uut.end());
493583

494-
it('throws an error when attempting to call a method', () => {
495-
expect(() => uut.callMethod('foo')).toThrow(/RPC stream closed/);
584+
it('rejects when attempting to call a method', () => {
585+
return expect(uut.callMethod('foo')).rejects.toThrow(peer.RPCStreamClosed);
496586
});
497587

498-
it('throws an error when attempting to send a notification', () => {
499-
expect(() => uut.sendNotification('foo')).toThrow(/RPC stream closed/);
588+
it('does not throw when attempting to send a notification', () => {
589+
expect(() => uut.sendNotification('foo')).not.toThrow();
500590
});
501591

502-
it('throws an error when attempting to push an error', () => {
503-
expect(() => uut.pushError({ code: 0, message: 'foo' }))
504-
.toThrow(/RPC stream closed/);
592+
it('does not throw when attempting to push an error', () => {
593+
expect(() => uut.pushError({ code: 0, message: 'foo' })).not.toThrow();
505594
});
506595
});
507596

@@ -555,7 +644,7 @@ describe('Peer', () => {
555644
beforeEach(() => uut.end());
556645

557646
it('rejects with an error', () => expect(methodCall).rejects.toThrow(
558-
/RPC stream closed/,
647+
peer.RPCStreamClosed,
559648
));
560649
it('clears the timeout timer', () => expect(clearTimeout).toBeCalled());
561650
});

src/peer.ts

Lines changed: 67 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ export enum ErrorCodes {
3939
*/
4040
export class RPCError extends ErrorSubclass {
4141
static displayName = 'RPCError';
42-
readonly code: number;
43-
readonly data: any;
4442

45-
constructor(message: string, code: number = ErrorCodes.INTERNAL_ERROR, data?: any) {
43+
constructor(
44+
message: string,
45+
public readonly code: number = ErrorCodes.INTERNAL_ERROR,
46+
public readonly data?: any,
47+
) {
4648
super(message);
47-
this.code = code;
48-
this.data = data;
4949
}
5050

5151
toErrorObject(): jrpc.ErrorObject {
@@ -96,16 +96,52 @@ export class ParseError extends RPCError {
9696
}
9797
}
9898

99+
/**
100+
* The method call could not be completed.
101+
*/
102+
export class MethodCallError extends ErrorSubclass {
103+
static displayName = 'MethodCallError';
104+
105+
constructor(
106+
public readonly method: string,
107+
message = `RPC call to '${method}' could not be completed`,
108+
) {
109+
super(message);
110+
}
111+
}
112+
99113
/**
100114
* No response to a method call was received in time.
101115
*/
102-
export class MethodCallTimeout extends ErrorSubclass {
116+
export class MethodCallTimeout extends MethodCallError {
103117
static displayName = 'MethodCallTimeout';
104-
readonly method: string;
105118

106119
constructor(method: string) {
107-
super(`${method} timed out`);
108-
this.method = method;
120+
super(method, `No response received for RPC call to '${method}'`);
121+
}
122+
}
123+
124+
/**
125+
* The method call could not be completed as the Peer's writable stream
126+
* has been closed.
127+
*/
128+
export class RPCStreamClosed extends MethodCallError {
129+
static displayName = 'RPCStreamClosed';
130+
131+
constructor(method: string) {
132+
super(method, `RPC call to '${method}' could not be completed as the RPC stream is closed`);
133+
}
134+
}
135+
136+
/**
137+
* An unexpected JSON-RPC Response has been received.
138+
*/
139+
export class UnexpectedResponse extends ErrorSubclass {
140+
static displayName = 'UnexpectedResponse';
141+
142+
constructor(public readonly id: jrpc.RPCID, public readonly kind = 'response') {
143+
// tslint:disable-next-line:max-line-length
144+
super(`Received ${kind} with id '${JSON.stringify(id)}', which does not correspond to any outstanding RPC call`);
109145
}
110146
}
111147

@@ -155,6 +191,12 @@ export interface PeerOptions {
155191
idIterator?: Iterator<jrpc.RPCID>;
156192
}
157193

194+
interface PendingRequest {
195+
method: string;
196+
resolve: (value: any) => void;
197+
reject: (reason: Error) => void;
198+
}
199+
158200
/**
159201
* A JSON-RPC Peer which reads and writes JSON-RPC objects as
160202
* JavaScript objects.
@@ -175,8 +217,7 @@ export class Peer extends stream.Duplex {
175217
onNotification?: NotificationHandler;
176218
requestIdIterator: Iterator<jrpc.RPCID>;
177219

178-
private pendingRequests = new Map<
179-
jrpc.RPCID, { resolve: (value: any) => void, reject: (reason: Error) => void }>();
220+
private pendingRequests = new Map<jrpc.RPCID, PendingRequest>();
180221

181222
ended = false;
182223

@@ -201,13 +242,10 @@ export class Peer extends stream.Duplex {
201242

202243
private onend() {
203244
this.ended = true;
204-
this.pendingRequests.forEach(({ reject }) => {
205-
reject(new Error('RPC stream closed'));
245+
this.pendingRequests.forEach(({ method, reject }) => {
246+
reject(new RPCStreamClosed(method));
206247
});
207-
}
208-
209-
private assertNotEnded() {
210-
if (this.ended) throw new Error('RPC stream closed');
248+
this.pendingRequests.clear();
211249
}
212250

213251
/**
@@ -223,7 +261,7 @@ export class Peer extends stream.Duplex {
223261
params?: jrpc.RPCParams,
224262
{ timeout = undefined as number | undefined } = {},
225263
): Promise<any> {
226-
this.assertNotEnded();
264+
if (this.ended) return Promise.reject(new RPCStreamClosed(method));
227265
const idResult = this.requestIdIterator.next();
228266
if (idResult.done) {
229267
throw new Error(
@@ -245,15 +283,14 @@ export class Peer extends stream.Duplex {
245283

246284
let timer: NodeJS.Timer | undefined;
247285

248-
const promise = new Promise(
249-
(resolve: (value: any) => void, reject: (reason: any) => void) => {
250-
this.push(jrpc.request(id, method, params));
251-
this.pendingRequests.set(id, { resolve, reject });
286+
const promise = new Promise<any>((resolve, reject) => {
287+
this.push(jrpc.request(id, method, params));
288+
this.pendingRequests.set(id, { method, resolve, reject });
252289

253-
if (timeout !== undefined) {
254-
timer = setTimeout(() => reject(new MethodCallTimeout(method)), timeout);
255-
}
256-
});
290+
if (timeout !== undefined) {
291+
timer = setTimeout(() => reject(new MethodCallTimeout(method)), timeout);
292+
}
293+
});
257294

258295
if (timer !== undefined) {
259296
const timerRef = timer;
@@ -274,14 +311,12 @@ export class Peer extends stream.Duplex {
274311

275312
/** Send an RPC Notification object to the remote peer. */
276313
sendNotification(method: string, params?: jrpc.RPCParams) {
277-
this.assertNotEnded();
278-
this.push(jrpc.notification(method, params));
314+
return this.push(jrpc.notification(method, params));
279315
}
280316

281317
/** Push an RPC Error object to the remote peer. */
282318
pushError(error: jrpc.ErrorObject) {
283-
this.assertNotEnded();
284-
this.push(jrpc.error(error));
319+
return this.push(jrpc.error(error));
285320
}
286321

287322
// tslint:disable-next-line:function-name
@@ -392,8 +427,7 @@ export class Peer extends stream.Duplex {
392427
this.pendingRequests.delete(id);
393428
rpcCall.resolve(result);
394429
} else {
395-
throw new Error(
396-
`Received response with id '${id}', which does not correspond to any outstanding RPC call`);
430+
throw new UnexpectedResponse(id);
397431
}
398432
}
399433

@@ -413,8 +447,7 @@ export class Peer extends stream.Duplex {
413447
this.pendingRequests.delete(id);
414448
rpcCall.reject(rpcError);
415449
} else {
416-
throw new Error(
417-
`Received error with id '${id}', which does not correspond to any outstanding RPC call`);
450+
throw new UnexpectedResponse(id, 'error');
418451
}
419452
} else {
420453
throw rpcError;

0 commit comments

Comments
 (0)