diff --git a/QuicNet.Tests.ConsoleClient/Program.cs b/QuicNet.Tests.ConsoleClient/Program.cs index cee4afd..192f81d 100644 --- a/QuicNet.Tests.ConsoleClient/Program.cs +++ b/QuicNet.Tests.ConsoleClient/Program.cs @@ -11,14 +11,21 @@ namespace QuicNet.Tests.ConsoleClient { class Program { - static void Main(string[] args) + static async Task Main(string[] args) + { + //Example(); + await ExampleAsync(); + Console.ReadKey(); + } + + private static void Example() { Console.WriteLine("Starting client."); QuicClient client = new QuicClient(); Console.WriteLine("Connecting to server."); QuicConnection connection = client.Connect("127.0.0.1", 11000); // Connect to peer (Server) Console.WriteLine("Connected"); - + QuicStream stream = connection.CreateStream(QuickNet.Utilities.StreamType.ClientBidirectional); // Create a data stream Console.WriteLine("Create stream with id: " + stream.StreamId.IntegerValue.ToString()); @@ -48,8 +55,45 @@ static void Main(string[] args) { Console.WriteLine(e.Message); } + } - Console.ReadKey(); + private static async Task ExampleAsync() + { + Console.WriteLine("Starting client."); + QuicClient client = new QuicClient(); + Console.WriteLine("Connecting to server."); + QuicConnection connection = await client.ConnectAsync("127.0.0.1", 11000); // Connect to peer (Server) + Console.WriteLine("Connected"); + + QuicStream stream = connection.CreateStream(QuickNet.Utilities.StreamType.ClientBidirectional); // Create a data stream + Console.WriteLine("Create stream with id: " + stream.StreamId.IntegerValue.ToString()); + + Console.WriteLine("Send 'Hello From Client!'"); + await stream.SendAsync(Encoding.UTF8.GetBytes("Hello from Client!")); // Send Data + + stream = connection.CreateStream(QuickNet.Utilities.StreamType.ClientBidirectional); // Create a data stream + await stream.SendAsync(Encoding.UTF8.GetBytes("Hello from Client2!")); + + Console.WriteLine("Waiting for message from the server"); + try + { + byte[] data = await stream.ReceiveAsync(); // Receive from server + Console.WriteLine("Received: " + Encoding.UTF8.GetString(data)); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } + + try + { + byte[] data = await stream.ReceiveAsync(); // Receive from server + Console.WriteLine("Received: " + Encoding.UTF8.GetString(data)); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } } } } diff --git a/QuicNet/Connections/QuicConnection.cs b/QuicNet/Connections/QuicConnection.cs index 67333bd..ae6a3f2 100644 --- a/QuicNet/Connections/QuicConnection.cs +++ b/QuicNet/Connections/QuicConnection.cs @@ -12,6 +12,8 @@ using System.Collections.Generic; using System.Net; using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; namespace QuicNet.Connections { @@ -74,6 +76,41 @@ public void ProcessFrames(List frames) } } + public async Task ProcessFramesAsync(List frames) + { + foreach (Frame frame in frames) + { + switch (frame.Type) + { + case 0x01: + case 0x04: + OnRstStreamFrame(frame); + break; + case byte type when type >= 0x08 && frame.Type <= 0x0f: + await OnStreamFrameAsync(frame); + break; + case 0x10: + OnMaxDataFrame(frame); + break; + case 0x11: + OnMaxStreamDataFrame(frame); + break; + case 0x12: + case 0x13: + OnMaxStreamFrame(frame); + break; + case 0x14: + OnDataBlockedFrame(frame); + break; + case 0x1c: + OnConnectionCloseFrame(frame); + break; + default: + break; + } + } + } + public void IncrementRate(int length) { _currentTransferRate += (UInt32)length; @@ -128,6 +165,26 @@ private void OnStreamFrame(Frame frame) } } + private async Task OnStreamFrameAsync(Frame frame) + { + StreamFrame sf = (StreamFrame)frame; + if (_streams.ContainsKey(sf.ConvertedStreamId.Id) == false) + { + QuicStream stream = new QuicStream(this, sf.ConvertedStreamId); + await stream.ProcessDataAsync(sf); + + if ((UInt64)_streams.Count < MaxStreams) + _streams.Add(sf.ConvertedStreamId.Id, stream); + else + SendMaximumStreamReachedError(); + } + else + { + QuicStream stream = _streams[sf.ConvertedStreamId.Id]; + await stream.ProcessDataAsync(sf); + } + } + private void OnMaxDataFrame(Frame frame) { MaxDataFrame sf = (MaxDataFrame)frame; @@ -177,7 +234,7 @@ private void OnStreamDataBlockedFrame(Frame frame) } #region Internal - + internal QuicConnection(ConnectionData connection) { _currentTransferRate = 0; @@ -221,11 +278,43 @@ internal void ReceivePacket() } + /// + /// Client async only! + /// + /// + internal async Task ReceivePacketAsync() + { + Packet packet = await _pwt.ReadPacketAsync(); + + if (packet is ShortHeaderPacket) + { + ShortHeaderPacket shp = (ShortHeaderPacket)packet; + await ProcessFramesAsync(shp.GetFrames()); + } + + // If the connection has been closed + if (_state == ConnectionState.Draining) + { + if (string.IsNullOrWhiteSpace(_lastError)) + _lastError = "Protocol error"; + + TerminateConnection(); + + throw new QuicConnectivityException(_lastError); + } + + } + internal bool SendData(Packet packet) { return _pwt.SendPacket(packet); } + internal async Task SendDataAsync(Packet packet) + { + return await _pwt.SendPacketAsync(packet); + } + internal void DataReceived(QuicStream context) { OnDataReceived?.Invoke(context); diff --git a/QuicNet/InternalInfrastructure/PacketWireTransfer.cs b/QuicNet/InternalInfrastructure/PacketWireTransfer.cs index 3b68e77..988898d 100644 --- a/QuicNet/InternalInfrastructure/PacketWireTransfer.cs +++ b/QuicNet/InternalInfrastructure/PacketWireTransfer.cs @@ -37,6 +37,20 @@ public Packet ReadPacket() return packet; } + public async Task ReadPacketAsync() + { + // Await response for sucessfull connection creation by the server + var udpReceiveResult = await _client.ReceiveAsync(); + _peerEndpoint = udpReceiveResult.RemoteEndPoint; + byte[] peerData = udpReceiveResult.Buffer; + if (peerData == null) + throw new QuicConnectivityException("Server did not respond properly."); + + Packet packet = _unpacker.Unpack(peerData); + + return packet; + } + public bool SendPacket(Packet packet) { byte[] data = packet.Encode(); @@ -46,6 +60,15 @@ public bool SendPacket(Packet packet) return sent > 0; } + public async Task SendPacketAsync(Packet packet) + { + byte[] data = packet.Encode(); + + var sent = _client.SendAsync(data, data.Length, _peerEndpoint); + + return await sent > 0; + } + public IPEndPoint LastTransferEndpoint() { return _peerEndpoint; diff --git a/QuicNet/QuicClient.cs b/QuicNet/QuicClient.cs index e5e920b..3716ec2 100644 --- a/QuicNet/QuicClient.cs +++ b/QuicNet/QuicClient.cs @@ -71,6 +71,35 @@ public QuicConnection Connect(string ip, int port) return _connection; } + /// + /// Connect to a remote server. + /// + /// Ip Address + /// Port + /// + public async Task ConnectAsync(string ip, int port) + { + // Establish socket connection + _peerIp = new IPEndPoint(IPAddress.Parse(ip), port); + + // Initialize packet reader + _pwt = new PacketWireTransfer(_client, _peerIp); + + // Start initial protocol process + InitialPacket connectionPacket = _packetCreator.CreateInitialPacket(0, 0); + + // Send the initial packet + await _pwt.SendPacketAsync(connectionPacket); + + // Await response for sucessfull connection creation by the server + InitialPacket packet = (InitialPacket)await _pwt.ReadPacketAsync(); + + HandleInitialFrames(packet); + EstablishConnection(packet.SourceConnectionId, packet.SourceConnectionId); + + return _connection; + } + /// /// Handles initial packet's frames. (In most cases protocol frames) /// diff --git a/QuicNet/QuicListener.cs b/QuicNet/QuicListener.cs index c7d233c..ab2dc57 100644 --- a/QuicNet/QuicListener.cs +++ b/QuicNet/QuicListener.cs @@ -13,6 +13,7 @@ using System.Linq; using System.Net; using System.Net.Sockets; +using System.Runtime.InteropServices; using System.Text; using System.Threading.Tasks; @@ -90,6 +91,32 @@ public QuicConnection AcceptQuicClient() } } + /// + /// Blocks and waits for incomming connection. Does NOT block additional incomming packets. + /// + /// Returns an instance of QuicConnection. + public async Task AcceptQuicClientAsync() + { + if (!_started) + throw new QuicListenerNotStartedException("Please call the Start() method before receving data."); + + /* + * Wait until there is initial packet incomming. + * Otherwise we still need to orchestrate any other protocol or data pakcets. + * */ + while (true) + { + Packet packet = await _pwt.ReadPacketAsync(); + if (packet is InitialPacket) + { + QuicConnection connection = await ProcessInitialPacketAsync(packet, _pwt.LastTransferEndpoint()); + return connection; + } + + OrchestratePacket(packet); + } + } + /// /// Starts receiving data from clients. /// @@ -108,6 +135,24 @@ private void Receive() } } + /// + /// Starts receiving data from clients. + /// + private async Task ReceiveAsync() + { + while (true) + { + Packet packet = await _pwt.ReadPacketAsync(); + + // Discard unknown packets + if (packet == null) + continue; + + // TODO: Validate packet before dispatching + OrchestratePacket(packet); + } + } + /// /// Orchestrates packets to connections, depending on the packet type. /// @@ -176,6 +221,62 @@ private QuicConnection ProcessInitialPacket(Packet packet, IPEndPoint endPoint) return null; } + /// + /// Processes incomming initial packet and creates or halts a connection. + /// + /// Initial Packet + /// Peer's endpoint + /// + private async Task ProcessInitialPacketAsync(Packet packet, IPEndPoint endPoint) + { + QuicConnection result = null; + UInt32 availableConnectionId; + byte[] data; + // Unsupported version. Version negotiation packet is sent only on initial connection. All other packets are dropped. (5.2.2 / 16th draft) + if (packet.Version != QuicVersion.CurrentVersion || !QuicVersion.SupportedVersions.Contains(packet.Version)) + { + VersionNegotiationPacket vnp = _packetCreator.CreateVersionNegotiationPacket(); + data = vnp.Encode(); + + await _client.SendAsync(data, data.Length, endPoint); + return null; + } + + InitialPacket cast = packet as InitialPacket; + InitialPacket ip = _packetCreator.CreateInitialPacket(0, cast.SourceConnectionId); + + // Protocol violation if the initial packet is smaller than the PMTU. (pt. 14 / 16th draft) + if (cast.Encode().Length < QuicSettings.PMTU) + { + ip.AttachFrame(new ConnectionCloseFrame(ErrorCode.PROTOCOL_VIOLATION, "PMTU have not been reached.")); + } + else if (ConnectionPool.AddConnection(new ConnectionData(_pwt, cast.SourceConnectionId, 0), out availableConnectionId) == true) + { + // Tell the peer the available connection id + ip.SourceConnectionId = (byte)availableConnectionId; + + // We're including the maximum possible stream id during the connection handshake. (4.5 / 16th draft) + ip.AttachFrame(new MaxStreamsFrame(QuicSettings.MaximumStreamId, StreamType.ServerBidirectional)); + + // Set the return result + result = ConnectionPool.Find(availableConnectionId); + } + else + { + // Not accepting connections. Send initial packet with CONNECTION_CLOSE frame. + // TODO: Buffering. The server might buffer incomming 0-RTT packets in anticipation of late delivery InitialPacket. + // Maximum buffer size should be set in QuicSettings. + ip.AttachFrame(new ConnectionCloseFrame(ErrorCode.SERVER_BUSY, "The server is too busy to process your request.")); + } + + data = ip.Encode(); + int dataSent = await _client.SendAsync(data, data.Length, endPoint); + if (dataSent > 0) + return result; + + return null; + } + /// /// Processes short header packet, by distributing the frames towards connections. /// diff --git a/QuicNet/Streams/QuicStream.cs b/QuicNet/Streams/QuicStream.cs index 7dbfe09..868eb11 100644 --- a/QuicNet/Streams/QuicStream.cs +++ b/QuicNet/Streams/QuicStream.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading.Tasks; namespace QuicNet.Streams { @@ -59,6 +60,20 @@ public bool Send(byte[] data) return _connection.SendData(packet); } + public async Task SendAsync(byte[] data) + { + if (Type == StreamType.ServerUnidirectional) + throw new StreamException("Cannot send data on unidirectional stream."); + + _connection.IncrementRate(data.Length); + + ShortHeaderPacket packet = _connection.PacketCreator.CreateDataPacket(this.StreamId.IntegerValue, data); + if (_connection.MaximumReached()) + packet.AttachFrame(new StreamDataBlockedFrame(StreamId.IntegerValue, (UInt64)data.Length)); + + return await _connection.SendDataAsync(packet); + } + /// /// Client only! /// @@ -76,6 +91,23 @@ public byte[] Receive() return Data; } + /// + /// Client async only! + /// + /// + public async Task ReceiveAsync() + { + if (Type == StreamType.ClientUnidirectional) + throw new StreamException("Cannot receive data on unidirectional stream."); + + while (!IsStreamFull() || State == StreamState.Recv) + { + await _connection.ReceivePacketAsync(); + } + + return Data; + } + public void ResetStream(ResetStreamFrame frame) { // Reset the state @@ -142,6 +174,48 @@ public void ProcessData(StreamFrame frame) } } + public async Task ProcessDataAsync(StreamFrame frame) + { + // Do not accept data if the stream is reset. + if (State == StreamState.ResetRecvd) + return; + + byte[] data = frame.StreamData; + if (frame.Offset != null) + { + _data.Add(frame.Offset.Value, frame.StreamData); + } + else + { + // TODO: Careful with duplicate 0 offset packets on the same stream. Probably PROTOCOL_VIOLATION? + _data.Add(0, frame.StreamData); + } + + // Either this frame marks the end of the stream, + // or fin frame came before the data frames + if (frame.EndOfStream) + State = StreamState.SizeKnown; + + _currentTransferRate += (UInt64)data.Length; + + // Terminate connection if maximum stream data is reached + if (_currentTransferRate >= _maximumStreamData) + { + ShortHeaderPacket errorPacket = _connection.PacketCreator.CreateConnectionClosePacket(Infrastructure.ErrorCode.FLOW_CONTROL_ERROR, "Maximum stream data transfer reached."); + await _connection.SendDataAsync(errorPacket); + _connection.TerminateConnection(); + + return; + } + + if (State == StreamState.SizeKnown && IsStreamFull()) + { + _connection.DataReceived(this); + + State = StreamState.DataRecvd; + } + } + public void ProcessStreamDataBlocked(StreamDataBlockedFrame frame) { State = StreamState.DataRecvd; diff --git a/QuickNet.Tests.ConsoleServer/Program.cs b/QuickNet.Tests.ConsoleServer/Program.cs index 2518f49..a94b66e 100644 --- a/QuickNet.Tests.ConsoleServer/Program.cs +++ b/QuickNet.Tests.ConsoleServer/Program.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace QuickNet.Tests.ConsoleServer @@ -39,9 +40,33 @@ static void Example() } } - static void Main(string[] args) + static async Task ExampleAsync() { - Example(); + QuicListener listener = new QuicListener(11000); + listener.Start(); + + while (true) + { + // Blocks while waiting for a connection + QuicConnection client = await listener.AcceptQuicClientAsync(); + + // Assign an action when a data is received from that client. + client.OnDataReceived += async (c) => { + + byte[] data = c.Data; + + Console.WriteLine("Data received: " + Encoding.UTF8.GetString(data)); + + await c.SendAsync(Encoding.UTF8.GetBytes("Echo!")); + await c.SendAsync(Encoding.UTF8.GetBytes("Echo2!")); + }; + } + } + + static async Task Main(string[] args) + { + //Example(); + await ExampleAsync(); return; byte[] bytes = new VariableInteger(12345);