Skip to content

Commit 4aecb92

Browse files
fix: WebSocket connections should not write if readyState is CLOSING or CLOSED. WebSocketDuplex connection should handle invalid frames.
1 parent b559817 commit 4aecb92

File tree

3 files changed

+76
-74
lines changed

3 files changed

+76
-74
lines changed

packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts

Lines changed: 58 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@ import {
2424
FrameHandler,
2525
Multiplexer,
2626
Outbound,
27-
serializeFrame,
28-
} from "rsocket-core";
29-
import { Duplex } from "stream";
30-
31-
export class WebsocketDuplexConnection
32-
extends Deferred
33-
implements DuplexConnection, Outbound
34-
{
27+
serializeFrame
28+
} from 'rsocket-core';
29+
import { Duplex } from 'stream';
30+
import WebSocket from 'ws';
31+
32+
export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
3533
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;
3634

3735
constructor(
@@ -40,18 +38,16 @@ export class WebsocketDuplexConnection
4038
multiplexerDemultiplexerFactory: (
4139
frame: Frame,
4240
outbound: Outbound & Closeable
43-
) => Multiplexer & Demultiplexer & FrameHandler
41+
) => Multiplexer & Demultiplexer & FrameHandler,
42+
private rawSocket: WebSocket.WebSocket
4443
) {
4544
super();
4645

47-
websocketDuplex.on("close", this.handleClosed);
48-
websocketDuplex.on("error", this.handleError);
49-
websocketDuplex.on("data", this.handleMessage);
46+
websocketDuplex.on('close', this.handleClosed);
47+
websocketDuplex.on('error', this.handleError);
48+
websocketDuplex.on('data', this.handleMessage);
5049

51-
this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(
52-
frame,
53-
this
54-
);
50+
this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
5551
}
5652

5753
get availability(): number {
@@ -77,32 +73,40 @@ export class WebsocketDuplexConnection
7773
return;
7874
}
7975

