Skip to content
This repository has been archived by the owner on Oct 10, 2018. It is now read-only.

Commit

Permalink
Network rework - needs refactoring
Browse files Browse the repository at this point in the history
Down to 40ms, less then that creates corrupted packages.
  • Loading branch information
jmfranz committed May 1, 2018
1 parent 1cc4ce7 commit c4e123b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 38 deletions.
81 changes: 43 additions & 38 deletions AnnelidaDispatcher/Model/DispatcherServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace AnnelidaDispatcher.Model
public class DispatcherServer
{
private static readonly ManualResetEvent AllDone = new ManualResetEvent(false);
private const int BufferSize = 1024;

private static Semaphore semaphore;

//List of all the connected clients
private readonly Dictionary<ClientTypes.Types, List<Socket>> connectedClients;
Expand Down Expand Up @@ -51,6 +52,8 @@ public class DispatcherServer
/// </summary>
public DispatcherServer(MongoWrapper sensorDb, MongoWrapper controlDb, string missionName)
{
semaphore = new Semaphore(1,1);

connectedClients = new Dictionary<ClientTypes.Types, List<Socket>>
{
{ClientTypes.Types.Undefined, new List<Socket>()},
Expand Down Expand Up @@ -119,7 +122,7 @@ public void AcceptHandler(IAsyncResult result)

var so = new DispatcherClientObject() {WorkSocket = client};
//Starts receving messages
client.BeginReceive(so.Buffer, 0, so.BufferSize, 0, ReadHandler, so);
client.BeginReceive(so.Buffer, 0, 4, 0, ReadHandler, so);
}

/// <summary>
Expand All @@ -136,13 +139,15 @@ public void ReadHandler(IAsyncResult ar)
// Read data from the client socket.
try
{
state.RecvBytesCount = handler.EndReceive(ar);
semaphore.WaitOne();
state.RecvBytesCount += handler.EndReceive(ar);
semaphore.Release();
}
//TODO: handle disonnection messages (SHUTODOWNMODES)
catch(SocketException e)
{
// ReSharper disable once LocalizableElement
Console.WriteLine($"Socket error {e}");
//Console.WriteLine($"Socket error {e}");
//probably our client disconnected
connectedClients[state.MyType].Remove(state.WorkSocket);
var clientAddr = state.WorkSocket.RemoteEndPoint as IPEndPoint;
Expand All @@ -157,14 +162,9 @@ public void ReadHandler(IAsyncResult ar)
if(state?.RecvBytesCount <=0)
return;

if (state.RecvBytesCount == 4)
{
state
}

//if our client has not identified itself the first
//message he sends is his ID
if (state != null && (state.RecvBytesCount > 0 && !state.IsInitialized))
if (state != null && !state.IsInitialized)
{

//We are expecting and int representing the client type
Expand All @@ -180,48 +180,49 @@ public void ReadHandler(IAsyncResult ar)
ClientConnectedEvent?.Invoke((ClientTypes.Types) type, clientAddr.Address.ToString());
else
throw new ArgumentNullException();

state.RecvBytesCount = 0;

//Continue handling messages
handler.BeginReceive(state.Buffer, 0, state.BufferSize, 0,
handler.BeginReceive(state.Buffer, 0, 4, 0,
ReadHandler, state);
//sets the buffer to 0 because the next message contains the size
state.BufferSize = 0;
}
else if (state != null && (bytesRead > 0 && state.IsInitialized))
else if (state != null && state.IsInitialized)
{
//We are receiving the package but don't know the size yet
//Serialized bson contains the size in the first 4 bytes.
if(state.BufferSize == 0)

if (state.RecvBytesCount == 4)
{
int size = BitConverter.ToInt32(state.Buffer, 0);
//We take 4 out because we already red those bytes
state.BufferSize = size - 4;
//Resize the array because we need the full set of
//bytes in order to deserialize the Bson
Array.Resize(ref state.Buffer, size);
state.RecvBytesCount = 0;
handler.BeginReceive(state.Buffer, 4, state.BufferSize , 0,
ReadHandler, state);
state.TotalPackageSize = BitConverter.ToInt32(state.Buffer, 0);
if (state.TotalPackageSize > state.BufferSize)
{
throw new ArgumentOutOfRangeException("Is our data more than 8192 bytes?!!?!??!");
}
state.ResetBuffer(state.TotalPackageSize);
var lenght = BitConverter.GetBytes(state.TotalPackageSize);
state.Buffer[0] = lenght[0];
state.Buffer[1] = lenght[1];
state.Buffer[2] = lenght[2];
state.Buffer[3] = lenght[3];
//Keep reading...
handler.BeginReceive(state.Buffer, 4, state.TotalPackageSize - 4, 0, ReadHandler,
state);
}
//we already know the package size

//we already know the package size
else
{
state.RecvBytesCount += bytesRead;

if (state.RecvBytesCount < state.BufferSize)
{
var revLeft = state.BufferSize - state.RecvBytesCount;

handler.BeginReceive(state.Buffer, 4 + state.RecvBytesCount - 1,
revLeft, 0,
ReadHandler, state);
}
//Not done yet
if (state.RecvBytesCount < state.TotalPackageSize - 4)
handler.BeginReceive(state.Buffer, state.RecvBytesCount - 1,
state.TotalPackageSize - state.RecvBytesCount, 0, ReadHandler, state);
else
{
HandleMessage(state.Buffer, state);
//Prep to receive another package
state.BufferSize = 0;
//int32 with the size
state.Buffer = new byte[4];
state.ResetBuffer();
state.RecvBytesCount = 0;
handler.BeginReceive(state.Buffer, 0, 4, 0,
ReadHandler, state);
}
Expand Down Expand Up @@ -321,14 +322,18 @@ private static BsonDocument ProcessSerializedBson(byte[] bytes)
{
try
{
semaphore.WaitOne();
var doc = BsonSerializer.Deserialize<BsonDocument>(bytes);
BsonDateTime timestamp = DateTime.UtcNow;
doc["timestamp"] = timestamp;
semaphore.Release();

return doc;
}
catch (Exception e)
{
Console.WriteLine(e);
semaphore.Release();
return null;
}

Expand Down
8 changes: 8 additions & 0 deletions AnnelidaDispatcher/Model/StateObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class DispatcherClientObject
/// </summary>
public ClientTypes.Types MyType;

public int TotalPackageSize;

/// <summary>
/// Class constructor, defines the type as undefined and waits for client identification
/// </summary>
Expand All @@ -50,11 +52,17 @@ public DispatcherClientObject()
IsInitialized = false;
Buffer = new byte[BufferSize];
MyType = ClientTypes.Types.Undefined;
TotalPackageSize = 0;
}

public void ResetBuffer()
{
Buffer = new byte[BufferSize];
}

public void ResetBuffer(int size)
{
Buffer = new byte[size];
}
}
}

0 comments on commit c4e123b

Please sign in to comment.