diff --git a/AnnelidaDispatcher/Model/DispatcherServer.cs b/AnnelidaDispatcher/Model/DispatcherServer.cs index 85c664f..b87f66a 100644 --- a/AnnelidaDispatcher/Model/DispatcherServer.cs +++ b/AnnelidaDispatcher/Model/DispatcherServer.cs @@ -21,7 +21,8 @@ namespace AnnelidaDispatcher.Model public class DispatcherServer { private static readonly ManualResetEvent AllDone = new ManualResetEvent(false); - private const int BufferSize = 1024; + + private static Semaphore semaphore; //List of all the connected clients private readonly Dictionary> connectedClients; @@ -51,6 +52,8 @@ public class DispatcherServer /// public DispatcherServer(MongoWrapper sensorDb, MongoWrapper controlDb, string missionName) { + semaphore = new Semaphore(1,1); + connectedClients = new Dictionary> { {ClientTypes.Types.Undefined, new List()}, @@ -119,7 +122,7 @@ public void AcceptHandler(IAsyncResult result) var so = new DispatcherClientObject() {WorkSocket = client}; //Starts receving messages - client.BeginReceive(so.Buffer, 0, so.BufferSize, 0, ReadHandler, so); + client.BeginReceive(so.Buffer, 0, 4, 0, ReadHandler, so); } /// @@ -136,13 +139,15 @@ public void ReadHandler(IAsyncResult ar) // Read data from the client socket. try { - state.RecvBytesCount = handler.EndReceive(ar); + semaphore.WaitOne(); + state.RecvBytesCount += handler.EndReceive(ar); + semaphore.Release(); } //TODO: handle disonnection messages (SHUTODOWNMODES) catch(SocketException e) { // ReSharper disable once LocalizableElement - Console.WriteLine($"Socket error {e}"); + //Console.WriteLine($"Socket error {e}"); //probably our client disconnected connectedClients[state.MyType].Remove(state.WorkSocket); var clientAddr = state.WorkSocket.RemoteEndPoint as IPEndPoint; @@ -157,14 +162,9 @@ public void ReadHandler(IAsyncResult ar) if(state?.RecvBytesCount <=0) return; - if (state.RecvBytesCount == 4) - { - state - } - //if our client has not identified itself the first //message he sends is his ID - if (state != null && (state.RecvBytesCount > 0 && !state.IsInitialized)) + if (state != null && !state.IsInitialized) { //We are expecting and int representing the client type @@ -180,48 +180,49 @@ public void ReadHandler(IAsyncResult ar) ClientConnectedEvent?.Invoke((ClientTypes.Types) type, clientAddr.Address.ToString()); else throw new ArgumentNullException(); + + state.RecvBytesCount = 0; + //Continue handling messages - handler.BeginReceive(state.Buffer, 0, state.BufferSize, 0, + handler.BeginReceive(state.Buffer, 0, 4, 0, ReadHandler, state); - //sets the buffer to 0 because the next message contains the size - state.BufferSize = 0; } - else if (state != null && (bytesRead > 0 && state.IsInitialized)) + else if (state != null && state.IsInitialized) { //We are receiving the package but don't know the size yet //Serialized bson contains the size in the first 4 bytes. - if(state.BufferSize == 0) + + if (state.RecvBytesCount == 4) { - int size = BitConverter.ToInt32(state.Buffer, 0); - //We take 4 out because we already red those bytes - state.BufferSize = size - 4; - //Resize the array because we need the full set of - //bytes in order to deserialize the Bson - Array.Resize(ref state.Buffer, size); - state.RecvBytesCount = 0; - handler.BeginReceive(state.Buffer, 4, state.BufferSize , 0, - ReadHandler, state); + state.TotalPackageSize = BitConverter.ToInt32(state.Buffer, 0); + if (state.TotalPackageSize > state.BufferSize) + { + throw new ArgumentOutOfRangeException("Is our data more than 8192 bytes?!!?!??!"); + } + state.ResetBuffer(state.TotalPackageSize); + var lenght = BitConverter.GetBytes(state.TotalPackageSize); + state.Buffer[0] = lenght[0]; + state.Buffer[1] = lenght[1]; + state.Buffer[2] = lenght[2]; + state.Buffer[3] = lenght[3]; + //Keep reading... + handler.BeginReceive(state.Buffer, 4, state.TotalPackageSize - 4, 0, ReadHandler, + state); } - //we already know the package size + + //we already know the package size else { - state.RecvBytesCount += bytesRead; - - if (state.RecvBytesCount < state.BufferSize) - { - var revLeft = state.BufferSize - state.RecvBytesCount; - - handler.BeginReceive(state.Buffer, 4 + state.RecvBytesCount - 1, - revLeft, 0, - ReadHandler, state); - } + //Not done yet + if (state.RecvBytesCount < state.TotalPackageSize - 4) + handler.BeginReceive(state.Buffer, state.RecvBytesCount - 1, + state.TotalPackageSize - state.RecvBytesCount, 0, ReadHandler, state); else { HandleMessage(state.Buffer, state); //Prep to receive another package - state.BufferSize = 0; - //int32 with the size - state.Buffer = new byte[4]; + state.ResetBuffer(); + state.RecvBytesCount = 0; handler.BeginReceive(state.Buffer, 0, 4, 0, ReadHandler, state); } @@ -321,14 +322,18 @@ private static BsonDocument ProcessSerializedBson(byte[] bytes) { try { + semaphore.WaitOne(); var doc = BsonSerializer.Deserialize(bytes); BsonDateTime timestamp = DateTime.UtcNow; doc["timestamp"] = timestamp; + semaphore.Release(); + return doc; } catch (Exception e) { Console.WriteLine(e); + semaphore.Release(); return null; } diff --git a/AnnelidaDispatcher/Model/StateObject.cs b/AnnelidaDispatcher/Model/StateObject.cs index c7046d8..900e6e9 100644 --- a/AnnelidaDispatcher/Model/StateObject.cs +++ b/AnnelidaDispatcher/Model/StateObject.cs @@ -39,6 +39,8 @@ public class DispatcherClientObject /// public ClientTypes.Types MyType; + public int TotalPackageSize; + /// /// Class constructor, defines the type as undefined and waits for client identification /// @@ -50,11 +52,17 @@ public DispatcherClientObject() IsInitialized = false; Buffer = new byte[BufferSize]; MyType = ClientTypes.Types.Undefined; + TotalPackageSize = 0; } public void ResetBuffer() { Buffer = new byte[BufferSize]; } + + public void ResetBuffer(int size) + { + Buffer = new byte[size]; + } } }