Skip to content
This repository has been archived by the owner on Oct 10, 2018. It is now read-only.

Commit

Permalink
Protocol Buffer over sockets
Browse files Browse the repository at this point in the history
Not the best implementation, lost of array copy
  • Loading branch information
jmfranz committed Jun 18, 2018
1 parent 478798a commit 3f61bde
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 94 deletions.
6 changes: 6 additions & 0 deletions AnnelidaDispatcher.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AnnelidaDataFormat", "Annel
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StoreInMongo", "StoreInMongo\StoreInMongo.csproj", "{2296925B-6DD4-42B1-8020-CFE71A516E60}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ReceiveProtobuf", "ReceiveProtobuf\ReceiveProtobuf.csproj", "{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -45,6 +47,10 @@ Global
{2296925B-6DD4-42B1-8020-CFE71A516E60}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2296925B-6DD4-42B1-8020-CFE71A516E60}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2296925B-6DD4-42B1-8020-CFE71A516E60}.Release|Any CPU.Build.0 = Release|Any CPU
{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DAE31FBF-AF95-4DFA-BC88-E8C3D200ED95}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions AnnelidaDispatcher/AnnelidaDispatcher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<Compile Include="Model\Server\BsonServerDBEnabled.cs" />
<Compile Include="Model\Server\MongoDispatchStrategy.cs" />
<Compile Include="Model\Server\NoDBDispatchStrategy.cs" />
<Compile Include="Model\Server\ProtoBufNoDBDispatchStrategy.cs" />
<Compile Include="Model\Server\ProtoBufServer.cs" />
<Compile Include="Strings.Designer.cs">
<AutoGen>True</AutoGen>
Expand Down
87 changes: 3 additions & 84 deletions AnnelidaDispatcher/Model/Server/AsyncAbstractServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public AsyncAbstractServer(int tcpPort)
{
cts = new CancellationTokenSource();
listener = new TcpListener(IPAddress.Any, tcpPort);
messageDispatchStrategy = new NoDBDispatchStrategy();

}

public override void Start()
Expand All @@ -41,90 +41,9 @@ private async Task AcceptClientAsync(TcpListener listener, CancellationToken ct)
}
}

private async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct)
{
Console.WriteLine($"New client ({clientIndex}) connected");
var myType = ClientTypes.Types.Undefined;
var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint;
using (client)
{
var buf = new byte[4096];
byte[] completedMessage = null;
var stream = client.GetStream();
int count = 1;
var ammountToReceive = 4;
var totalMessageSize = 0;
var totalReceivedSize = 0;
while (!ct.IsCancellationRequested)
{
var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1));
var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct);
var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false);

if (completedTask == timeOutTask)
{
break;
}

int ammountRead;

try
{
ammountRead = ammountReadTask.Result;
}
catch (AggregateException ex)
{
Console.WriteLine(ex);
break;
}

if (ammountRead == 0) break;


if (myType == ClientTypes.Types.Undefined)
{
myType = IdentifyClient(buf, client);
OnClientConnected(myType, clientEndPoint.Address.ToString());
//ClientConnectedEvent?.Invoke(myType, clientEndPoint.Address.ToString());
}
else
{
totalReceivedSize += ammountRead;
if (ammountToReceive == 4)
{
byte[] size = { buf[0], buf[1], buf[2], buf[3] };
totalMessageSize = BitConverter.ToInt32(size, 0);
ammountToReceive = totalMessageSize - 4;
completedMessage = new byte[totalMessageSize];
Array.Copy(size, completedMessage, 4);

}
else
{
Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead);

if (totalReceivedSize == totalMessageSize)
{
HandleMessage(completedMessage, totalMessageSize, myType);
ammountToReceive = 4;
totalMessageSize = 0;
totalReceivedSize = 0;
}
else
{
ammountToReceive = totalMessageSize - totalReceivedSize;
}
}
}
}
}

