Skip to content

Commit c6ffb88

Browse files
committed
clients resilient, connect and disconnect events, graceful cancellation
1 parent 9df15b5 commit c6ffb88

25 files changed

+210
-126
lines changed

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public abstract class ClientWebSocketConnector : IWebSocketConnector
1616
private readonly IServiceProvider _serviceProvider;
1717
private readonly IStreamCompressor _compressor;
1818
private readonly ILoggerFactory _loggerFactory;
19+
private readonly ILogger<ClientWebSocketConnector> _logger;
1920

2021
public string ConnectionId
2122
{
@@ -43,18 +44,18 @@ public ClientWebSocketConnector(IServiceProvider serviceProvider,
4344
_serviceProvider = serviceProvider;
4445
_compressor = compressor;
4546
_loggerFactory = loggerFactory;
47+
_logger = _loggerFactory.CreateLogger<ClientWebSocketConnector>();
4648
}
4749

4850
public abstract ClientInvocatorContext InvocatorContext { get; }
4951

50-
private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationTokenSource cancellationTokenSource = null)
52+
private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationToken cancellationToken)
5153
{
5254
_webSocket = new ClientWebSocket();
5355
_webSocket.Options.SetRequestHeader(NCSConstants.ConnectorName, InvocatorContext.ConnectorName);
5456
try
5557
{
56-
CancellationToken token = cancellationTokenSource != null ? cancellationTokenSource.Token : CancellationToken.None;
57-
await _webSocket.ConnectAsync(InvocatorContext.Uri, token);
58+
await _webSocket.ConnectAsync(InvocatorContext.Uri, cancellationToken);
5859
}
5960
catch (Exception ex)
6061
{
@@ -77,27 +78,42 @@ private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationTokenSou
7778
return receiver;
7879
}
7980

80-
public async Task ConnectAsync(CancellationTokenSource cancellationTokenSource = null)
81+
public async Task ConnectAsync(CancellationToken cancellationToken)
8182
{
82-
if (cancellationTokenSource == null)
83-
cancellationTokenSource = new CancellationTokenSource();
84-
8583
ClientWebSocketReceiver receiver = null;
86-
while (!cancellationTokenSource.IsCancellationRequested)
84+
while (!cancellationToken.IsCancellationRequested)
8785
{
88-
receiver = await TryConnectAsync(cancellationTokenSource);
86+
_logger.LogInformation("===TryConnectAsync to: {0}", InvocatorContext.Uri.ToString());
87+
receiver = await TryConnectAsync(cancellationToken);
8988
if (receiver != null && WebSocketState == WebSocketState.Open)
9089
{
9190
break;
9291
}
92+
93+
_logger.LogInformation("===Retry...");
94+
await Task.Delay(1000);
9395
}
9496

95-
await Task.WhenAll(receiver.ReceiveAsync());
97+
_logger.LogInformation("===WebSocketConnected to: {0}", InvocatorContext.Uri.ToString());
98+
99+
if (InvocatorContext.OnConnectedAsync != null)
100+
{
101+
await InvocatorContext.OnConnectedAsync(_webSocket);
102+
}
103+
104+
await Task.WhenAll(receiver.ReceiveAsync(cancellationToken));
96105

97-
// Handshake down try re-connect
98-
if (_webSocket.CloseStatus.HasValue)
106+
// Disconnected
107+
if (_webSocket.CloseStatus.HasValue || _webSocket.State == WebSocketState.Aborted)
99108
{
100-
await ConnectAsync(cancellationTokenSource);
109+
if (InvocatorContext.OnDisconnectedAsync != null)
110+
{
111+
await InvocatorContext.OnDisconnectedAsync(_webSocket);
112+
}
113+
else
114+
{
115+
await ConnectAsync(cancellationToken);
116+
}
101117
}
102118
}
103119

@@ -139,9 +155,7 @@ public async Task SendBinaryAsync(byte[] bytes)
139155

140156
internal void Close(ClientWebSocketReceiverContext context)
141157
{
142-
context.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
143-
nameof(ClientWebSocketReceiverContext),
144-
CancellationToken.None);
158+
context.WebSocket.Abort();
145159
}
146160

