Skip to content

Commit

Permalink
Replaced BlockingCollection with custom written LightWeightBlockingQu…
Browse files Browse the repository at this point in the history
…eue.
  • Loading branch information
CptMoore committed Dec 3, 2024
1 parent b4b92fd commit d7ebcb9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 35 deletions.
68 changes: 68 additions & 0 deletions ModTek/Features/Logging/LightWeightBlockingQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;

namespace ModTek.Features.Logging;

// this=40ns BlockingCollection=170ns
// does not really matter but we anyway get better suited APIs
internal class LightWeightBlockingQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new();

private const int MaxQueueSize = 100_000; // probably about 30MB if full
private long _queueSize; // faster than calling _queue.Count
private bool _addingCompleted; // some way to shut down the thread

// returns false if nothing can be dequeued anymore (empty + _addingCompleted)
// [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryDequeueOrWait(out T item)
{
var spinWait = new SpinWait();
while (!_queue.TryDequeue(out item))
{
if (_addingCompleted)
{
// this can still drop logs, very unlikely but possible
Thread.Sleep(1);
if (_queue.IsEmpty)
{
return false;
}
}
spinWait.SpinOnce();
}

Interlocked.Decrement(ref _queueSize);
return true;
}

// returns false if nothing can be enqueued anymore (_addingCompleted)
// [MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryEnqueueOrWait(T item)
{
if (_addingCompleted)
{
return false;
}

while (_queueSize >= MaxQueueSize)
{
Thread.SpinWait(4);

if (_addingCompleted)
{
return false;
}
}

Interlocked.Increment(ref _queueSize);
_queue.Enqueue(item);
return true;
}

internal void CompleteAdding()
{
_addingCompleted = true;
}
}
71 changes: 36 additions & 35 deletions ModTek/Features/Logging/MTLoggerAsyncQueue.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using UnityEngine;
using ThreadPriority = System.Threading.ThreadPriority;
Expand All @@ -9,12 +8,13 @@ namespace ModTek.Features.Logging;
internal class MTLoggerAsyncQueue
{
private readonly Action<MTLoggerMessageDto> _processor;
private readonly BlockingCollection<MTLoggerMessageDto> _queue;
private readonly LightWeightBlockingQueue<MTLoggerMessageDto> _queue;
internal readonly int LogWriterThreadId;

internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)
{
_processor = processor;
_queue = new BlockingCollection<MTLoggerMessageDto>(100_000);
_queue = new LightWeightBlockingQueue<MTLoggerMessageDto>();
Application.quitting += () => _queue.CompleteAdding();
var thread = new Thread(Loop)
{
Expand All @@ -34,39 +34,38 @@ internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)
var offloadedTime = stats.TotalTime.Subtract(dispatchStats.TotalTime);
Log.Main.Debug?.Log($"Asynchronous logging offloaded {offloadedTime} from the main thread.");
Log.Main.Trace?.Log($"Dispatched {dispatchStats.Count} log statements in {dispatchStats.TotalTime} with an average of {dispatchStats.AverageNanoseconds}ns.");
Log.Main.Trace?.Log($"An estimated maximum of {s_memoryEstimatedUsageMax / 1_000_000} MB was ever used by {s_memoryObjectCountMax} log statements.");
#if MEMORY_TRACE
Log.Main.Trace?.Log($"An estimated maximum of {s_memoryEstimatedUsageMax / 1_000_000} MB was ever used by {s_memoryObjectCountMax}
#endif
},
CallbackForEveryNumberOfMeasurements = 50_000
};

private void Loop()
{
try
while (true)
{
while (!_queue.IsCompleted)
if (!_queue.TryDequeueOrWait(out var message))
{
var message = _queue.Take();
try
{
_loggingStopwatch.Track(() => _processor(message));
}
catch (Exception e)
{
LoggingFeature.WriteExceptionToFatalLog(e);
}
finally
{
UnTrackMemory(message);
}
return;
}

_loggingStopwatch.Start();
try
{
_processor(message);
}
catch (Exception e)
{
LoggingFeature.WriteExceptionToFatalLog(e);
}
finally
{
_loggingStopwatch.Stop();
#if MEMORY_TRACE
UnTrackMemory(message);
#endif
}
}
catch (InvalidOperationException)
{
// ignore
}
finally
{
_queue.Dispose();
}
}

Expand All @@ -78,21 +77,22 @@ internal bool Add(MTLoggerMessageDto messageDto)
_dispatchStopWatch.Start();
try
{
_queue.Add(messageDto);
TrackMemory(messageDto); // only track if add did not fail
return true;
}
catch
{
// ignore
if (_queue.TryEnqueueOrWait(messageDto))
{
#if MEMORY_TRACE
TrackMemory(messageDto);
#endif
return true;
}
return false;
}
finally
{
_dispatchStopWatch.Stop();
}
return false;
}

#if MEMORY_TRACE
// memory tracking
private static long s_memoryEstimatedUsage;
private static long s_memoryEstimatedUsageMax;
Expand Down Expand Up @@ -130,4 +130,5 @@ private static void UnTrackMemory(MTLoggerMessageDto messageDto)
Interlocked.Add(ref s_memoryEstimatedUsage, -messageDto.EstimatedSizeInMemory);
Interlocked.Decrement(ref s_memoryObjectCount);
}
#endif
}

0 comments on commit d7ebcb9

Please sign in to comment.