Skip to content

Commit

Permalink
Merge pull request #5 from Yellow-Dog-Man/froox/feat/dynamic-window-size
Browse files Browse the repository at this point in the history
Dynamic Window Size
  • Loading branch information
Frooxius authored Jul 7, 2024
2 parents 95f16b1 + 8fa5228 commit d1b6feb
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 40 deletions.
1 change: 1 addition & 0 deletions LibSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static class Program
new PacketProcessorExample(),
new AesEncryptionTest(),
new NtpTest(),
new ThroughputTest(),
};

static void Main(string[] args)
Expand Down
259 changes: 259 additions & 0 deletions LibSample/ThroughputTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using LiteNetLib;
using LiteNetLib.Utils;

namespace LibSample
{
public class ThroughputTest : IExample
{
public const long TOTAL_BYTES = 1024L * 1024L * 16; // 256 MB
public const int SEND_CHUNK_SIZE = 1024 * 1024; // 1 MB
public const int SEND_CHUNK_COUNT = (int)(TOTAL_BYTES / SEND_CHUNK_SIZE);

public class Server : INetEventListener, IDisposable
{
public int ReliableReceived { get; private set; }
public bool HasCompleted { get; private set; }

readonly NetManager _server;

public TimeSpan TransferTime => _lastChunkReceiveTime - _peerConnectTime;

DateTime _peerConnectTime;
DateTime _lastChunkReceiveTime;

public NetStatistics Stats => _server.Statistics;

public Server(int latency, int jitter, int packetLoss)
{
_server = new NetManager(this)
{
AutoRecycle = true,
UpdateTime = 1,
SimulatePacketLoss = true,
SimulationPacketLossChance = packetLoss,
SimulateLatency = true,
SimulationMinLatency = latency,
SimulationMaxLatency = latency + jitter,
EnableStatistics = true,
UnsyncedEvents = true
};
_server.Start(9050);
}

void INetEventListener.OnNetworkError(IPEndPoint endPoint, SocketError socketErrorCode)
{
Console.WriteLine($"Server: error: {socketErrorCode}");
}

void INetEventListener.OnNetworkLatencyUpdate(NetPeer peer, int latency)
{
}

public void OnConnectionRequest(ConnectionRequest request)
{
request.AcceptIfKey("ConnKey");
}

void INetEventListener.OnNetworkReceive(NetPeer peer, NetPacketReader reader, byte channelNumber, DeliveryMethod deliveryMethod)
{
if (++ReliableReceived == SEND_CHUNK_COUNT)
{
_lastChunkReceiveTime = DateTime.UtcNow;
HasCompleted = true;
}
}

void INetEventListener.OnNetworkReceiveUnconnected(IPEndPoint remoteEndPoint, NetPacketReader reader,
UnconnectedMessageType messageType)
{
}

void INetEventListener.OnPeerConnected(NetPeer peer)
{
Console.WriteLine($"Server: client connected: {peer}");

_peerConnectTime = DateTime.UtcNow;
}

void INetEventListener.OnPeerDisconnected(NetPeer peer, DisconnectInfo disconnectInfo)
{
Console.WriteLine($"Server: client disconnected: {disconnectInfo.Reason}");
}

public void Dispose()
{
_server.Stop();
}
}

public class Client : INetEventListener, IDisposable
{
public int ReliableSent;

public bool HasCompleted { get; private set; }
public bool IsRunning => _peer.ConnectionState == ConnectionState.Connected;

readonly NetManager _client;
NetPeer _peer;

public NetStatistics Stats => _client.Statistics;

public Client(int latency, int jitter, int packetLoss)
{
_client = new NetManager(this)
{
UnsyncedEvents = true,
AutoRecycle = true,
SimulatePacketLoss = true,
SimulationPacketLossChance = packetLoss,
SimulateLatency = true,
SimulationMinLatency = latency,
SimulationMaxLatency = latency + jitter,
EnableStatistics = true
};
_client.Start();
}

public void SendReliable(byte[] data)
{
_peer.Send(data, DeliveryMethod.ReliableOrdered);
ReliableSent++;
}

public void Connect()
{
_peer = _client.Connect("localhost", 9050, "ConnKey");
}

void INetEventListener.OnNetworkError(IPEndPoint endPoint, SocketError socketErrorCode)
{
Console.WriteLine($"Client: error: {socketErrorCode}");
}

void INetEventListener.OnNetworkLatencyUpdate(NetPeer peer, int latency)
{
}

public void OnConnectionRequest(ConnectionRequest request)
{
request.RejectForce();
}

void INetEventListener.OnNetworkReceive(NetPeer peer, NetPacketReader reader, byte channelNumber, DeliveryMethod deliveryMethod)
{

}

void INetEventListener.OnNetworkReceiveUnconnected(IPEndPoint remoteEndPoint, NetPacketReader reader,
UnconnectedMessageType messageType)
{
}

void INetEventListener.OnPeerConnected(NetPeer peer)
{
Task.Run(() =>
{
var data = new byte[SEND_CHUNK_SIZE];
var r = new Random();

for (int i = 0; i < SEND_CHUNK_COUNT; i++)
{
r.NextBytes(data);
SendReliable(data);
}

HasCompleted = true;
});
}

void INetEventListener.OnPeerDisconnected(NetPeer peer, DisconnectInfo disconnectInfo)
{
Console.WriteLine($"Client: Disconnected {disconnectInfo.Reason}");
}

public void Dispose()
{
_client.Stop();
}
}

public void Run()
{
Console.WriteLine("Testing Throughput...");

int[] packetLossValues = new int[] { 0, 1, 2, 5, 10 };
int[] latencies = new int[] { 5, 10, 20, 40, 60, 80, 100, 150, 200, 300 };

Dictionary<int, List<TimeSpan>> resultGroups = new Dictionary<int, List<TimeSpan>>();

foreach (var packetLoss in packetLossValues)
{
var results = new List<TimeSpan>();

resultGroups.Add(packetLoss, results);

foreach (var latency in latencies)
{
var jitter = Math.Max(1, (int)Math.Round(latency / 5f));

var serverThread = new Thread(() => StartServer(latency, jitter, packetLoss, results));
serverThread.Start();

var clientThread = new Thread(() => StartClient(latency, jitter, packetLoss));
clientThread.Start();

Console.WriteLine($"Processing, latency: {latency} ms, jitter: {jitter}, packet loss: {packetLoss} %...");

serverThread.Join();
clientThread.Join();
}
}

foreach (var resultGroup in resultGroups)
{
Console.WriteLine($"Results for packet loss: {resultGroup.Key} %");

for (int i = 0; i < latencies.Length; i++)
{
var bytesPerSec = TOTAL_BYTES / resultGroup.Value[i].TotalSeconds;
Console.WriteLine($"\t{latencies[i]} ms -> {resultGroup.Value[i]}\t{bytesPerSec / 1024:F2} kB/s");
}
}
}

static void StartServer(int latency, int jitter, int packetLoss, List<TimeSpan> results)
{
using (Server s = new Server(latency, jitter, packetLoss))
{
while (!s.HasCompleted)
Thread.Sleep(100);

Console.WriteLine("SERVER RECEIVED -> Reliable: " + s.ReliableReceived + " in " + s.TransferTime);
Console.WriteLine("SERVER STATS:\n" + s.Stats);

results.Add(s.TransferTime);
}
}

static void StartClient(int latency, int jitter, int packetLoss)
{
using (Client c = new Client(latency, jitter, packetLoss))
{
c.Connect();

while (!c.HasCompleted || c.IsRunning)
Thread.Sleep(100);

Console.WriteLine("CLIENT SENT -> Reliable: " + c.ReliableSent);
Console.WriteLine("CLIENT STATS:\n" + c.Stats);
}
}
}
}
2 changes: 1 addition & 1 deletion LiteNetLib/BaseChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace LiteNetLib
internal abstract class BaseChannel
{
protected readonly NetPeer Peer;
protected readonly Queue<NetPacket> OutgoingQueue = new Queue<NetPacket>(NetConstants.DefaultWindowSize);
protected readonly Queue<NetPacket> OutgoingQueue = new Queue<NetPacket>(NetConstants.MaximumWindowSize);
private int _isAddedToPeerChannelSendQueue;

public int PacketsInQueue => OutgoingQueue.Count;
Expand Down
4 changes: 3 additions & 1 deletion LiteNetLib/NetConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public enum DeliveryMethod : byte
public static class NetConstants
{
//can be tuned
public const int DefaultWindowSize = 64;
public const int MinimumWindowSize = 8;
public const int MaximumWindowSize = 512;
public const int StartingDynamicWindowSize = 64;
public const int SocketBufferSize = 1024 * 1024; //1mb
public const int SocketTTL = 255;

Expand Down
48 changes: 44 additions & 4 deletions LiteNetLib/NetPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using LiteNetLib.Utils;
Expand Down Expand Up @@ -81,6 +82,8 @@ private set
}
}

