|
32 | 32 | import io.gravitee.plugin.endpoint.http.proxy.client.HttpClientFactory; |
33 | 33 | import io.gravitee.plugin.endpoint.http.proxy.configuration.HttpProxyEndpointConnectorConfiguration; |
34 | 34 | import io.gravitee.plugin.endpoint.http.proxy.configuration.HttpProxyEndpointConnectorSharedConfiguration; |
| 35 | +import io.netty.handler.codec.http.HttpHeaderNames; |
35 | 36 | import io.reactivex.rxjava3.core.Completable; |
36 | 37 | import io.vertx.core.http.RequestOptions; |
37 | 38 | import io.vertx.core.http.UpgradeRejectedException; |
38 | 39 | import io.vertx.core.http.WebSocketConnectOptions; |
39 | 40 | import io.vertx.rxjava3.core.http.ServerWebSocket; |
| 41 | +import java.util.Arrays; |
| 42 | +import java.util.List; |
40 | 43 | import java.util.Set; |
| 44 | +import java.util.stream.Collectors; |
41 | 45 |
|
42 | 46 | /** |
43 | 47 | * @author Guillaume LAMIRAND (guillaume.lamirand at graviteesource.com) |
@@ -67,8 +71,14 @@ public Completable connect(final HttpExecutionContext ctx) { |
67 | 71 | ObservableHttpClientRequest observableHttpClientRequest = new ObservableHttpClientRequest(options); |
68 | 72 | Span httpRequestSpan = ctx.getTracer().startSpanFrom(observableHttpClientRequest); |
69 | 73 |
|
70 | | - ctx.metrics().setEndpoint(options.getURI()); |
| 74 | + ctx.metrics().setEndpoint(buildWebSocketUri(options)); |
71 | 75 | WebSocketConnectOptions webSocketConnectOptions = new WebSocketConnectOptions(options.toJson()); |
| 76 | + |
| 77 | + // Add subprotocols: handle comma-separated values, trim whitespace, filter empty strings |
| 78 | + if (request.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL)) { |
| 79 | + webSocketConnectOptions.setSubProtocols(parseSubProtocols(request)); |
| 80 | + } |
| 81 | + |
72 | 82 | return httpClientFactory |
73 | 83 | .getOrBuildHttpClient(ctx, configuration, sharedConfiguration) |
74 | 84 | .rxWebSocket(webSocketConnectOptions) |
@@ -133,4 +143,20 @@ public Completable connect(final HttpExecutionContext ctx) { |
133 | 143 | protected Set<CharSequence> hopHeaders() { |
134 | 144 | return HOP_HEADERS; |
135 | 145 | } |
| 146 | + |
| 147 | + private String buildWebSocketUri(RequestOptions options) { |
| 148 | + String protocol = options.isSsl() ? "wss" : "ws"; |
| 149 | + return protocol + "://" + defaultHost + ":" + defaultPort + options.getURI(); |
| 150 | + } |
| 151 | + |
| 152 | + private List<String> parseSubProtocols(HttpRequest request) { |
| 153 | + return request |
| 154 | + .headers() |
| 155 | + .getAll(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL) |
| 156 | + .stream() |
| 157 | + .flatMap(header -> Arrays.stream(header.split(","))) |
| 158 | + .map(String::trim) |
| 159 | + .filter(s -> !s.isEmpty()) |
| 160 | + .collect(Collectors.toList()); |
| 161 | + } |
136 | 162 | } |
0 commit comments