11'use strict'
22const From = require ( '@fastify/reply-from' )
3+ const { ServerResponse } = require ( 'http' )
34const WebSocket = require ( 'ws' )
45const { convertUrlToWebSocket } = require ( './utils' )
56const fp = require ( 'fastify-plugin' )
67
78const httpMethods = [ 'DELETE' , 'GET' , 'HEAD' , 'PATCH' , 'POST' , 'PUT' , 'OPTIONS' ]
89const urlPattern = / ^ h t t p s ? : \/ \/ /
10+ const kWs = Symbol ( 'ws' )
11+ const kWsHead = Symbol ( 'wsHead' )
912
1013function liftErrorCode ( code ) {
14+ /* istanbul ignore next */
1115 if ( typeof code !== 'number' ) {
1216 // Sometimes "close" event emits with a non-numeric value
1317 return 1011
@@ -33,9 +37,11 @@ function waitConnection (socket, write) {
3337 }
3438}
3539
36- function isExternalUrl ( url = '' ) {
40+ function isExternalUrl ( url ) {
3741 return urlPattern . test ( url )
38- } ;
42+ }
43+
44+ function noop ( ) { }
3945
4046function proxyWebSockets ( source , target ) {
4147 function close ( code , reason ) {
@@ -44,18 +50,26 @@ function proxyWebSockets (source, target) {
4450 }
4551
4652 source . on ( 'message' , ( data , binary ) => waitConnection ( target , ( ) => target . send ( data , { binary } ) ) )
53+ /* istanbul ignore next */
4754 source . on ( 'ping' , data => waitConnection ( target , ( ) => target . ping ( data ) ) )
55+ /* istanbul ignore next */
4856 source . on ( 'pong' , data => waitConnection ( target , ( ) => target . pong ( data ) ) )
4957 source . on ( 'close' , close )
58+ /* istanbul ignore next */
5059 source . on ( 'error' , error => close ( 1011 , error . message ) )
60+ /* istanbul ignore next */
5161 source . on ( 'unexpected-response' , ( ) => close ( 1011 , 'unexpected response' ) )
5262
5363 // source WebSocket is already connected because it is created by ws server
5464 target . on ( 'message' , ( data , binary ) => source . send ( data , { binary } ) )
65+ /* istanbul ignore next */
5566 target . on ( 'ping' , data => source . ping ( data ) )
67+ /* istanbul ignore next */
5668 target . on ( 'pong' , data => source . pong ( data ) )
5769 target . on ( 'close' , close )
70+ /* istanbul ignore next */
5871 target . on ( 'error' , error => close ( 1011 , error . message ) )
72+ /* istanbul ignore next */
5973 target . on ( 'unexpected-response' , ( ) => close ( 1011 , 'unexpected response' ) )
6074}
6175
@@ -64,37 +78,61 @@ class WebSocketProxy {
6478 this . logger = fastify . log
6579
6680 const wss = new WebSocket . Server ( {
67- server : fastify . server ,
81+ noServer : true ,
6882 ...wsServerOptions
6983 } )
7084
85+ fastify . server . on ( 'upgrade' , ( rawRequest , socket , head ) => {
86+ // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
87+ rawRequest [ kWs ] = socket
88+ rawRequest [ kWsHead ] = head
89+
90+ const rawResponse = new ServerResponse ( rawRequest )
91+ rawResponse . assignSocket ( socket )
92+ fastify . routing ( rawRequest , rawResponse )
93+
94+ rawResponse . on ( 'finish' , ( ) => {
95+ socket . destroy ( )
96+ } )
97+ } )
98+
99+ this . handleUpgrade = ( request , cb ) => {
100+ wss . handleUpgrade ( request . raw , request . raw [ kWs ] , request . raw [ kWsHead ] , ( socket ) => {
101+ this . handleConnection ( socket , request )
102+ cb ( )
103+ } )
104+ }
105+
71106 // To be able to close the HTTP server,
72107 // all WebSocket clients need to be disconnected.
73108 // Fastify is missing a pre-close event, or the ability to
74109 // add a hook before the server.close call. We need to resort
75110 // to monkeypatching for now.
76- const oldClose = fastify . server . close
77- fastify . server . close = function ( done ) {
78- for ( const client of wss . clients ) {
79- client . close ( )
111+ {
112+ const oldClose = fastify . server . close
113+ fastify . server . close = function ( done ) {
114+ wss . close ( ( ) => {
115+ oldClose . call ( this , ( err ) => {
116+ /* istanbul ignore next */
117+ done && done ( err )
118+ } )
119+ } )
120+ for ( const client of wss . clients ) {
121+ client . close ( )
122+ }
80123 }
81- oldClose . call ( this , done )
82124 }
83125
126+ /* istanbul ignore next */
84127 wss . on ( 'error' , ( err ) => {
128+ /* istanbul ignore next */
85129 this . logger . error ( err )
86130 } )
87131
88- wss . on ( 'connection' , this . handleConnection . bind ( this ) )
89-
90132 this . wss = wss
91133 this . prefixList = [ ]
92134 }
93135
94- close ( done ) {
95- this . wss . close ( done )
96- }
97-
98136 addUpstream ( prefix , rewritePrefix , upstream , wsClientOptions ) {
99137 this . prefixList . push ( {
100138 prefix : new URL ( prefix , 'ws://127.0.0.1' ) . pathname ,
@@ -117,66 +155,64 @@ class WebSocketProxy {
117155 }
118156 }
119157
120- return undefined
158+ /* istanbul ignore next */
159+ throw new Error ( `no upstream found for ${ request . url } . this should not happened. Please report to https://github.com/fastify/fastify-http-proxy` )
121160 }
122161
123162 handleConnection ( source , request ) {
124163 const upstream = this . findUpstream ( request )
125- if ( ! upstream ) {
126- this . logger . debug ( { url : request . url } , 'not matching prefix' )
127- source . close ( )
128- return
129- }
130164 const { target : url , wsClientOptions } = upstream
165+ const rewriteRequestHeaders = wsClientOptions ?. rewriteRequestHeaders || defaultWsHeadersRewrite
166+ const headersToRewrite = wsClientOptions ?. headers || { }
131167
132168 const subprotocols = [ ]
133169 if ( source . protocol ) {
134170 subprotocols . push ( source . protocol )
135171 }
136172
137- let optionsWs = { }
138- if ( request . headers . cookie ) {
139- const headers = { cookie : request . headers . cookie }
140- optionsWs = { ...wsClientOptions , headers }
141- } else {
142- optionsWs = wsClientOptions
143- }
173+ const headers = rewriteRequestHeaders ( headersToRewrite , request )
174+ const optionsWs = { ...( wsClientOptions || { } ) , headers }
144175
145176 const target = new WebSocket ( url , subprotocols , optionsWs )
146177 this . logger . debug ( { url : url . href } , 'proxy websocket' )
147178 proxyWebSockets ( source , target )
148179 }
149180}
150181
182+ function defaultWsHeadersRewrite ( headers , request ) {
183+ if ( request . headers . cookie ) {
184+ return { ...headers , cookie : request . headers . cookie }
185+ }
186+ return { ...headers }
187+ }
188+
151189const httpWss = new WeakMap ( ) // http.Server => WebSocketProxy
152190
153191function setupWebSocketProxy ( fastify , options , rewritePrefix ) {
154192 let wsProxy = httpWss . get ( fastify . server )
155193 if ( ! wsProxy ) {
156194 wsProxy = new WebSocketProxy ( fastify , options . wsServerOptions )
157195 httpWss . set ( fastify . server , wsProxy )
158-
159- fastify . addHook ( 'onClose' , ( instance , done ) => {
160- httpWss . delete ( fastify . server )
161- wsProxy . close ( done )
162- } )
163196 }
164197
165198 if ( options . upstream !== '' ) {
166199 wsProxy . addUpstream ( fastify . prefix , rewritePrefix , options . upstream , options . wsClientOptions )
167- } else if ( typeof options . replyOptions . getUpstream === 'function' ) {
200+ // The else block is validate earlier in the code
201+ } else {
168202 wsProxy . findUpstream = function ( request ) {
169203 const source = new URL ( request . url , 'ws://127.0.0.1' )
170204 const upstream = options . replyOptions . getUpstream ( request , '' )
171205 const target = new URL ( source . pathname , upstream )
206+ /* istanbul ignore next */
172207 target . protocol = upstream . indexOf ( 'http:' ) === 0 ? 'ws:' : 'wss'
173208 target . search = source . search
174209 return { target, wsClientOptions : options . wsClientOptions }
175210 }
176211 }
212+ return wsProxy
177213}
178214
179- function generateRewritePrefix ( prefix = '' , opts ) {
215+ function generateRewritePrefix ( prefix , opts ) {
180216 let rewritePrefix = opts . rewritePrefix || ( opts . upstream ? new URL ( opts . upstream ) . pathname : '/' )
181217
182218 if ( ! prefix . endsWith ( '/' ) && rewritePrefix . endsWith ( '/' ) ) {
@@ -243,7 +279,23 @@ async function fastifyHttpProxy (fastify, opts) {
243279 handler
244280 } )
245281
282+ let wsProxy
283+
284+ if ( opts . websocket ) {
285+ wsProxy = setupWebSocketProxy ( fastify , opts , rewritePrefix )
286+ }
287+
246288 function handler ( request , reply ) {
289+ if ( request . raw [ kWs ] ) {
290+ reply . hijack ( )
291+ try {
292+ wsProxy . handleUpgrade ( request , noop )
293+ } catch ( err ) {
294+ /* istanbul ignore next */
295+ request . log . warn ( { err } , 'websocket proxy error' )
296+ }
297+ return
298+ }
247299 const queryParamIndex = request . raw . url . indexOf ( '?' )
248300 let dest = request . raw . url . slice ( 0 , queryParamIndex !== - 1 ? queryParamIndex : undefined )
249301
@@ -256,10 +308,6 @@ async function fastifyHttpProxy (fastify, opts) {
256308 }
257309 reply . from ( dest || '/' , replyOpts )
258310 }
259-
260- if ( opts . websocket ) {
261- setupWebSocketProxy ( fastify , opts , rewritePrefix )
262- }
263311}
264312
265313module . exports = fp ( fastifyHttpProxy , {
0 commit comments