internal bool HasUnsentData => _mergeCount > 0;

//Channels
private readonly Queue<NetPacket> _unreliableChannel;
private readonly ConcurrentQueue<BaseChannel> _channelSendQueue;
Expand Down Expand Up @@ -302,6 +305,27 @@ private void OverrideMtu(int mtuValue)
_finishMtu = true;
}

public void SetDynamicWindowSize(int size)
{
foreach (var channel in _channels)
if (channel is ReliableChannel reliableChannel)
reliableChannel.CurrentDynamicWindowSize = size;
}

public int TotalReliablePacketsInFlight
{
get
{
int count = 0;

foreach (var channel in _channels)
if (channel is ReliableChannel reliableChannel)
count += reliableChannel.CurrentPacketsInFlight;

return count;
}
}

/// <summary>
/// Returns packets count in queue for reliable channel
/// </summary>
Expand Down Expand Up @@ -1277,10 +1301,19 @@ private void SendMerged()
_mergeCount = 0;
}

internal void SendUserData(NetPacket packet)
internal int ComputeMergedPacketSize(NetPacket packet) => NetConstants.HeaderSize + packet.Size + 2;

internal bool CanMerge(NetPacket packet)
{
int mergedPacketSize = ComputeMergedPacketSize(packet);

return _mergePos + mergedPacketSize <= _mtu;
}

