diff --git a/packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts b/packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts index e2479516..7d4116ff 100644 --- a/packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts +++ b/packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts @@ -24,14 +24,12 @@ import { FrameHandler, Multiplexer, Outbound, - serializeFrame, -} from "rsocket-core"; -import { Duplex } from "stream"; - -export class WebsocketDuplexConnection - extends Deferred - implements DuplexConnection, Outbound -{ + serializeFrame +} from 'rsocket-core'; +import { Duplex } from 'stream'; +import WebSocket from 'ws'; + +export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound { readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler; constructor( @@ -40,18 +38,16 @@ export class WebsocketDuplexConnection multiplexerDemultiplexerFactory: ( frame: Frame, outbound: Outbound & Closeable - ) => Multiplexer & Demultiplexer & FrameHandler + ) => Multiplexer & Demultiplexer & FrameHandler, + private rawSocket: WebSocket.WebSocket ) { super(); - websocketDuplex.on("close", this.handleClosed); - websocketDuplex.on("error", this.handleError); - websocketDuplex.on("data", this.handleMessage); + websocketDuplex.on('close', this.handleClosed); + websocketDuplex.on('error', this.handleError); + websocketDuplex.on('data', this.handleMessage); - this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory( - frame, - this - ); + this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this); } get availability(): number { @@ -77,32 +73,40 @@ export class WebsocketDuplexConnection return; } - // if (__DEV__) { - // if (this._options.debug) { - // console.log(printFrame(frame)); - // } - // } - const buffer = - /* this._options.lengthPrefixedFrames - ? serializeFrameWithLength(frame, this._encoders) - :*/ serializeFrame(frame); - // if (!this._socket) { - // throw new Error( - // "RSocketWebSocketClient: Cannot send frame, not connected." - // ); - // } - this.websocketDuplex.write(buffer); + try { + // if (__DEV__) { + // if (this._options.debug) { + // console.log(printFrame(frame)); + // } + // } + const buffer = + /* this._options.lengthPrefixedFrames + ? serializeFrameWithLength(frame, this._encoders) + :*/ serializeFrame(frame); + // if (!this._socket) { + // throw new Error( + // "RSocketWebSocketClient: Cannot send frame, not connected." + // ); + // } + + // Work around for this issue + // https://github.com/websockets/ws/issues/1515 + if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) { + this.close(new Error('WebSocket is closing')); + return; + } + + this.websocketDuplex.write(buffer); + } catch (ex) { + this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`)); + } } - private handleClosed = (e: CloseEvent): void => { - this.close( - new Error( - e.reason || "WebsocketDuplexConnection: Socket closed unexpectedly." - ) - ); + private handleClosed = (e: WebSocket.CloseEvent): void => { + this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.')); }; - private handleError = (e: ErrorEvent): void => { + private handleError = (e: WebSocket.ErrorEvent): void => { this.close(e.error); }; @@ -125,23 +129,27 @@ export class WebsocketDuplexConnection static create( socket: Duplex, - connectionAcceptor: ( - frame: Frame, - connection: DuplexConnection - ) => Promise, + connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise, multiplexerDemultiplexerFactory: ( frame: Frame, outbound: Outbound & Closeable - ) => Multiplexer & Demultiplexer & FrameHandler + ) => Multiplexer & Demultiplexer & FrameHandler, + rawSocket: WebSocket.WebSocket ): void { // TODO: timeout on no data? - socket.once("data", async (buffer) => { - const frame = deserializeFrame(buffer); - const connection = new WebsocketDuplexConnection( - socket, - frame, - multiplexerDemultiplexerFactory - ); + socket.once('data', async (buffer) => { + let frame: Frame | undefined = undefined; + try { + frame = deserializeFrame(buffer); + if (!frame) { + throw new Error(`Unable to deserialize frame`); + } + } catch (ex) { + // The initial frame should always be parsable + return socket.end(); + } + + const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket); if (connection.done) { return; } diff --git a/packages/rsocket-websocket-server/src/WebsocketServerTransport.ts b/packages/rsocket-websocket-server/src/WebsocketServerTransport.ts index 4d0f78f9..86b6ecf2 100644 --- a/packages/rsocket-websocket-server/src/WebsocketServerTransport.ts +++ b/packages/rsocket-websocket-server/src/WebsocketServerTransport.ts @@ -23,10 +23,10 @@ import { FrameHandler, Multiplexer, Outbound, - ServerTransport, -} from "rsocket-core"; -import WebSocket, { Server } from "ws"; -import { WebsocketDuplexConnection } from "./WebsocketDuplexConnection"; + ServerTransport +} from 'rsocket-core'; +import WebSocket, { Server } from 'ws'; +import { WebsocketDuplexConnection } from './WebsocketDuplexConnection'; export type SocketFactory = (options: SocketOptions) => Server; @@ -43,7 +43,7 @@ export type ServerOptions = SocketOptions & { const defaultFactory: SocketFactory = (options: SocketOptions) => { return new Server({ host: options.host, - port: options.port, + port: options.port }); }; @@ -59,10 +59,7 @@ export class WebsocketServerTransport implements ServerTransport { } async bind( - connectionAcceptor: ( - frame: Frame, - connection: DuplexConnection - ) => Promise, + connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise, multiplexerDemultiplexerFactory: ( frame: Frame, outbound: Outbound & Closeable @@ -72,22 +69,18 @@ export class WebsocketServerTransport implements ServerTransport { const serverCloseable = new ServerCloseable(websocketServer); const connectionListener = (websocket: WebSocket) => { - websocket.binaryType = "nodebuffer"; + websocket.binaryType = 'nodebuffer'; const duplex = WebSocket.createWebSocketStream(websocket); - WebsocketDuplexConnection.create( - duplex, - connectionAcceptor, - multiplexerDemultiplexerFactory - ); + WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket); }; const closeListener = (error?: Error) => { serverCloseable.close(error); }; - websocketServer.addListener("connection", connectionListener); - websocketServer.addListener("close", closeListener); - websocketServer.addListener("error", closeListener); + websocketServer.addListener('connection', connectionListener); + websocketServer.addListener('close', closeListener); + websocketServer.addListener('error', closeListener); return serverCloseable; } @@ -96,16 +89,16 @@ export class WebsocketServerTransport implements ServerTransport { return new Promise((resolve, reject) => { const websocketServer = this.factory({ host: this.host, - port: this.port, + port: this.port }); const earlyCloseListener = (error?: Error) => { reject(error); }; - websocketServer.addListener("close", earlyCloseListener); - websocketServer.addListener("error", earlyCloseListener); - websocketServer.addListener("listening", () => resolve(websocketServer)); + websocketServer.addListener('close', earlyCloseListener); + websocketServer.addListener('error', earlyCloseListener); + websocketServer.addListener('listening', () => resolve(websocketServer)); }); } } diff --git a/packages/rsocket-websocket-server/src/index.ts b/packages/rsocket-websocket-server/src/index.ts index 443a81b1..72282bdb 100644 --- a/packages/rsocket-websocket-server/src/index.ts +++ b/packages/rsocket-websocket-server/src/index.ts @@ -14,6 +14,7 @@ * limitations under the License. */ -"use strict"; +'use strict'; -export * from "./WebsocketServerTransport"; +export * from './WebsocketServerTransport'; +export * from './WebsocketDuplexConnection';