OnClientDisconnected(myType, clientEndPoint.Address.ToString());
connectedClients[myType].Remove(client);
Console.WriteLine($"Client ({clientIndex}) disconnected");
}
protected abstract Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct);

private ClientTypes.Types IdentifyClient(byte[] buffer, TcpClient client)
protected ClientTypes.Types IdentifyClient(byte[] buffer, TcpClient client)
{
//TODO: Handle incorrect types
var myType = (ClientTypes.Types)BitConverter.ToInt32(buffer, 0);
Expand Down
84 changes: 83 additions & 1 deletion AnnelidaDispatcher/Model/Server/BsonServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,89 @@ public class BsonServer : AsyncAbstractServer
{
public BsonServer(int tcpPort) : base(tcpPort)
{

messageDispatchStrategy = new NoDBDispatchStrategy();
}
protected override async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct)
{
Console.WriteLine($"New client ({clientIndex}) connected");
var myType = ClientTypes.Types.Undefined;
var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint;
using (client)
{
var buf = new byte[4096];
byte[] completedMessage = null;
var stream = client.GetStream();
int count = 1;
var ammountToReceive = 4;
var totalMessageSize = 0;
var totalReceivedSize = 0;
while (!ct.IsCancellationRequested)
{
var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1));
var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct);
var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false);

if (completedTask == timeOutTask)
{
break;
}

int ammountRead;

try
{
ammountRead = ammountReadTask.Result;
}
catch (AggregateException ex)
{
Console.WriteLine(ex);
break;
}

if (ammountRead == 0) break;


if (myType == ClientTypes.Types.Undefined)
{
myType = IdentifyClient(buf, client);
OnClientConnected(myType, clientEndPoint.Address.ToString());
//ClientConnectedEvent?.Invoke(myType, clientEndPoint.Address.ToString());
}
else
{
totalReceivedSize += ammountRead;
if (ammountToReceive == 4)
{
byte[] size = { buf[0], buf[1], buf[2], buf[3] };
totalMessageSize = BitConverter.ToInt32(size, 0);
ammountToReceive = totalMessageSize - 4;
completedMessage = new byte[totalMessageSize];
Array.Copy(size, completedMessage, 4);

}
else
{
Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead);

if (totalReceivedSize == totalMessageSize)
{
HandleMessage(completedMessage, totalMessageSize, myType);
ammountToReceive = 4;
totalMessageSize = 0;
totalReceivedSize = 0;
}
else
{
ammountToReceive = totalMessageSize - totalReceivedSize;
}
}
}
}
}

OnClientDisconnected(myType, clientEndPoint.Address.ToString());
connectedClients[myType].Remove(client);
Console.WriteLine($"Client ({clientIndex}) disconnected");
}

