Skip to content

Commit 0ebd5f1

Browse files
committed
update WebSocket lib, which fixes concurrent SendAsync() calls
1 parent d617885 commit 0ebd5f1

File tree

4 files changed

+235
-153
lines changed

4 files changed

+235
-153
lines changed

Assets/Plugins/Colyseus/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
using System;
1+
using System;
22
using System.IO;
33
using System.Collections.Generic;
44
using System.Threading.Tasks;
55

6-
using UnityWebSockets;
6+
using NativeWebSocket;
77
using GameDevWare.Serialization;
88

99
namespace Colyseus

Assets/Plugins/Colyseus/Room.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
using System;
1+
using System;
22
using System.IO;
33
using System.Threading.Tasks;
44
using GameDevWare.Serialization;
55

66
namespace Colyseus
77
{
88
public delegate void ColyseusOpenEventHandler();
9-
public delegate void ColyseusCloseEventHandler(UnityWebSockets.WebSocketCloseCode code);
9+
public delegate void ColyseusCloseEventHandler(NativeWebSocket.WebSocketCloseCode code);
1010
public delegate void ColyseusErrorEventHandler(string message);
1111

1212
public class RoomAvailable
@@ -120,7 +120,7 @@ public async Task Leave (bool consented = true)
120120
}
121121

122122
} else if (OnLeave != null) {
123-
OnLeave?.Invoke (UnityWebSockets.WebSocketCloseCode.Normal);
123+
OnLeave?.Invoke (NativeWebSocket.WebSocketCloseCode.Normal);
124124
}
125125
}
126126

Assets/Plugins/WebSocket/WebSocket.cs

Lines changed: 131 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.IO;
33
using System.Net.WebSockets;
44
using System.Threading;
@@ -12,7 +12,7 @@
1212

1313
using UnityEngine;
1414

