Skip to content

Commit

Permalink
Optimize memory buffer and complete flow with Channels (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
glucaci authored Jun 4, 2020
1 parent 3fbaee8 commit 9706d29
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 159 deletions.
2 changes: 1 addition & 1 deletion src/Core/Transmission.Abstractions/AttachmentsOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Thor.Core.Transmission.EventHub
namespace Thor.Core.Transmission.Abstractions
{
/// <summary>
/// Options for attachments processing
Expand Down
16 changes: 1 addition & 15 deletions src/Core/Transmission.Abstractions/BufferOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;

namespace Thor.Core.Transmission.EventHub
namespace Thor.Core.Transmission.Abstractions
{
/// <summary>
/// Options for buffering used for both Events and Attachments
Expand All @@ -12,17 +10,5 @@ public class BufferOptions
/// Default 1000 items.
/// </summary>
public int Size { get; set; } = 1000;

/// <summary>
/// Try to enqueue timeout if buffer is full. This will block the calling thread.
/// If set to -1 milliseconds will block until buffer will have free space.
/// Default 1 second.
/// </summary>
public TimeSpan EnqueueTimeout { get; set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// Size of the dequeue batch.
/// </summary>
public int DequeueBatchSize { get; set; } = 100;
}
}
2 changes: 1 addition & 1 deletion src/Core/Transmission.Abstractions/EventsOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Thor.Core.Transmission.EventHub
namespace Thor.Core.Transmission.Abstractions
{
/// <summary>
/// Options for events processing
Expand Down
4 changes: 2 additions & 2 deletions src/Core/Transmission.Abstractions/FileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ public async IAsyncEnumerable<TData> DequeueAsync(

/// <inheritdoc/>
public async Task EnqueueAsync(
IReadOnlyCollection<TData> batch,
IAsyncEnumerable<TData> batch,
CancellationToken cancellationToken)
{
if (batch == null)
{
throw new ArgumentNullException(nameof(batch));
}

foreach (TData data in batch)
await foreach (TData data in batch.WithCancellation(cancellationToken))
{
await EnqueueAsync(data, cancellationToken);
}
Expand Down
8 changes: 2 additions & 6 deletions src/Core/Transmission.Abstractions/IMemoryBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;

namespace Thor.Core.Transmission.Abstractions
{
Expand All @@ -8,15 +9,10 @@ namespace Thor.Core.Transmission.Abstractions
public interface IMemoryBuffer<TData>
where TData : class
{
/// <summary>
/// Gets the count of items in buffer.
/// </summary>
int Count { get; }

/// <summary>
/// Dequeue data batch from the buffer.
/// </summary>
IReadOnlyCollection<TData> Dequeue(int count);
IAsyncEnumerable<TData> Dequeue(CancellationToken cancellationToken);

/// <summary>
/// Enqueue data object.
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Transmission.Abstractions/ITransmissionBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ public interface ITransmissionBuffer<TData>
/// <summary>
/// Enqueue a telemetry data object.
/// </summary>
Task Enqueue(TData data, CancellationToken cancellationToken);
Task Enqueue(IAsyncEnumerable<TData> batch, CancellationToken cancellationToken);
}
}
2 changes: 1 addition & 1 deletion src/Core/Transmission.Abstractions/ITransmissionStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public interface ITransmissionStorage<TData>
/// </summary>
/// <param name="batch">A data batch to be stored.</param>
/// <param name="cancellationToken">A cancellation token.</param>
Task EnqueueAsync(IReadOnlyCollection<TData> batch, CancellationToken cancellationToken);
Task EnqueueAsync(IAsyncEnumerable<TData> batch, CancellationToken cancellationToken);
}
}
35 changes: 14 additions & 21 deletions src/Core/Transmission.Abstractions/MemoryBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Thor.Core.Transmission.Abstractions;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

namespace Thor.Core.Transmission.EventHub
namespace Thor.Core.Transmission.Abstractions
{
/// <summary>
/// A memory transmission buffer
Expand All @@ -12,22 +13,20 @@ public class MemoryBuffer<TData>
: IMemoryBuffer<TData>
where TData : class
{
private readonly BufferOptions _options;
private readonly BlockingCollection<TData> _buffer;
private readonly ChannelWriter<TData> _itemsWrite;
private readonly ChannelReader<TData> _itemsRead;

/// <summary>
/// Initializes a new instance of the <see cref="MemoryBuffer{TData}"/> class.
/// </summary>
/// <param name="options"></param>
public MemoryBuffer(BufferOptions options)
{
_options = options;
_buffer = new BlockingCollection<TData>(_options.Size);
var items = Channel.CreateBounded<TData>(options.Size);
_itemsWrite = items.Writer;
_itemsRead = items.Reader;
}

/// <inheritdoc />
public int Count => _buffer.Count;

/// <inheritdoc />
public void Enqueue(TData data)
{
Expand All @@ -36,23 +35,17 @@ public void Enqueue(TData data)
throw new ArgumentNullException(nameof(data));
}

_buffer.TryAdd(data, _options.EnqueueTimeout);
_itemsWrite.TryWrite(data);
}

/// <inheritdoc />
public IReadOnlyCollection<TData> Dequeue(int count)
public async IAsyncEnumerable<TData> Dequeue(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var batch = new List<TData>(count);

for (var i = 0; i < count; i++)
while (await _itemsRead.WaitToReadAsync(cancellationToken))
{
if (_buffer.TryTake(out TData data))
{
batch.Add(data);
}
yield return await _itemsRead.ReadAsync(cancellationToken);
}

return batch;
}
}
}
20 changes: 20 additions & 0 deletions src/Core/Transmission.Abstractions/TaskHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Thor.Core.Transmission.Abstractions
{
internal static class TaskHelper
{
internal static Task StartLongRunning(
Func<Task> function,
CancellationToken cancellationToken)
{
return Task.Factory.StartNew(
function,
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task EnqueueAsync_BatchNull()
// arrange
string storagePath = "C:\\EnqueueAsync_BatchNull_Test";
BlobStorageTransmissionStorage storage = new BlobStorageTransmissionStorage(storagePath);
AttachmentDescriptor[] batch = null;
IAsyncEnumerable<AttachmentDescriptor> batch = null;

// act
Func<Task> verify = () => storage.EnqueueAsync(batch, default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading;
using Moq;
using Thor.Core.Transmission.Abstractions;
using Thor.Core.Transmission.EventHub;
using Xunit;

namespace Thor.Core.Transmission.BlobStorage.Tests
Expand All @@ -23,7 +22,7 @@ public void Constructor_BufferNull()
ITransmissionSender<AttachmentDescriptor> sender = new Mock<ITransmissionSender<AttachmentDescriptor>>().Object;

// act
Action verify = () => new BlobStorageTransmitter(buffer, storage, sender, new AttachmentsOptions());
Action verify = () => new BlobStorageTransmitter(buffer, storage, sender);

// arrange
Assert.Throws<ArgumentNullException>("storage", verify);
Expand All @@ -38,7 +37,7 @@ public void Constructor_SenderNull()
ITransmissionSender<AttachmentDescriptor> sender = null;

// act
Action verify = () => new BlobStorageTransmitter(buffer, storage, sender, new AttachmentsOptions());
Action verify = () => new BlobStorageTransmitter(buffer, storage, sender);

// arrange
Assert.Throws<ArgumentNullException>("sender", verify);
Expand All @@ -53,7 +52,7 @@ public void Constructor_NoException()
var sender = new Mock<ITransmissionSender<AttachmentDescriptor>>();

// act
Action verify = () => new BlobStorageTransmitter(buffer, storage.Object, sender.Object, new AttachmentsOptions());
Action verify = () => new BlobStorageTransmitter(buffer, storage.Object, sender.Object);

// arrange
Assert.Null(Record.Exception(verify));
Expand All @@ -70,7 +69,7 @@ public void Enqueue_DataNull()
IMemoryBuffer<AttachmentDescriptor> buffer = new Mock<IMemoryBuffer<AttachmentDescriptor>>().Object;
Mock<ITransmissionStorage<AttachmentDescriptor>> storage = CreateEmptyStorage();
ITransmissionSender<AttachmentDescriptor> sender = new Mock<ITransmissionSender<AttachmentDescriptor>>().Object;
ITelemetryAttachmentTransmitter transmitter = new BlobStorageTransmitter(buffer, storage.Object, sender, new AttachmentsOptions());
ITelemetryAttachmentTransmitter transmitter = new BlobStorageTransmitter(buffer, storage.Object, sender);
AttachmentDescriptor data = null;

// act
Expand All @@ -89,7 +88,7 @@ public void Enqueue_NoException()
ITransmissionSender<AttachmentDescriptor> sender = new Mock<ITransmissionSender<AttachmentDescriptor>>().Object;

// act
Action verify = () => new BlobStorageTransmitter(buffer, storage.Object, sender, new AttachmentsOptions());
Action verify = () => new BlobStorageTransmitter(buffer, storage.Object, sender);

// arrange
Assert.Null(Record.Exception(verify));
Expand Down Expand Up @@ -128,7 +127,7 @@ public void Enqueue_TransmissionFlow()
.Callback(() => resetEvent.Set());

ITelemetryAttachmentTransmitter transmitter = new BlobStorageTransmitter(
buffer.Object, storage.Object, sender.Object, new AttachmentsOptions());
buffer.Object, storage.Object, sender.Object);
AttachmentDescriptor data = new Mock<AttachmentDescriptor>().Object;

// act
Expand All @@ -154,7 +153,7 @@ public void Dispose()
Mock<ITransmissionStorage<AttachmentDescriptor>> storage = CreateEmptyStorage();
ITransmissionSender<AttachmentDescriptor> sender = new Mock<ITransmissionSender<AttachmentDescriptor>>().Object;
BlobStorageTransmitter transmitter = new BlobStorageTransmitter(
buffer.Object, storage.Object, sender, new AttachmentsOptions());
buffer.Object, storage.Object, sender);

// act
Action verify = () => transmitter.Dispose();
Expand Down
55 changes: 15 additions & 40 deletions src/Core/Transmission.BlobStorage/BlobStorageTransmitter.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Thor.Core.Abstractions;
using Thor.Core.Transmission.Abstractions;
using Thor.Core.Transmission.EventHub;

namespace Thor.Core.Transmission.BlobStorage
{
Expand All @@ -16,13 +13,11 @@ public sealed class BlobStorageTransmitter
, IDisposable
{
private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
private readonly ManualResetEventSlim _resetEvent = new ManualResetEventSlim();
private readonly IMemoryBuffer<AttachmentDescriptor> _buffer;
private readonly ITransmissionStorage<AttachmentDescriptor> _storage;
private readonly ITransmissionSender<AttachmentDescriptor> _sender;
private readonly AttachmentsOptions _options;
private readonly Job _sendJob;
private readonly Job _storeJob;
private readonly Task _storeTask;
private readonly Task _sendTask;
private bool _disposed;

/// <summary>
Expand All @@ -31,22 +26,16 @@ public sealed class BlobStorageTransmitter
public BlobStorageTransmitter(
IMemoryBuffer<AttachmentDescriptor> buffer,
ITransmissionStorage<AttachmentDescriptor> storage,
ITransmissionSender<AttachmentDescriptor> sender,
AttachmentsOptions options)
ITransmissionSender<AttachmentDescriptor> sender)
{
_buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_sender = sender ?? throw new ArgumentNullException(nameof(sender));
_options = options ?? throw new ArgumentNullException(nameof(options));

_storeJob = Job.Start(
async () => await StoreBatchAsync().ConfigureAwait(false),
() => _buffer.Count == 0,
_disposeToken.Token);

_sendJob = Job.Start(
async () => await SendBatchAsync().ConfigureAwait(false),
_disposeToken.Token);
_storeTask = TaskHelper
.StartLongRunning(StoreAsync, _disposeToken.Token);
_sendTask = TaskHelper
.StartLongRunning(SendAsync, _disposeToken.Token);
}

/// <inheritdoc/>
Expand All @@ -63,20 +52,14 @@ public void Enqueue(AttachmentDescriptor data)
}
}

private async Task StoreBatchAsync()
private async Task StoreAsync()
{
IReadOnlyCollection<AttachmentDescriptor> batch = _buffer.Dequeue(
_options.Buffer.DequeueBatchSize);

if (batch.Count > 0)
{
await _storage
.EnqueueAsync(batch, _disposeToken.Token)
.ConfigureAwait(false);
}
await _storage
.EnqueueAsync(_buffer.Dequeue(_disposeToken.Token), _disposeToken.Token)
.ConfigureAwait(false);
}

private async Task SendBatchAsync()
private async Task SendAsync()
{
await _sender
.SendAsync(_storage.DequeueAsync(_disposeToken.Token), _disposeToken.Token)
Expand All @@ -90,20 +73,12 @@ public void Dispose()
{
_disposeToken.Cancel();

if (!_sendJob.Stopped && !_storeJob.Stopped)
Task.WaitAll(new[]
{
WaitHandle.WaitAll(new[]
{
_sendJob.WaitHandle,
_storeJob.WaitHandle
}, TimeSpan.FromSeconds(5));
}
_sendTask, _storeTask
}, TimeSpan.FromSeconds(5));

_disposeToken?.Dispose();
_resetEvent?.Dispose();
_sendJob?.Dispose();
_storeJob?.Dispose();

_disposed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Thor.Core.Transmission.Abstractions;
using Thor.Core.Transmission.EventHub;

namespace Thor.Core.Transmission.BlobStorage
{
Expand Down
Loading

0 comments on commit 9706d29

Please sign in to comment.