Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/unified messaging #140

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2626516
Working on new unified messaging abstraction
ejsmith Mar 7, 2019
45487ca
Move IMessage interface to publisher for easier review
ejsmith Mar 7, 2019
0ca70c5
Add message queue options interface
ejsmith Mar 7, 2019
bef031e
Remove priority for now, can be added later
ejsmith Mar 7, 2019
41cf785
More changes
ejsmith Mar 7, 2019
34c6859
Minor
ejsmith Mar 7, 2019
1a207c5
Some comments
ejsmith Mar 23, 2019
e98bde9
Added some more notes for messaging
ejsmith Apr 8, 2019
0f0898f
Flow message type through methods in MessageBusBase
ejsmith May 7, 2019
a358b92
Adding ITypeNameSerializer to control how message types are converted…
ejsmith May 8, 2019
9d0363e
Some cleanup
ejsmith May 8, 2019
cf56b0f
Progress on new messaging implementation
ejsmith May 10, 2019
4052645
Change from Headers to Properties
ejsmith May 10, 2019
2d48478
Simplify IMessageStore
ejsmith May 10, 2019
cf168d5
Limit in memory message sending to 50 at a time
ejsmith May 11, 2019
3bfcadd
Cleanup unused namespaces
ejsmith May 11, 2019
ad4a4cc
Adding new WorkScheduler and other messaging progress
ejsmith May 12, 2019
96ce1b0
Adding 1st work scheduler test
ejsmith May 12, 2019
fa07c94
Minor
ejsmith May 13, 2019
476d7ac
More work scheduler tests
ejsmith May 13, 2019
ab0d114
Changing tests to make use of SystemClock
ejsmith May 13, 2019
95bbbbb
Fix broken test, freeze time and use fake sleep automatically when us…
ejsmith May 13, 2019
9f77042
Little more progress
ejsmith May 13, 2019
6a5421c
Some work scheduler test changes
ejsmith May 15, 2019
c8889b3
Progress
ejsmith May 31, 2019
d99451c
More systemclock changes
ejsmith Jun 3, 2019
1d66777
More changes to system clock to allow timers
ejsmith Jun 4, 2019
41ffe35
Update deps and fix some tests
ejsmith Jun 12, 2019
8cf4d1e
Minor
ejsmith May 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Foundatio.sln
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
src\Directory.Build.props = src\Directory.Build.props
samples\Directory.Build.props = samples\Directory.Build.props
NuGet.config = NuGet.config
global.json = global.json
Dockerfile = Dockerfile
README.md = README.md
EndProjectSection
EndProject
Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio.AppMetrics/AppMetricsClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using App.Metrics;
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Gauge;
using App.Metrics.Timer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>net6.0;net5.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Foundatio\Foundatio.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Foundatio\Extensions\TaskExtensions.cs" Link="Extensions\TaskExtensions.cs" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>net6.0;net5.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.HealthChecks" Version="2.2" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Foundatio\Foundatio.csproj" />
Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio.Extensions.Hosting/Startup/IStartupAction.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Extensions.Hosting.Startup {
Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio.MetricsNET/MetricsNETClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using Metrics;
using Metrics;

namespace Foundatio.Metrics {
public class MetricsNETClient : IMetricsClient {
Expand Down
9 changes: 3 additions & 6 deletions src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,12 @@ public virtual async Task CanSetMinMaxExpirationAsync() {
using (cache) {
await cache.RemoveAllAsync();

using (TestSystemClock.Install()) {
var now = DateTime.UtcNow;
TestSystemClock.SetFrozenTime(now);

var expires = DateTime.MaxValue - now.AddDays(1);
using (var clock = TestSystemClock.Install()) {
var expires = DateTime.MaxValue - clock.Now.AddDays(1);
Assert.True(await cache.SetAsync("test1", 1, expires));
Assert.False(await cache.SetAsync("test2", 1, DateTime.MinValue));
Assert.True(await cache.SetAsync("test3", 1, DateTime.MaxValue));
Assert.True(await cache.SetAsync("test4", 1, DateTime.MaxValue - now.AddDays(-1)));
Assert.True(await cache.SetAsync("test4", 1, DateTime.MaxValue - clock.Now.AddDays(-1)));

Assert.Equal(1, (await cache.GetAsync<int>("test1")).Value);
Assert.InRange((await cache.GetExpirationAsync("test1")).Value, expires.Subtract(TimeSpan.FromSeconds(10)), expires);
Expand Down
49 changes: 43 additions & 6 deletions src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ await messageBus.PublishAsync(new DerivedSimpleMessageA {

public virtual async Task CanSendMappedMessageAsync() {
var messageBus = GetMessageBus(b => {
b.MessageTypeMappings.Add(nameof(SimpleMessageA), typeof(SimpleMessageA));
b.TypeNameSerializer = new DefaultTypeNameSerializer(_logger, new Dictionary<string, Type> {{ nameof(SimpleMessageA), typeof(SimpleMessageA) }});
return b;
});
if (messageBus == null)
Expand Down Expand Up @@ -136,7 +136,43 @@ await messageBus.PublishAsync(new SimpleMessageA {
}

public virtual async Task CanSendDelayedMessageAsync() {
const int numConcurrentMessages = 1000;
Log.MinimumLevel = LogLevel.Trace;
var messageBus = GetMessageBus();
if (messageBus == null)
return;

try {
var countdown = new AsyncCountdownEvent(1);
await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
var msgDelay = TimeSpan.FromMilliseconds(msg.Count);
_logger.LogTrace("Got message delayed by {Delay:g}.", msgDelay);

Assert.Equal("Hello", msg.Data);
countdown.Signal();
});

var sw = Stopwatch.StartNew();
var delay = TimeSpan.FromMilliseconds(RandomData.GetInt(250, 1500));
await messageBus.PublishAsync(new SimpleMessageA {
Data = "Hello",
Count = (int)delay.TotalMilliseconds
}, delay);
_logger.LogTrace("Published message...");

await countdown.WaitAsync(TimeSpan.FromSeconds(30));
sw.Stop();

_logger.LogTrace("Got message delayed by {Delay:g} in {Duration:g}", delay, sw.Elapsed);
Assert.Equal(0, countdown.CurrentCount);
Assert.InRange(sw.Elapsed.TotalMilliseconds, 50, 2000);
} finally {
await CleanupMessageBusAsync(messageBus);
}
}

public virtual async Task CanSendParallelDelayedMessagesAsync() {
Log.MinimumLevel = LogLevel.Trace;
const int numConcurrentMessages = 100;
var messageBus = GetMessageBus();
if (messageBus == null)
return;
Expand All @@ -158,17 +194,18 @@ await Run.InParallelAsync(numConcurrentMessages, async i => {
await messageBus.PublishAsync(new SimpleMessageA {
Data = "Hello",
Count = i
}, new MessageOptions { DeliveryDelay = TimeSpan.FromMilliseconds(RandomData.GetInt(0, 100)) });
}, TimeSpan.FromMilliseconds(RandomData.GetInt(100, 500)));
if (i % 500 == 0)
_logger.LogTrace("Published 500 messages...");
});
_logger.LogTrace("Done publishing {Count} messages.", numConcurrentMessages);

await countdown.WaitAsync(TimeSpan.FromSeconds(5));
await countdown.WaitAsync(TimeSpan.FromSeconds(30));
sw.Stop();

if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Processed {Processed} in {Duration:g}", numConcurrentMessages - countdown.CurrentCount, sw.Elapsed);
_logger.LogTrace("Processed {Processed} in {Duration:g}", numConcurrentMessages - countdown.CurrentCount, sw.Elapsed);
Assert.Equal(0, countdown.CurrentCount);
Assert.InRange(sw.Elapsed.TotalMilliseconds, 50, 5000);
Assert.InRange(sw.Elapsed.TotalMilliseconds, 100, 2000);
} finally {
await CleanupMessageBusAsync(messageBus);
}
Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio.TestHarness/Serializer/SerializerTestsBase.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using BenchmarkDotNet.Attributes;
using Foundatio.Xunit;
using Foundatio.Serializer;
Expand Down
12 changes: 5 additions & 7 deletions src/Foundatio/DeepCloner/Helpers/DeepClonerSafeTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

namespace Foundatio.Force.DeepCloner.Helpers
{
/// <summary>
/// Safe types are types, which can be copied without real cloning. e.g. simple structs or strings (it is immutable)
/// </summary>
internal static class DeepClonerSafeTypes
namespace Foundatio.Force.DeepCloner.Helpers {
/// <summary>
/// Safe types are types, which can be copied without real cloning. e.g. simple structs or strings (it is immutable)
/// </summary>
internal static class DeepClonerSafeTypes
{
internal static readonly ConcurrentDictionary<Type, bool> KnownTypes = new();

Expand Down
2 changes: 0 additions & 2 deletions src/Foundatio/DeepCloner/Helpers/ShallowObjectCloner.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#define NETCORE
using System;
using System.Linq.Expressions;
using System.Reflection;
using System.Reflection.Emit;

namespace Foundatio.Force.DeepCloner.Helpers
{
Expand Down
80 changes: 80 additions & 0 deletions src/Foundatio/Messaging/Envelope.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;

namespace Foundatio.Messaging2 {
public interface IEnvelope {
// trace parent id used for distributed tracing
string TraceParentId { get; }
// message type
string MessageType { get; }
// message body
object GetMessage();
// number of attempts to deliver the message
int Attempts { get; }
// when the message was originally sent
DateTime SentAtUtc { get; }
// when the message should expire
DateTime? ExpiresAtUtc { get; }
// when the message should be delivered when using delayed delivery
DateTime? DeliverAtUtc { get; }
// additional message data to store with the message
IReadOnlyDictionary<string, string> Properties { get; }
}

public class Envelope : IEnvelope {
private Lazy<object> _message;

public Envelope(Func<object> getMessageFunc, string messageType, string coorelationId, DateTime? expiresAtUtc, DateTime? deliverAtUtc, IReadOnlyDictionary<string, string> properties) {
_message = new Lazy<object>(getMessageFunc);
MessageType = messageType;
TraceParentId = coorelationId;
ExpiresAtUtc = expiresAtUtc;
DeliverAtUtc = deliverAtUtc;
Properties = properties;
}

public Message(Func<object> getMessageFunc, MessagePublishOptions options) {
_message = new Lazy<object>(getMessageFunc);
TraceParentId = options.CorrelationId;
MessageType = options.MessageType;
ExpiresAtUtc = options.ExpiresAtUtc;
DeliverAtUtc = options.DeliverAtUtc;
Properties = options.Properties;
}

public string TraceParentId { get; private set; }
public string MessageType { get; private set; }
public int Attempts { get; private set; }
public DateTime SentAtUtc { get; private set; }
public DateTime? ExpiresAtUtc { get; private set; }
public DateTime? DeliverAtUtc { get; private set; }
public IReadOnlyDictionary<string, string> Properties { get; private set; }

public object GetMessage() {
return _message.Value;
}
}

public interface IEnvelope<out T> : IEnvelope where T: class {
T Message { get; }
}

public class Envelope<T> : IEnvelope<T> where T: class {
private readonly IEnvelope _envolope;

public Envelope(IEnvelope message) {
_envolope = message;
}

public T Message => (T)GetMessage();

public string TraceParentId => _envolope.TraceParentId;
public string MessageType => _envolope.MessageType;
public int Attempts => _envolope.Attempts;
public DateTime SentAtUtc => _envolope.SentAtUtc;
public DateTime? ExpiresAtUtc => _envolope.ExpiresAtUtc;
public DateTime? DeliverAtUtc => _envolope.DeliverAtUtc;
public IReadOnlyDictionary<string, string> Properties => _envolope.Properties;
public object GetMessage() => _envolope.GetMessage();
}
}
9 changes: 2 additions & 7 deletions src/Foundatio/Messaging/IMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@
using Foundatio.Utility;

namespace Foundatio.Messaging {
public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable {}

public class MessageOptions {
public string UniqueId { get; set; }
public string CorrelationId { get; set; }
public TimeSpan? DeliveryDelay { get; set; }
public DataDictionary Properties { get; set; } = new DataDictionary();
public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable {
string MessageBusId { get; }
}
}
Loading