From 3f61bdebba2ca9bbd0d4f2f5076890e92c495a1a Mon Sep 17 00:00:00 2001 From: Juliano Franz Date: Mon, 18 Jun 2018 13:56:28 -0300 Subject: [PATCH] Protocol Buffer over sockets Not the best implementation, lost of array copy --- AnnelidaDispatcher.sln | 6 + AnnelidaDispatcher/AnnelidaDispatcher.csproj | 1 + .../Model/Server/AsyncAbstractServer.cs | 87 +-------------- AnnelidaDispatcher/Model/Server/BsonServer.cs | 84 +++++++++++++- .../Model/Server/NoDBDispatchStrategy.cs | 2 +- .../Server/ProtoBufNoDBDispatchStrategy.cs | 22 ++++ .../Model/Server/ProtoBufServer.cs | 103 ++++++++++++++++-- ReceiveProtobuf/App.config | 6 + ReceiveProtobuf/Program.cs | 50 +++++++++ ReceiveProtobuf/Properties/AssemblyInfo.cs | 36 ++++++ ReceiveProtobuf/ReceiveProtobuf.csproj | 71 ++++++++++++ ReceiveProtobuf/packages.config | 4 + 12 files changed, 378 insertions(+), 94 deletions(-) create mode 100644 AnnelidaDispatcher/Model/Server/ProtoBufNoDBDispatchStrategy.cs create mode 100644 ReceiveProtobuf/App.config create mode 100644 ReceiveProtobuf/Program.cs create mode 100644 ReceiveProtobuf/Properties/AssemblyInfo.cs create mode 100644 ReceiveProtobuf/ReceiveProtobuf.csproj create mode 100644 ReceiveProtobuf/packages.config diff --git a/AnnelidaDispatcher.sln b/AnnelidaDispatcher.sln index b9753f4..b1245fb 100644 --- a/AnnelidaDispatcher.sln +++ b/AnnelidaDispatcher.sln @@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AnnelidaDataFormat", "Annel EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StoreInMongo", "StoreInMongo\StoreInMongo.csproj", "{2296925B-6DD4-42B1-8020-CFE71A516E60}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ReceiveProtobuf", "ReceiveProtobuf\ReceiveProtobuf.csproj", "{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -45,6 +47,10 @@ Global {2296925B-6DD4-42B1-8020-CFE71A516E60}.Debug|Any CPU.Build.0 = Debug|Any CPU {2296925B-6DD4-42B1-8020-CFE71A516E60}.Release|Any CPU.ActiveCfg = Release|Any CPU {2296925B-6DD4-42B1-8020-CFE71A516E60}.Release|Any CPU.Build.0 = Release|Any CPU + {DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/AnnelidaDispatcher/AnnelidaDispatcher.csproj b/AnnelidaDispatcher/AnnelidaDispatcher.csproj index fada5c6..3fbe470 100644 --- a/AnnelidaDispatcher/AnnelidaDispatcher.csproj +++ b/AnnelidaDispatcher/AnnelidaDispatcher.csproj @@ -123,6 +123,7 @@ + True diff --git a/AnnelidaDispatcher/Model/Server/AsyncAbstractServer.cs b/AnnelidaDispatcher/Model/Server/AsyncAbstractServer.cs index 6c0823a..18ad4c9 100644 --- a/AnnelidaDispatcher/Model/Server/AsyncAbstractServer.cs +++ b/AnnelidaDispatcher/Model/Server/AsyncAbstractServer.cs @@ -19,7 +19,7 @@ public AsyncAbstractServer(int tcpPort) { cts = new CancellationTokenSource(); listener = new TcpListener(IPAddress.Any, tcpPort); - messageDispatchStrategy = new NoDBDispatchStrategy(); + } public override void Start() @@ -41,90 +41,9 @@ private async Task AcceptClientAsync(TcpListener listener, CancellationToken ct) } } - private async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct) - { - Console.WriteLine($"New client ({clientIndex}) connected"); - var myType = ClientTypes.Types.Undefined; - var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint; - using (client) - { - var buf = new byte[4096]; - byte[] completedMessage = null; - var stream = client.GetStream(); - int count = 1; - var ammountToReceive = 4; - var totalMessageSize = 0; - var totalReceivedSize = 0; - while (!ct.IsCancellationRequested) - { - var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1)); - var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct); - var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false); - - if (completedTask == timeOutTask) - { - break; - } - - int ammountRead; - - try - { - ammountRead = ammountReadTask.Result; - } - catch (AggregateException ex) - { - Console.WriteLine(ex); - break; - } - - if (ammountRead == 0) break; - - - if (myType == ClientTypes.Types.Undefined) - { - myType = IdentifyClient(buf, client); - OnClientConnected(myType, clientEndPoint.Address.ToString()); - //ClientConnectedEvent?.Invoke(myType, clientEndPoint.Address.ToString()); - } - else - { - totalReceivedSize += ammountRead; - if (ammountToReceive == 4) - { - byte[] size = { buf[0], buf[1], buf[2], buf[3] }; - totalMessageSize = BitConverter.ToInt32(size, 0); - ammountToReceive = totalMessageSize - 4; - completedMessage = new byte[totalMessageSize]; - Array.Copy(size, completedMessage, 4); - - } - else - { - Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead); - - if (totalReceivedSize == totalMessageSize) - { - HandleMessage(completedMessage, totalMessageSize, myType); - ammountToReceive = 4; - totalMessageSize = 0; - totalReceivedSize = 0; - } - else - { - ammountToReceive = totalMessageSize - totalReceivedSize; - } - } - } - } - } - - OnClientDisconnected(myType, clientEndPoint.Address.ToString()); - connectedClients[myType].Remove(client); - Console.WriteLine($"Client ({clientIndex}) disconnected"); - } + protected abstract Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct); - private ClientTypes.Types IdentifyClient(byte[] buffer, TcpClient client) + protected ClientTypes.Types IdentifyClient(byte[] buffer, TcpClient client) { //TODO: Handle incorrect types var myType = (ClientTypes.Types)BitConverter.ToInt32(buffer, 0); diff --git a/AnnelidaDispatcher/Model/Server/BsonServer.cs b/AnnelidaDispatcher/Model/Server/BsonServer.cs index 7311379..9081649 100644 --- a/AnnelidaDispatcher/Model/Server/BsonServer.cs +++ b/AnnelidaDispatcher/Model/Server/BsonServer.cs @@ -16,7 +16,89 @@ public class BsonServer : AsyncAbstractServer { public BsonServer(int tcpPort) : base(tcpPort) { - + messageDispatchStrategy = new NoDBDispatchStrategy(); + } + protected override async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct) + { + Console.WriteLine($"New client ({clientIndex}) connected"); + var myType = ClientTypes.Types.Undefined; + var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint; + using (client) + { + var buf = new byte[4096]; + byte[] completedMessage = null; + var stream = client.GetStream(); + int count = 1; + var ammountToReceive = 4; + var totalMessageSize = 0; + var totalReceivedSize = 0; + while (!ct.IsCancellationRequested) + { + var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1)); + var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct); + var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false); + + if (completedTask == timeOutTask) + { + break; + } + + int ammountRead; + + try + { + ammountRead = ammountReadTask.Result; + } + catch (AggregateException ex) + { + Console.WriteLine(ex); + break; + } + + if (ammountRead == 0) break; + + + if (myType == ClientTypes.Types.Undefined) + { + myType = IdentifyClient(buf, client); + OnClientConnected(myType, clientEndPoint.Address.ToString()); + //ClientConnectedEvent?.Invoke(myType, clientEndPoint.Address.ToString()); + } + else + { + totalReceivedSize += ammountRead; + if (ammountToReceive == 4) + { + byte[] size = { buf[0], buf[1], buf[2], buf[3] }; + totalMessageSize = BitConverter.ToInt32(size, 0); + ammountToReceive = totalMessageSize - 4; + completedMessage = new byte[totalMessageSize]; + Array.Copy(size, completedMessage, 4); + + } + else + { + Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead); + + if (totalReceivedSize == totalMessageSize) + { + HandleMessage(completedMessage, totalMessageSize, myType); + ammountToReceive = 4; + totalMessageSize = 0; + totalReceivedSize = 0; + } + else + { + ammountToReceive = totalMessageSize - totalReceivedSize; + } + } + } + } + } + + OnClientDisconnected(myType, clientEndPoint.Address.ToString()); + connectedClients[myType].Remove(client); + Console.WriteLine($"Client ({clientIndex}) disconnected"); } protected override void HandleMessage(byte[] buffer, int ammountRead,ClientTypes.Types myType) diff --git a/AnnelidaDispatcher/Model/Server/NoDBDispatchStrategy.cs b/AnnelidaDispatcher/Model/Server/NoDBDispatchStrategy.cs index fb342f0..34f0afe 100644 --- a/AnnelidaDispatcher/Model/Server/NoDBDispatchStrategy.cs +++ b/AnnelidaDispatcher/Model/Server/NoDBDispatchStrategy.cs @@ -9,7 +9,7 @@ namespace AnnelidaDispatcher.Model.Server { public class NoDBDispatchStrategy: IMessageDispatcherStrategy { - public void RedespatchMessage(byte[] message, Dictionary> connectedClients, ClientTypes.Types origin) + public virtual void RedespatchMessage(byte[] message, Dictionary> connectedClients, ClientTypes.Types origin) { //TODO: Change behaviour to state, maybe? try diff --git a/AnnelidaDispatcher/Model/Server/ProtoBufNoDBDispatchStrategy.cs b/AnnelidaDispatcher/Model/Server/ProtoBufNoDBDispatchStrategy.cs new file mode 100644 index 0000000..d5d3c5c --- /dev/null +++ b/AnnelidaDispatcher/Model/Server/ProtoBufNoDBDispatchStrategy.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace AnnelidaDispatcher.Model.Server +{ + public class ProtoBufNoDBDispatchStrategy : NoDBDispatchStrategy + { + public override void RedespatchMessage(byte[] message, + Dictionary> connectedClients, ClientTypes.Types origin) + { + var buffer = new byte[message.Length+4]; + Array.Copy(BitConverter.GetBytes(message.Length),buffer,4); + Array.Copy(message,0,buffer,4,message.Length); + base.RedespatchMessage(buffer,connectedClients,origin); + } + + } +} diff --git a/AnnelidaDispatcher/Model/Server/ProtoBufServer.cs b/AnnelidaDispatcher/Model/Server/ProtoBufServer.cs index e1d489b..a8adf64 100644 --- a/AnnelidaDispatcher/Model/Server/ProtoBufServer.cs +++ b/AnnelidaDispatcher/Model/Server/ProtoBufServer.cs @@ -2,7 +2,10 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net; +using System.Net.Sockets; using System.Text; +using System.Threading; using System.Threading.Tasks; using Google.Protobuf; @@ -16,21 +19,105 @@ public class ProtoBufServer : AsyncAbstractServer private MessageParser protobuffParser; public ProtoBufServer(int tcpPort) : base(tcpPort) { - + messageDispatchStrategy = new ProtoBufNoDBDispatchStrategy(); + + } + protected override async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct) + { + Console.WriteLine($"New client ({clientIndex}) connected"); + var myType = ClientTypes.Types.Undefined; + var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint; + using (client) + { + var buf = new byte[4096]; + byte[] completedMessage = null; + var stream = client.GetStream(); + int count = 1; + var ammountToReceive = 4; + var totalMessageSize = 0; + var totalReceivedSize = 0; + while (!ct.IsCancellationRequested) + { + var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1)); + var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct); + var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false); + + if (completedTask == timeOutTask) + { + break; + } + + int ammountRead; + try + { + ammountRead = ammountReadTask.Result; + } + catch (AggregateException ex) + { + Console.WriteLine(ex); + break; + } + if (ammountRead == 0) break; + + + if (myType == ClientTypes.Types.Undefined) + { + myType = IdentifyClient(buf, client); + OnClientConnected(myType, clientEndPoint.Address.ToString()); + } + else + { + totalReceivedSize += ammountRead; + if (ammountToReceive == 4) + { + byte[] size = { buf[0], buf[1], buf[2], buf[3] }; + totalMessageSize = BitConverter.ToInt32(size, 0); + ammountToReceive = totalMessageSize; + completedMessage = new byte[totalMessageSize]; + totalReceivedSize = 0; + } + else + { + Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead); + + if (totalReceivedSize == totalMessageSize) + { + HandleMessage(completedMessage, totalMessageSize, myType); + ammountToReceive = 4; + totalMessageSize = 0; + totalReceivedSize = 0; + } + else + { + ammountToReceive = totalMessageSize - totalReceivedSize; + } + } + } + } + } + + OnClientDisconnected(myType, clientEndPoint.Address.ToString()); + connectedClients[myType].Remove(client); + Console.WriteLine($"Client ({clientIndex}) disconnected"); } - protected override void HandleMessage(byte[] buffer, int ammountRead, + protected override void HandleMessage(byte[] buffer, int totalMessageSize, ClientTypes.Types myType) { - var workingCopy = new byte[ammountRead]; - Array.Copy(buffer,4,workingCopy,0,workingCopy.Length); - - var s = AnnelidaSensors.Parser.ParseFrom(buffer); - s.Timestamp = Timestamp.FromDateTime(DateTime.UtcNow); + try + { + var s = AnnelidaSensors.Parser.ParseFrom(buffer); + s.Timestamp = Timestamp.FromDateTime(DateTime.UtcNow); + messageDispatchStrategy.RedespatchMessage(s.ToByteArray(), connectedClients, myType); + } + catch (Google.Protobuf.InvalidProtocolBufferException e) + { + Console.WriteLine(e); + throw; + } - messageDispatchStrategy.RedespatchMessage(s.ToByteArray(),connectedClients,myType); } } diff --git a/ReceiveProtobuf/App.config b/ReceiveProtobuf/App.config new file mode 100644 index 0000000..88fa402 --- /dev/null +++ b/ReceiveProtobuf/App.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/ReceiveProtobuf/Program.cs b/ReceiveProtobuf/Program.cs new file mode 100644 index 0000000..2c9e68b --- /dev/null +++ b/ReceiveProtobuf/Program.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +using Google.Protobuf; + +using static AnnelidaSensors; + +namespace ReceiveProtobuf +{ + class Program + { + static void Main(string[] args) + { + TcpClient c = new TcpClient(); + c.Connect("127.0.0.1", 9999); + var stream = c.GetStream(); + + //int id = (int)ClientTypes.Types.Controller ; + Console.Write("Whats the id?: "); + int id; + string ca = Console.ReadLine(); + Int32.TryParse(ca, out id); + + stream.Write(BitConverter.GetBytes(id), 0, 4); + stream.Flush(); + + Console.WriteLine("Ready to receive..."); + Console.WriteLine("Press ESC to stop"); + + do + { + while (!Console.KeyAvailable) + { + byte[] buff = new byte[4]; + c.Client.Receive(buff, 4, 0); + int s = BitConverter.ToInt32(buff, 0); + buff = new byte[s]; + c.Client.Receive(buff, s, 0); + var prot = AnnelidaSensors.Parser.ParseFrom(buff); + Console.WriteLine(prot.ToString()); + } + + } while (Console.ReadKey(true).Key != ConsoleKey.Escape); + } + } +} diff --git a/ReceiveProtobuf/Properties/AssemblyInfo.cs b/ReceiveProtobuf/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..8ff8386 --- /dev/null +++ b/ReceiveProtobuf/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("ReceiveProtobuf")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("ReceiveProtobuf")] +[assembly: AssemblyCopyright("Copyright © 2018")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("dae31fbf-af95-4dfa-bc88-e8c3d200ed95")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/ReceiveProtobuf/ReceiveProtobuf.csproj b/ReceiveProtobuf/ReceiveProtobuf.csproj new file mode 100644 index 0000000..dbd0d31 --- /dev/null +++ b/ReceiveProtobuf/ReceiveProtobuf.csproj @@ -0,0 +1,71 @@ + + + + + Debug + AnyCPU + {DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95} + Exe + Properties + ReceiveProtobuf + ReceiveProtobuf + v4.5.2 + 512 + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\Google.Protobuf.3.5.1\lib\net45\Google.Protobuf.dll + True + + + + + + + + + + + + + + + + + + + + + {cd57bd03-19d7-4d6b-9597-e51525bc214c} + AnnelidaDispatcher + + + + + \ No newline at end of file diff --git a/ReceiveProtobuf/packages.config b/ReceiveProtobuf/packages.config new file mode 100644 index 0000000..d48cfa2 --- /dev/null +++ b/ReceiveProtobuf/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file