15-
namespace UnityWebSockets
15+
namespace NativeWebSocket
1616
{
1717
public delegate void WebSocketOpenEventHandler();
1818
public delegate void WebSocketMessageEventHandler(byte[] data);
@@ -136,6 +136,9 @@ public class WebSocket : IWebSocket
136136
[DllImport("__Internal")]
137137
public static extern int WebSocketSend(int instanceId, byte[] dataPtr, int dataLength);
138138

139+
[DllImport("__Internal")]
140+
public static extern int WebSocketSendText(int instanceId, string message);
141+
139142
[DllImport("__Internal")]
140143
public static extern int WebSocketGetState(int instanceId);
141144

@@ -148,9 +151,9 @@ public class WebSocket : IWebSocket
148151

149152
public WebSocket(string url)
150153
{
151-
if (!WebSocketFactory.isInitialized) {
152-
WebSocketFactory.Initialize();
153-
}
154+
if (!WebSocketFactory.isInitialized) {
155+
WebSocketFactory.Initialize();
156+
}
154157

155158
int instanceId = WebSocketFactory.WebSocketAllocate(url);
156159
WebSocketFactory.instances.Add(instanceId, this);
@@ -168,29 +171,44 @@ public int GetInstanceId()
168171
return this.instanceId;
169172
}
170173

171-
public async Task Connect()
174+
public Task Connect()
172175
{
173176
int ret = WebSocketConnect(this.instanceId);
174177

175178
if (ret < 0)
176179
throw WebSocketHelpers.GetErrorMessageFromCode(ret, null);
180+
181+
return Task.CompletedTask;
177182
}
178183

179-
public async Task Close(WebSocketCloseCode code = WebSocketCloseCode.Normal, string reason = null)
184+
public Task Close(WebSocketCloseCode code = WebSocketCloseCode.Normal, string reason = null)
180185
{
181186
int ret = WebSocketClose(this.instanceId, (int)code, reason);
182187

183188
if (ret < 0)
184189
throw WebSocketHelpers.GetErrorMessageFromCode(ret, null);
190+
191+
return Task.CompletedTask;
185192
}
186193

187-
public async Task Send(byte[] data)
194+
public Task Send(byte[] data)
188195
{
189196
int ret = WebSocketSend(this.instanceId, data, data.Length);
190197

191198
if (ret < 0)
192199
throw WebSocketHelpers.GetErrorMessageFromCode(ret, null);
193200

201+
return Task.CompletedTask;
202+
}
203+
204+
public Task SendText(string message)
205+
{
206+
int ret = WebSocketSendText(this.instanceId, message);
207+
208+
if (ret < 0)
209+
throw WebSocketHelpers.GetErrorMessageFromCode(ret, null);
210+
211+
return Task.CompletedTask;
194212
}
195213

196214
public WebSocketState State {
@@ -245,7 +263,7 @@ public void DelegateOnCloseEvent(int closeCode)
245263

246264
#else
247265

248-
public class WebSocket : IWebSocket
266+
public class WebSocket : IWebSocket
249267
{
250268
public event WebSocketOpenEventHandler OnOpen;
251269
public event WebSocketMessageEventHandler OnMessage;
@@ -255,7 +273,13 @@ public class WebSocket : IWebSocket
255273
private Uri uri;
256274
private ClientWebSocket m_Socket = new ClientWebSocket();
257275

258-
public WebSocket(string url)
276+
private readonly object Lock = new object();
277+
278+
private bool isSending = false;
279+
private List<ArraySegment<byte>> sendBytesQueue = new List<ArraySegment<byte>>();
280+
private List<ArraySegment<byte>> sendTextQueue = new List<ArraySegment<byte>>();
281+
282+
public WebSocket(string url)
259283
{
260284
uri = new Uri(url);
261285

@@ -307,21 +331,108 @@ public WebSocketState State
307331
}
308332
}
309333

310-
public async Task Send(byte[] bytes)
334+
public Task Send(byte[] bytes)
311335
{
312-
var buffer = new ArraySegment<byte>(bytes);
313-
await m_Socket.SendAsync(buffer, WebSocketMessageType.Binary, true, CancellationToken.None);
336+
// return m_Socket.SendAsync(buffer, WebSocketMessageType.Binary, true, CancellationToken.None);
337+
return SendMessage(sendBytesQueue, WebSocketMessageType.Binary, new ArraySegment<byte>(bytes));
314338
}
315339

316-
public async Task SendText(string message)
340+
public Task SendText(string message)
317341
{
318342
var encoded = Encoding.UTF8.GetBytes(message);
319-
var buffer = new ArraySegment<byte>(encoded, 0, encoded.Length);
320343

321-
await m_Socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
322-
}
344+
// m_Socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
345+
return SendMessage(sendTextQueue, WebSocketMessageType.Text, new ArraySegment<byte>(encoded, 0, encoded.Length));
346+
}
347+
348+
private async Task SendMessage(List<ArraySegment<byte>> queue, WebSocketMessageType messageType, ArraySegment<byte> buffer)
349+
{
350+
// Return control to the calling method immediately.
351+
await Task.Yield();
352+
353+
// Make sure we have data.
354+
if (buffer.Count == 0)
355+
{
356+
return;
357+
}
358+
359+
// The state of the connection is contained in the context Items dictionary.
360+
bool sending;
361+
362+
lock (Lock)
363+
{
364+
sending = isSending;
365+
366+
// If not, we are now.
367+
if (!isSending)
368+
{
369+
isSending = true;
370+
}
371+
}
323372

324-
public async Task Receive()
373+
if (!sending)
374+
{
375+
// Lock with a timeout, just in case.
376+
if (!Monitor.TryEnter(m_Socket, 1000))
377+
{
378+
// If we couldn't obtain exclusive access to the socket in one second, something is wrong.
379+
await m_Socket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
380+
return;
381+
}
382+
383+
try
384+
{
385+
// Send the message synchronously.
386+
var t = m_Socket.SendAsync(buffer, messageType, true, CancellationToken.None);
387+
t.Wait();
388+
}
389+
finally
390+
{
391+
Monitor.Exit(m_Socket);
392+
}
393+
394+
// Note that we've finished sending.
395+
lock (Lock)
396+
{
397+
isSending = false;
398+
}
399+
400+
// Handle any queued messages.
401+
await HandleQueue(queue, messageType);
402+
}
403+
else
404+
{
405+
// Add the message to the queue.
406+
lock (Lock)
407+
{
408+
queue.Add(buffer);
409+
}
410+
}
411+
}
412+
413+
private async Task HandleQueue(List<ArraySegment<byte>> queue, WebSocketMessageType messageType)
414+
{
415+
var buffer = new ArraySegment<byte>();
416+
lock (Lock)
417+
{
418+
// Check for an item in the queue.
419+
if (queue.Count > 0)
420+
{
421+
// Pull it off the top.
422+
buffer = queue[0];
423+
queue.RemoveAt(0);
424+
}
425+
}
426+
427+
// Send that message.
428+
if (buffer.Count > 0)
429+
{
430+
await SendMessage(queue, messageType, buffer);
431+
}
432+
}
433+
434+
435+
public async Task Receive()
325436
{
326437
ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[8192]);
327438

@@ -377,7 +488,7 @@ public async Task Close()
377488

378489
///
379490
/// Factory
380-
///
491+
///
381492

382493

383494
/// <summary>
@@ -518,4 +629,4 @@ public static WebSocket CreateInstance(string url)
518629
}
519630

520631

521-
}
632+
}

0 commit comments

Comments
 (0)