147161
internal void Close(string statusDescription)

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketReceiver.cs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public ClientWebSocketReceiver(IServiceProvider serviceProvider,
2828
_logger = context.LoggerFactory.CreateLogger<ClientWebSocketReceiver>();
2929
}
3030

31-
public async Task ReceiveAsync()
31+
public async Task ReceiveAsync(CancellationToken cancellationToken)
3232
{
3333
var buffer = new byte[NCSConstants.ChunkSize];
34-
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
34+
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
3535
while (!result.CloseStatus.HasValue)
3636
{
3737
if (result.MessageType == WebSocketMessageType.Text)
@@ -42,7 +42,7 @@ public async Task ReceiveAsync()
4242
while (!result.EndOfMessage)
4343
{
4444
await ms.WriteAsync(buffer, 0, result.Count);
45-
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
45+
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
4646
}
4747

4848
await ms.WriteAsync(buffer, 0, result.Count);
@@ -66,7 +66,17 @@ public async Task ReceiveAsync()
6666
{
6767
_logger.LogWarning(ex, "{0} An error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Text);
6868
}
69-
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
69+
70+
try
71+
{
72+
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
73+
}
74+
catch (WebSocketException ex)
75+
{
76+
_logger.LogInformation("ClientWebSocketReceiver[Proxy] {0} has close status for connection: {1}", ex?.WebSocketErrorCode, _context.ConnectionId);
77+
_closeCallback?.Invoke(_context);
78+
return;
79+
}
7080
}
7181

7282
if (result.MessageType == WebSocketMessageType.Binary)
@@ -77,7 +87,7 @@ public async Task ReceiveAsync()
7787
while (!result.EndOfMessage)
7888
{
7989
await ms.WriteAsync(buffer, 0, result.Count);
80-
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
90+
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
8191
}
8292

8393
await ms.WriteAsync(buffer, 0, result.Count);
@@ -94,13 +104,14 @@ public async Task ReceiveAsync()
94104
}
95105
catch (Exception ex)
96106
{
97-
_logger.LogWarning(ex, "{0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
107+
_logger.LogWarning(ex, "ClientWebSocketReceiver {0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
98108
}
99-
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
109+
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
100110
}
101111
}
102112

103-
await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
113+
await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, cancellationToken);
114+
_logger.LogInformation("ClientWebSocketReceiver[Proxy] {0} has close status for connection: {1}", result.CloseStatus, _context.ConnectionId);
104115
_closeCallback?.Invoke(_context);
105116
}
106117
}

src/NetCoreStack.WebSockets.ProxyClient/DefaultClientInvocatorContextFactory.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,19 @@ public DefaultClientInvocatorContextFactory(IOptions<ProxyOptions<TInvocator>> o
2020

2121
public ClientInvocatorContext CreateInvocatorContext()
2222
{
23-
return new ClientInvocatorContext(_proxyOptions.Invocator, _proxyOptions.ConnectorName, _proxyOptions.WebSocketHostAddress);
23+
var context = new ClientInvocatorContext(_proxyOptions.Invocator, _proxyOptions.ConnectorName, _proxyOptions.WebSocketHostAddress);
24+
25+
if (_proxyOptions.OnConnectedAsync != null)
26+
{
27+
context.OnConnectedAsync = _proxyOptions.OnConnectedAsync;
28+
}
29+
30+
if (_proxyOptions.OnDisconnectedAsync != null)
31+
{
32+
context.OnDisconnectedAsync = _proxyOptions.OnDisconnectedAsync;
33+
}
34+
35+
return context;
2436
}
2537
}
2638
}

src/NetCoreStack.WebSockets.ProxyClient/Extensions/ApplicationBuilderExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ private static void ThrowIfServiceNotRegistered(IServiceProvider applicationServ
1717
throw new InvalidOperationException(string.Format("Required services are not registered - are you missing a call to AddProxyWebSockets?"));
1818
}
1919