80-
// if (__DEV__) {
81-
// if (this._options.debug) {
82-
// console.log(printFrame(frame));
83-
// }
84-
// }
85-
const buffer =
86-
/* this._options.lengthPrefixedFrames
87-
? serializeFrameWithLength(frame, this._encoders)
88-
:*/ serializeFrame(frame);
89-
// if (!this._socket) {
90-
// throw new Error(
91-
// "RSocketWebSocketClient: Cannot send frame, not connected."
92-
// );
93-
// }
94-
this.websocketDuplex.write(buffer);
76+
try {
77+
// if (__DEV__) {
78+
// if (this._options.debug) {
79+
// console.log(printFrame(frame));
80+
// }
81+
// }
82+
const buffer =
83+
/* this._options.lengthPrefixedFrames
84+
? serializeFrameWithLength(frame, this._encoders)
85+
:*/ serializeFrame(frame);
86+
// if (!this._socket) {
87+
// throw new Error(
88+
// "RSocketWebSocketClient: Cannot send frame, not connected."
89+
// );
90+
// }
91+
92+
// Work around for this issue
93+
// https://github.com/websockets/ws/issues/1515
94+
if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) {
95+
this.close(new Error('WebSocket is closing'));
96+
return;
97+
}
98+
99+
this.websocketDuplex.write(buffer);
100+
} catch (ex) {
101+
this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`));
102+
}
95103
}
96104

97-
private handleClosed = (e: CloseEvent): void => {
98-
this.close(
99-
new Error(
100-
e.reason || "WebsocketDuplexConnection: Socket closed unexpectedly."
101-
)
102-
);
105+
private handleClosed = (e: WebSocket.CloseEvent): void => {
106+
this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
103107
};
104108

105-
private handleError = (e: ErrorEvent): void => {
109+
private handleError = (e: WebSocket.ErrorEvent): void => {
106110
this.close(e.error);
107111
};
108112

@@ -125,23 +129,27 @@ export class WebsocketDuplexConnection
125129

126130
static create(
127131
socket: Duplex,
128-
connectionAcceptor: (
129-
frame: Frame,
130-
connection: DuplexConnection
131-
) => Promise<void>,
132+
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
132133
multiplexerDemultiplexerFactory: (
133134
frame: Frame,
134135
outbound: Outbound & Closeable
135-
) => Multiplexer & Demultiplexer & FrameHandler
136+
) => Multiplexer & Demultiplexer & FrameHandler,
137+
rawSocket: WebSocket.WebSocket
136138
): void {
137139
// TODO: timeout on no data?
138-
socket.once("data", async (buffer) => {
139-
const frame = deserializeFrame(buffer);
140-
const connection = new WebsocketDuplexConnection(
141-
socket,
142-
frame,
143-
multiplexerDemultiplexerFactory
144-
);
140+
socket.once('data', async (buffer) => {
141+
let frame: Frame | undefined = undefined;
142+
try {
143+
frame = deserializeFrame(buffer);
144+
if (!frame) {
145+
throw new Error(`Unable to deserialize frame`);
146+
}
147+
} catch (ex) {
148+
// The initial frame should always be parsable
149+
return socket.end();
150+
}
151+
152+
const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket);
145153
if (connection.done) {
146154
return;
147155
}

packages/rsocket-websocket-server/src/WebsocketServerTransport.ts

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import {
2323
FrameHandler,
2424
Multiplexer,
2525
Outbound,
26-
ServerTransport,
27-
} from "rsocket-core";
28-
import WebSocket, { Server } from "ws";
29-
import { WebsocketDuplexConnection } from "./WebsocketDuplexConnection";
26+
ServerTransport
27+
} from 'rsocket-core';
28+
import WebSocket, { Server } from 'ws';
29+
import { WebsocketDuplexConnection } from './WebsocketDuplexConnection';
3030

3131
export type SocketFactory = (options: SocketOptions) => Server;
3232

@@ -43,7 +43,7 @@ export type ServerOptions = SocketOptions & {
4343
const defaultFactory: SocketFactory = (options: SocketOptions) => {
4444
return new Server({
4545
host: options.host,
46-
port: options.port,
46+
port: options.port
4747
});
4848
};
4949

@@ -59,10 +59,7 @@ export class WebsocketServerTransport implements ServerTransport {
5959
}
6060

6161
async bind(
62-
connectionAcceptor: (
63-
frame: Frame,
64-
connection: DuplexConnection
65-
) => Promise<void>,
62+
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
6663
multiplexerDemultiplexerFactory: (
6764
frame: Frame,
6865
outbound: Outbound & Closeable
@@ -72,22 +69,18 @@ export class WebsocketServerTransport implements ServerTransport {
7269
const serverCloseable = new ServerCloseable(websocketServer);
7370

7471
const connectionListener = (websocket: WebSocket) => {
75-
websocket.binaryType = "nodebuffer";
72+
websocket.binaryType = 'nodebuffer';
7673
const duplex = WebSocket.createWebSocketStream(websocket);
77-
WebsocketDuplexConnection.create(
78-
duplex,
79-
connectionAcceptor,
80-
multiplexerDemultiplexerFactory
81-
);
74+
WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket);
8275
};
8376

8477
const closeListener = (error?: Error) => {
8578
serverCloseable.close(error);
8679
};
8780

88-
websocketServer.addListener("connection", connectionListener);
89-
websocketServer.addListener("close", closeListener);
90-
websocketServer.addListener("error", closeListener);
81+
websocketServer.addListener('connection', connectionListener);
82+
websocketServer.addListener('close', closeListener);
83+
websocketServer.addListener('error', closeListener);
9184

9285
return serverCloseable;
9386
}
@@ -96,16 +89,16 @@ export class WebsocketServerTransport implements ServerTransport {
9689
return new Promise((resolve, reject) => {
9790
const websocketServer = this.factory({
9891
host: this.host,
99-
port: this.port,
92+
port: this.port
10093
});
10194

10295
const earlyCloseListener = (error?: Error) => {
10396
reject(error);
10497
};
10598

106-
websocketServer.addListener("close", earlyCloseListener);
107-
websocketServer.addListener("error", earlyCloseListener);
108-
websocketServer.addListener("listening", () => resolve(websocketServer));
99+
websocketServer.addListener('close', earlyCloseListener);
100+
websocketServer.addListener('error', earlyCloseListener);
101+
websocketServer.addListener('listening', () => resolve(websocketServer));
109102
});
110103
}
111104
}

packages/rsocket-websocket-server/src/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
"use strict";
17+
'use strict';
1818

19-
export * from "./WebsocketServerTransport";
19+
export * from './WebsocketServerTransport';
20+
export * from './WebsocketDuplexConnection';

0 commit comments

Comments
 (0)