internal bool SendUserData(NetPacket packet)
{
packet.ConnectionNumber = _connectNum;
int mergedPacketSize = NetConstants.HeaderSize + packet.Size + 2;
int mergedPacketSize = ComputeMergedPacketSize(packet);
const int sizeTreshold = 20;
if (mergedPacketSize + sizeTreshold >= _mtu)
{
Expand All @@ -1293,16 +1326,23 @@ internal void SendUserData(NetPacket packet)
Statistics.AddBytesSent(bytesSent);
}

return;
return true;
}
if (_mergePos + mergedPacketSize > _mtu)

bool packetWasSent = false;
if (!CanMerge(packet))
{
SendMerged();
packetWasSent = true;
}

FastBitConverter.GetBytes(_mergeData.RawData, _mergePos + NetConstants.HeaderSize, (ushort)packet.Size);
Buffer.BlockCopy(packet.RawData, 0, _mergeData.RawData, _mergePos + NetConstants.HeaderSize + 2, packet.Size);
_mergePos += packet.Size + 2;
_mergeCount++;
//DebugWriteForce("Merged: " + _mergePos + "/" + (_mtu - 2) + ", count: " + _mergeCount);

return packetWasSent;
}

internal void Update(int deltaTime)
Expand Down
Loading

0 comments on commit d1b6feb

Please sign in to comment.