20-
public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder app, CancellationTokenSource cancellationTokenSource = null)
20+
public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder app, CancellationToken cancellationToken = default(CancellationToken))
2121
{
2222
ThrowIfServiceNotRegistered(app.ApplicationServices);
2323
var appLifeTime = app.ApplicationServices.GetService<IApplicationLifetime>();
@@ -26,7 +26,7 @@ public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder ap
2626
{
2727
InvocatorsHelper.EnsureHostPair(connector.InvocatorContext);
2828
appLifeTime.ApplicationStopping.Register(OnShutdown, connector);
29-
Task.Factory.StartNew(async () => await connector.ConnectAsync(cancellationTokenSource), TaskCreationOptions.LongRunning);
29+
Task.Factory.StartNew(async () => await connector.ConnectAsync(cancellationToken), TaskCreationOptions.LongRunning);
3030
}
3131

3232
return app;

src/NetCoreStack.WebSockets.ProxyClient/Extensions/ConsoleApplicationBuilderExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ namespace NetCoreStack.WebSockets.ProxyClient
77
{
88
public static class ConsoleApplicationBuilderExtensions
99
{
10-
public static IServiceProvider UseProxyWebSocket(this IServiceProvider serviceProvider, CancellationTokenSource cancellationTokenSource = null)
10+
public static IServiceProvider UseProxyWebSocket(this IServiceProvider serviceProvider, CancellationToken cancellationToken = default(CancellationToken))
1111
{
1212
IList<IWebSocketConnector> connectors = InvocatorFactory.GetConnectors(serviceProvider);
1313
foreach (var connector in connectors)
1414
{
15-
Task.Run(async () => await connector.ConnectAsync(cancellationTokenSource));
15+
Task.Run(async () => await connector.ConnectAsync(cancellationToken));
1616
}
1717

1818
return serviceProvider;

src/NetCoreStack.WebSockets.ProxyClient/IWebSocketConnector.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public interface IWebSocketConnector
88
{
99
string ConnectionId { get; }
1010
WebSocketState WebSocketState { get; }
11-
Task ConnectAsync(CancellationTokenSource cancellationTokenSource);
11+
Task ConnectAsync(CancellationToken cancellationToken);
1212
Task SendAsync(WebSocketMessageContext context);
1313
Task SendBinaryAsync(byte[] bytes);
1414
ClientInvocatorContext InvocatorContext { get; }
Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
namespace NetCoreStack.WebSockets.ProxyClient
1+
using System;
2+
using System.Net.WebSockets;
3+
using System.Threading.Tasks;
4+
5+
namespace NetCoreStack.WebSockets.ProxyClient
26
{
37
public class ProxyOptions<TInvocator> : SocketsOptions<TInvocator> where TInvocator : IClientWebSocketCommandInvocator
48
{
59
public string ConnectorName { get; set; }
610
public string WebSocketHostAddress { get; set; }
11+
public Func<WebSocket, Task> OnConnectedAsync { get; set; }
12+
public Func<WebSocket, Task> OnDisconnectedAsync { get; set; }
713

814
public ProxyOptions()
915
{
1016
ConnectorName = "";
1117
}
1218
}
13-
}
19+
}

src/NetCoreStack.WebSockets.ProxyClient/Types/ClientInvocatorContext.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System;
2+
using System.Net.WebSockets;
3+
using System.Threading.Tasks;
24

35
namespace NetCoreStack.WebSockets.ProxyClient
46
{
@@ -12,10 +14,16 @@ public class ClientInvocatorContext : InvocatorContext
1214
public string ConnectorKey { get; }
1315
public Uri Uri { get; }
1416

17+
public Func<WebSocket, Task> OnConnectedAsync { get; set; }
18+
19+
public Func<WebSocket, Task> OnDisconnectedAsync { get; set; }
20+
1521
public ClientInvocatorContext(Type invocator, string connectorName, string hostAddress,
1622
WebSocketSupportedSchemes scheme = WebSocketSupportedSchemes.WS,
1723
string uriPath = "",
18-
string query = "")
24+
string query = "",
25+
Func<WebSocket, Task> onConnectedAsync = null,
26+
Func<WebSocket, Task> onDisconnectedAsync = null)
1927
:base(invocator)
2028
{
2129
ConnectorName = connectorName ?? throw new ArgumentNullException(nameof(connectorName));
@@ -24,6 +32,9 @@ public ClientInvocatorContext(Type invocator, string connectorName, string hostA
2432
UriPath = uriPath;
2533
Query = query;
2634

35+
OnConnectedAsync = onConnectedAsync;
36+
OnDisconnectedAsync = onDisconnectedAsync;
37+
2738
var schemeStr = Scheme == WebSocketSupportedSchemes.WS ? "ws" : "wss";
2839
var uriBuilder = new UriBuilder(new Uri($"{schemeStr}://{HostAddress}"));
2940
if (!string.IsNullOrEmpty(UriPath))

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,19 @@ private async Task SendDataAsync(Stream stream,
126126
}
127127
}
128128

129-
public async Task ConnectAsync(WebSocket webSocket, string connectionId, string connectorName = "")
129+
public async Task ConnectAsync(WebSocket webSocket,
130+
string connectionId,
131+
string connectorName = "",
132+
CancellationToken cancellationToken = default(CancellationToken))
130133
{
134+
if (cancellationToken != CancellationToken.None)
135+
{
136+
cancellationToken.Register(() =>
137+
{
138+
CancellationGraceful();
139+
});
140+
}
141+
131142
var receiverContext = new WebSocketReceiverContext
132143
{
133144
Compressor = _compressor,
@@ -153,8 +164,8 @@ public async Task ConnectAsync(WebSocket webSocket, string connectionId, string
153164
await SendAsync(connectionId, context);
154165
}
155166

156-
var receiver = new WebSocketReceiver(_serviceProvider, receiverContext, CloseConnection);
157-
await receiver.ReceiveAsync();
167+
var receiver = new WebSocketReceiver(_serviceProvider, receiverContext, CloseConnection, _loggerFactory);
168+
await receiver.ReceiveAsync(cancellationToken);
158169
}
159170

160171
public async Task BroadcastAsync(WebSocketMessageContext context)
@@ -257,11 +268,20 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
257268
}
258269
}
259270

271+
public void CancellationGraceful()
272+
{
273+
foreach (KeyValuePair<string, WebSocketTransport> entry in Connections)
274+
{
275+
var transport = entry.Value;
276+
_logger.LogInformation("Graceful cancellation. Close the websocket transport for: {0}", transport.ConnectorName);
277+
}
278+
}
279+
260280
public void CloseConnection(string connectionId)
261281
{
262282
if (Connections.TryRemove(connectionId, out WebSocketTransport transport))
263283
{
264-
transport.Dispose();
284+
transport.WebSocket.Abort();
265285
}
266286
}
267287

src/NetCoreStack.WebSockets/Extensions/SocketApplicationBuilderExtensions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
using Microsoft.AspNetCore.Builder;
22
using NetCoreStack.WebSockets.Internal;
33
using System;
4+
using System.Threading;
45

56
namespace NetCoreStack.WebSockets
67
{
78
public static class SocketApplicationBuilderExtensions
89
{
9-
public static IApplicationBuilder UseNativeWebSockets(this IApplicationBuilder app)
10+
public static IApplicationBuilder UseNativeWebSockets(this IApplicationBuilder app, CancellationToken cancellationToken = default(CancellationToken))
1011
{
1112
if (app == null)
1213
{
1314
throw new ArgumentNullException(nameof(app));
1415
}
1516

1617
app.UseWebSockets();
17-
app.UseMiddleware<WebSocketMiddleware>();
18+
19+
app.UseMiddleware<WebSocketMiddleware>(cancellationToken);
1820

1921
return app;
2022
}

0 commit comments

Comments
 (0)