protected override void HandleMessage(byte[] buffer, int ammountRead,ClientTypes.Types myType)
Expand Down
2 changes: 1 addition & 1 deletion AnnelidaDispatcher/Model/Server/NoDBDispatchStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace AnnelidaDispatcher.Model.Server
{
public class NoDBDispatchStrategy: IMessageDispatcherStrategy
{
public void RedespatchMessage(byte[] message, Dictionary<ClientTypes.Types, List<TcpClient>> connectedClients, ClientTypes.Types origin)
public virtual void RedespatchMessage(byte[] message, Dictionary<ClientTypes.Types, List<TcpClient>> connectedClients, ClientTypes.Types origin)
{
//TODO: Change behaviour to state, maybe?
try
Expand Down
22 changes: 22 additions & 0 deletions AnnelidaDispatcher/Model/Server/ProtoBufNoDBDispatchStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace AnnelidaDispatcher.Model.Server
{
public class ProtoBufNoDBDispatchStrategy : NoDBDispatchStrategy
{
public override void RedespatchMessage(byte[] message,
Dictionary<ClientTypes.Types, List<TcpClient>> connectedClients, ClientTypes.Types origin)
{
var buffer = new byte[message.Length+4];
Array.Copy(BitConverter.GetBytes(message.Length),buffer,4);
Array.Copy(message,0,buffer,4,message.Length);
base.RedespatchMessage(buffer,connectedClients,origin);
}

}
}
103 changes: 95 additions & 8 deletions AnnelidaDispatcher/Model/Server/ProtoBufServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Google.Protobuf;
Expand All @@ -16,21 +19,105 @@ public class ProtoBufServer : AsyncAbstractServer
private MessageParser<AnnelidaSensors> protobuffParser;
public ProtoBufServer(int tcpPort) : base(tcpPort)
{

messageDispatchStrategy = new ProtoBufNoDBDispatchStrategy();

}
protected override async Task ClientHandler(TcpClient client, int clientIndex, CancellationToken ct)
{
Console.WriteLine($"New client ({clientIndex}) connected");
var myType = ClientTypes.Types.Undefined;
var clientEndPoint = (IPEndPoint)client.Client.RemoteEndPoint;
using (client)
{
var buf = new byte[4096];
byte[] completedMessage = null;
var stream = client.GetStream();
int count = 1;
var ammountToReceive = 4;
var totalMessageSize = 0;
var totalReceivedSize = 0;
while (!ct.IsCancellationRequested)
{
var timeOutTask = Task.Delay(TimeSpan.FromMinutes(1));
var ammountReadTask = stream.ReadAsync(buf, 0, ammountToReceive, ct);
var completedTask = await Task.WhenAny(ammountReadTask).ConfigureAwait(false);

if (completedTask == timeOutTask)
{
break;
}

int ammountRead;

try
{
ammountRead = ammountReadTask.Result;
}
catch (AggregateException ex)
{
Console.WriteLine(ex);
break;
}

if (ammountRead == 0) break;


if (myType == ClientTypes.Types.Undefined)
{
myType = IdentifyClient(buf, client);
OnClientConnected(myType, clientEndPoint.Address.ToString());
}
else
{
totalReceivedSize += ammountRead;
if (ammountToReceive == 4)
{
byte[] size = { buf[0], buf[1], buf[2], buf[3] };
totalMessageSize = BitConverter.ToInt32(size, 0);
ammountToReceive = totalMessageSize;
completedMessage = new byte[totalMessageSize];
totalReceivedSize = 0;
}
else
{
Array.Copy(buf, 0, completedMessage, totalReceivedSize - ammountRead, ammountRead);

if (totalReceivedSize == totalMessageSize)
{
HandleMessage(completedMessage, totalMessageSize, myType);
ammountToReceive = 4;
totalMessageSize = 0;
totalReceivedSize = 0;
}
else
{
ammountToReceive = totalMessageSize - totalReceivedSize;
}
}
}
}
}

OnClientDisconnected(myType, clientEndPoint.Address.ToString());
connectedClients[myType].Remove(client);
Console.WriteLine($"Client ({clientIndex}) disconnected");
}

protected override void HandleMessage(byte[] buffer, int ammountRead,
protected override void HandleMessage(byte[] buffer, int totalMessageSize,
ClientTypes.Types myType)
{
var workingCopy = new byte[ammountRead];
Array.Copy(buffer,4,workingCopy,0,workingCopy.Length);

var s = AnnelidaSensors.Parser.ParseFrom(buffer);
s.Timestamp = Timestamp.FromDateTime(DateTime.UtcNow);
try
{
var s = AnnelidaSensors.Parser.ParseFrom(buffer);
s.Timestamp = Timestamp.FromDateTime(DateTime.UtcNow);
messageDispatchStrategy.RedespatchMessage(s.ToByteArray(), connectedClients, myType);
}
catch (Google.Protobuf.InvalidProtocolBufferException e)
{
Console.WriteLine(e);
throw;
}

messageDispatchStrategy.RedespatchMessage(s.ToByteArray(),connectedClients,myType);
}

}
Expand Down
6 changes: 6 additions & 0 deletions ReceiveProtobuf/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
</configuration>
Loading

0 comments on commit 3f61bde

Please sign in to comment.