diff --git a/Foundatio.sln b/Foundatio.sln index f4bb98303..505f12cc4 100644 --- a/Foundatio.sln +++ b/Foundatio.sln @@ -14,6 +14,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution src\Directory.Build.props = src\Directory.Build.props samples\Directory.Build.props = samples\Directory.Build.props NuGet.config = NuGet.config + global.json = global.json + Dockerfile = Dockerfile README.md = README.md EndProjectSection EndProject diff --git a/src/Foundatio.AppMetrics/AppMetricsClient.cs b/src/Foundatio.AppMetrics/AppMetricsClient.cs index 4af0cf4be..114ee7469 100644 --- a/src/Foundatio.AppMetrics/AppMetricsClient.cs +++ b/src/Foundatio.AppMetrics/AppMetricsClient.cs @@ -1,5 +1,4 @@ -using System; -using App.Metrics; +using App.Metrics; using App.Metrics.Counter; using App.Metrics.Gauge; using App.Metrics.Timer; diff --git a/src/Foundatio.Extensions.Hosting/Foundatio - Backup.Extensions.Hosting.csproj b/src/Foundatio.Extensions.Hosting/Foundatio - Backup.Extensions.Hosting.csproj new file mode 100644 index 000000000..ecad27af2 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Foundatio - Backup.Extensions.Hosting.csproj @@ -0,0 +1,15 @@ + + + true + net6.0;net5.0 + + + + + + + + + + + diff --git a/src/Foundatio.Extensions.Hosting/Foundatio.Extensions.Hosting.csproj b/src/Foundatio.Extensions.Hosting/Foundatio.Extensions.Hosting.csproj index ecad27af2..94f3dfb4b 100644 --- a/src/Foundatio.Extensions.Hosting/Foundatio.Extensions.Hosting.csproj +++ b/src/Foundatio.Extensions.Hosting/Foundatio.Extensions.Hosting.csproj @@ -1,10 +1,10 @@  true - net6.0;net5.0 - + + diff --git a/src/Foundatio.Extensions.Hosting/Startup/IStartupAction.cs b/src/Foundatio.Extensions.Hosting/Startup/IStartupAction.cs index 7d38f1570..e811149f7 100644 --- a/src/Foundatio.Extensions.Hosting/Startup/IStartupAction.cs +++ b/src/Foundatio.Extensions.Hosting/Startup/IStartupAction.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading; +using System.Threading; using System.Threading.Tasks; namespace Foundatio.Extensions.Hosting.Startup { diff --git a/src/Foundatio.MetricsNET/MetricsNETClient.cs b/src/Foundatio.MetricsNET/MetricsNETClient.cs index f60048086..6ff08bfca 100644 --- a/src/Foundatio.MetricsNET/MetricsNETClient.cs +++ b/src/Foundatio.MetricsNET/MetricsNETClient.cs @@ -1,5 +1,4 @@ -using System; -using Metrics; +using Metrics; namespace Foundatio.Metrics { public class MetricsNETClient : IMetricsClient { diff --git a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs index ec9af67eb..4a9f32212 100644 --- a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs +++ b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs @@ -411,15 +411,12 @@ public virtual async Task CanSetMinMaxExpirationAsync() { using (cache) { await cache.RemoveAllAsync(); - using (TestSystemClock.Install()) { - var now = DateTime.UtcNow; - TestSystemClock.SetFrozenTime(now); - - var expires = DateTime.MaxValue - now.AddDays(1); + using (var clock = TestSystemClock.Install()) { + var expires = DateTime.MaxValue - clock.Now.AddDays(1); Assert.True(await cache.SetAsync("test1", 1, expires)); Assert.False(await cache.SetAsync("test2", 1, DateTime.MinValue)); Assert.True(await cache.SetAsync("test3", 1, DateTime.MaxValue)); - Assert.True(await cache.SetAsync("test4", 1, DateTime.MaxValue - now.AddDays(-1))); + Assert.True(await cache.SetAsync("test4", 1, DateTime.MaxValue - clock.Now.AddDays(-1))); Assert.Equal(1, (await cache.GetAsync("test1")).Value); Assert.InRange((await cache.GetExpirationAsync("test1")).Value, expires.Subtract(TimeSpan.FromSeconds(10)), expires); diff --git a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs index 8ff028c95..db3f469bf 100644 --- a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs +++ b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs @@ -108,7 +108,7 @@ await messageBus.PublishAsync(new DerivedSimpleMessageA { public virtual async Task CanSendMappedMessageAsync() { var messageBus = GetMessageBus(b => { - b.MessageTypeMappings.Add(nameof(SimpleMessageA), typeof(SimpleMessageA)); + b.TypeNameSerializer = new DefaultTypeNameSerializer(_logger, new Dictionary {{ nameof(SimpleMessageA), typeof(SimpleMessageA) }}); return b; }); if (messageBus == null) @@ -136,7 +136,43 @@ await messageBus.PublishAsync(new SimpleMessageA { } public virtual async Task CanSendDelayedMessageAsync() { - const int numConcurrentMessages = 1000; + Log.MinimumLevel = LogLevel.Trace; + var messageBus = GetMessageBus(); + if (messageBus == null) + return; + + try { + var countdown = new AsyncCountdownEvent(1); + await messageBus.SubscribeAsync(msg => { + var msgDelay = TimeSpan.FromMilliseconds(msg.Count); + _logger.LogTrace("Got message delayed by {Delay:g}.", msgDelay); + + Assert.Equal("Hello", msg.Data); + countdown.Signal(); + }); + + var sw = Stopwatch.StartNew(); + var delay = TimeSpan.FromMilliseconds(RandomData.GetInt(250, 1500)); + await messageBus.PublishAsync(new SimpleMessageA { + Data = "Hello", + Count = (int)delay.TotalMilliseconds + }, delay); + _logger.LogTrace("Published message..."); + + await countdown.WaitAsync(TimeSpan.FromSeconds(30)); + sw.Stop(); + + _logger.LogTrace("Got message delayed by {Delay:g} in {Duration:g}", delay, sw.Elapsed); + Assert.Equal(0, countdown.CurrentCount); + Assert.InRange(sw.Elapsed.TotalMilliseconds, 50, 2000); + } finally { + await CleanupMessageBusAsync(messageBus); + } + } + + public virtual async Task CanSendParallelDelayedMessagesAsync() { + Log.MinimumLevel = LogLevel.Trace; + const int numConcurrentMessages = 100; var messageBus = GetMessageBus(); if (messageBus == null) return; @@ -158,17 +194,18 @@ await Run.InParallelAsync(numConcurrentMessages, async i => { await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello", Count = i - }, new MessageOptions { DeliveryDelay = TimeSpan.FromMilliseconds(RandomData.GetInt(0, 100)) }); + }, TimeSpan.FromMilliseconds(RandomData.GetInt(100, 500))); if (i % 500 == 0) _logger.LogTrace("Published 500 messages..."); }); + _logger.LogTrace("Done publishing {Count} messages.", numConcurrentMessages); - await countdown.WaitAsync(TimeSpan.FromSeconds(5)); + await countdown.WaitAsync(TimeSpan.FromSeconds(30)); sw.Stop(); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Processed {Processed} in {Duration:g}", numConcurrentMessages - countdown.CurrentCount, sw.Elapsed); + _logger.LogTrace("Processed {Processed} in {Duration:g}", numConcurrentMessages - countdown.CurrentCount, sw.Elapsed); Assert.Equal(0, countdown.CurrentCount); - Assert.InRange(sw.Elapsed.TotalMilliseconds, 50, 5000); + Assert.InRange(sw.Elapsed.TotalMilliseconds, 100, 2000); } finally { await CleanupMessageBusAsync(messageBus); } diff --git a/src/Foundatio.TestHarness/Serializer/SerializerTestsBase.cs b/src/Foundatio.TestHarness/Serializer/SerializerTestsBase.cs index 3f98aa427..0c9f1d049 100644 --- a/src/Foundatio.TestHarness/Serializer/SerializerTestsBase.cs +++ b/src/Foundatio.TestHarness/Serializer/SerializerTestsBase.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using BenchmarkDotNet.Attributes; using Foundatio.Xunit; using Foundatio.Serializer; diff --git a/src/Foundatio/DeepCloner/Helpers/DeepClonerSafeTypes.cs b/src/Foundatio/DeepCloner/Helpers/DeepClonerSafeTypes.cs index 1e29f3a45..de556a4f4 100644 --- a/src/Foundatio/DeepCloner/Helpers/DeepClonerSafeTypes.cs +++ b/src/Foundatio/DeepCloner/Helpers/DeepClonerSafeTypes.cs @@ -2,15 +2,13 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; using System.Reflection; -namespace Foundatio.Force.DeepCloner.Helpers -{ - /// - /// Safe types are types, which can be copied without real cloning. e.g. simple structs or strings (it is immutable) - /// - internal static class DeepClonerSafeTypes +namespace Foundatio.Force.DeepCloner.Helpers { + /// + /// Safe types are types, which can be copied without real cloning. e.g. simple structs or strings (it is immutable) + /// + internal static class DeepClonerSafeTypes { internal static readonly ConcurrentDictionary KnownTypes = new(); diff --git a/src/Foundatio/DeepCloner/Helpers/ShallowObjectCloner.cs b/src/Foundatio/DeepCloner/Helpers/ShallowObjectCloner.cs index 90e4618f3..6835b0c18 100644 --- a/src/Foundatio/DeepCloner/Helpers/ShallowObjectCloner.cs +++ b/src/Foundatio/DeepCloner/Helpers/ShallowObjectCloner.cs @@ -1,8 +1,6 @@ #define NETCORE using System; using System.Linq.Expressions; -using System.Reflection; -using System.Reflection.Emit; namespace Foundatio.Force.DeepCloner.Helpers { diff --git a/src/Foundatio/Messaging/Envelope.cs b/src/Foundatio/Messaging/Envelope.cs new file mode 100644 index 000000000..969119492 --- /dev/null +++ b/src/Foundatio/Messaging/Envelope.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; + +namespace Foundatio.Messaging2 { + public interface IEnvelope { + // trace parent id used for distributed tracing + string TraceParentId { get; } + // message type + string MessageType { get; } + // message body + object GetMessage(); + // number of attempts to deliver the message + int Attempts { get; } + // when the message was originally sent + DateTime SentAtUtc { get; } + // when the message should expire + DateTime? ExpiresAtUtc { get; } + // when the message should be delivered when using delayed delivery + DateTime? DeliverAtUtc { get; } + // additional message data to store with the message + IReadOnlyDictionary Properties { get; } + } + + public class Envelope : IEnvelope { + private Lazy _message; + + public Envelope(Func getMessageFunc, string messageType, string coorelationId, DateTime? expiresAtUtc, DateTime? deliverAtUtc, IReadOnlyDictionary properties) { + _message = new Lazy(getMessageFunc); + MessageType = messageType; + TraceParentId = coorelationId; + ExpiresAtUtc = expiresAtUtc; + DeliverAtUtc = deliverAtUtc; + Properties = properties; + } + + public Message(Func getMessageFunc, MessagePublishOptions options) { + _message = new Lazy(getMessageFunc); + TraceParentId = options.CorrelationId; + MessageType = options.MessageType; + ExpiresAtUtc = options.ExpiresAtUtc; + DeliverAtUtc = options.DeliverAtUtc; + Properties = options.Properties; + } + + public string TraceParentId { get; private set; } + public string MessageType { get; private set; } + public int Attempts { get; private set; } + public DateTime SentAtUtc { get; private set; } + public DateTime? ExpiresAtUtc { get; private set; } + public DateTime? DeliverAtUtc { get; private set; } + public IReadOnlyDictionary Properties { get; private set; } + + public object GetMessage() { + return _message.Value; + } + } + + public interface IEnvelope : IEnvelope where T: class { + T Message { get; } + } + + public class Envelope : IEnvelope where T: class { + private readonly IEnvelope _envolope; + + public Envelope(IEnvelope message) { + _envolope = message; + } + + public T Message => (T)GetMessage(); + + public string TraceParentId => _envolope.TraceParentId; + public string MessageType => _envolope.MessageType; + public int Attempts => _envolope.Attempts; + public DateTime SentAtUtc => _envolope.SentAtUtc; + public DateTime? ExpiresAtUtc => _envolope.ExpiresAtUtc; + public DateTime? DeliverAtUtc => _envolope.DeliverAtUtc; + public IReadOnlyDictionary Properties => _envolope.Properties; + public object GetMessage() => _envolope.GetMessage(); + } +} diff --git a/src/Foundatio/Messaging/IMessageBus.cs b/src/Foundatio/Messaging/IMessageBus.cs index 95463bf02..0bca26468 100644 --- a/src/Foundatio/Messaging/IMessageBus.cs +++ b/src/Foundatio/Messaging/IMessageBus.cs @@ -2,12 +2,7 @@ using Foundatio.Utility; namespace Foundatio.Messaging { - public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable {} - - public class MessageOptions { - public string UniqueId { get; set; } - public string CorrelationId { get; set; } - public TimeSpan? DeliveryDelay { get; set; } - public DataDictionary Properties { get; set; } = new DataDictionary(); + public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable { + string MessageBusId { get; } } } \ No newline at end of file diff --git a/src/Foundatio/Messaging/IMessageContext.cs b/src/Foundatio/Messaging/IMessageContext.cs new file mode 100644 index 000000000..3d2685944 --- /dev/null +++ b/src/Foundatio/Messaging/IMessageContext.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Foundatio.Messaging { + public interface IMessageContext : IMessage, IDisposable { + // message id + string Id { get; } + // message subscription id that received the message + string SubscriptionId { get; } + // when the message was originally published + DateTime PublishedUtc { get; } + // number of times this message has been delivered + int DeliveryCount { get; } + // acknowledge receipt of message and delete it + Task AcknowledgeAsync(); + // reject the message as not having been successfully processed + Task RejectAsync(); + // used to cancel processing of the current message + CancellationToken CancellationToken { get; } + } + + public interface IMessageContext : IMessageContext, IMessage where T: class {} + + public class MessageContext : IMessageContext where T : class { + private readonly IMessageContext _context; + + public MessageContext(IMessageContext context) { + _context = context; + } + + public string Id => _context.Id; + public string SubscriptionId => _context.SubscriptionId; + public DateTime PublishedUtc => _context.PublishedUtc; + public int DeliveryCount => _context.DeliveryCount; + public CancellationToken CancellationToken => _context.CancellationToken; + public string CorrelationId => _context.CorrelationId; + public Type MessageType => _context.MessageType; + public DateTime? ExpiresAtUtc => _context.ExpiresAtUtc; + public DateTime? DeliverAtUtc => _context.DeliverAtUtc; + public IReadOnlyDictionary Properties => _context.Properties; + public T Body => (T)GetBody(); + + public object GetBody() { + return _context.GetBody(); + } + + public Task AcknowledgeAsync() { + return _context.AcknowledgeAsync(); + } + + public Task RejectAsync() { + return _context.RejectAsync(); + } + + public void Dispose() { + _context.Dispose(); + } + } + + public class MessageContext : IMessageContext { + protected readonly IMessage _message; + protected readonly Func _acknowledgeAction; + protected readonly Func _rejectAction; + protected readonly Action _disposeAction; + + public MessageContext(string id, string subscriptionId, DateTime createdUtc, int deliveryCount, + IMessage message, Func acknowledgeAction, Func rejectAction, Action disposeAction, + CancellationToken cancellationToken = default) { + Id = id; + SubscriptionId = subscriptionId; + PublishedUtc = createdUtc; + DeliveryCount = deliveryCount; + _message = message; + _acknowledgeAction = acknowledgeAction; + _rejectAction = rejectAction; + _disposeAction = disposeAction; + CancellationToken = cancellationToken; + } + + public string Id { get; private set; } + public string SubscriptionId { get; private set; } + public DateTime PublishedUtc { get; private set; } + public int DeliveryCount { get; private set; } + public CancellationToken CancellationToken { get; private set; } + public string CorrelationId => _message.CorrelationId; + public Type MessageType => _message.MessageType; + public DateTime? ExpiresAtUtc => _message.ExpiresAtUtc; + public DateTime? DeliverAtUtc => _message.DeliverAtUtc; + public IReadOnlyDictionary Properties => _message.Properties; + + public object GetBody() { + return _message.GetBody(); + } + + public Task AcknowledgeAsync() { + if (_acknowledgeAction == null) + return Task.CompletedTask; + + return _acknowledgeAction(); + } + + public Task RejectAsync() { + if (_rejectAction == null) + return Task.CompletedTask; + + return _rejectAction(); + } + + public void Dispose() { + _disposeAction?.Invoke(); + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Messaging/IMessagePublisher.cs b/src/Foundatio/Messaging/IMessagePublisher.cs index cfb7078fb..975346921 100644 --- a/src/Foundatio/Messaging/IMessagePublisher.cs +++ b/src/Foundatio/Messaging/IMessagePublisher.cs @@ -1,19 +1,76 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Foundatio.Utility; namespace Foundatio.Messaging { public interface IMessagePublisher { - Task PublishAsync(Type messageType, object message, MessageOptions options = null, CancellationToken cancellationToken = default); + Task PublishAsync(object message, MessagePublishOptions options); + } + + public class MessagePublishOptions { + public Type MessageType { get; set; } + public string CorrelationId { get; set; } + public DateTime? ExpiresAtUtc { get; set; } + public DateTime? DeliverAtUtc { get; set; } + public Dictionary Properties { get; set; } = new Dictionary(); + public CancellationToken CancellationToken { get; set; } + + public MessagePublishOptions WithMessageType(Type messageType) { + MessageType = messageType; + return this; + } + + public MessagePublishOptions WithCorrelationId(string correlationId) { + CorrelationId = correlationId; + return this; + } + + public MessagePublishOptions WithExpiresAtUtc(DateTime? expiresAtUtc) { + ExpiresAtUtc = expiresAtUtc; + return this; + } + + public MessagePublishOptions WithDeliverAtUtc(DateTime? deliverAtUtc) { + DeliverAtUtc = deliverAtUtc; + return this; + } + + public MessagePublishOptions WithProperties(IDictionary properties) { + Properties.AddRange(properties); + return this; + } + + public MessagePublishOptions WithProperty(string name, string value) { + Properties.Add(name, value); + return this; + } + + public MessagePublishOptions WithCancellationToken(CancellationToken cancellationToken) { + CancellationToken = cancellationToken; + return this; + } } public static class MessagePublisherExtensions { - public static Task PublishAsync(this IMessagePublisher publisher, T message, MessageOptions options = null) where T : class { - return publisher.PublishAsync(typeof(T), message, options); + public static Task PublishAsync(this IMessagePublisher publisher, Type messageType, object message, TimeSpan? delay = null, CancellationToken cancellationToken = default) { + var deliverAtUtc = delay.HasValue ? (DateTime?)DateTime.UtcNow.Add(delay.Value) : null; + return publisher.PublishAsync(message, new MessagePublishOptions().WithMessageType(messageType).WithDeliverAtUtc(deliverAtUtc).WithCancellationToken(cancellationToken)); + } + + public static Task PublishAsync(this IMessagePublisher publisher, T message, TimeSpan? delay = null) where T : class { + return publisher.PublishAsync(typeof(T), message, delay); } - public static Task PublishAsync(this IMessagePublisher publisher, T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class { - return publisher.PublishAsync(typeof(T), message, new MessageOptions { DeliveryDelay = delay }, cancellationToken); + public static Task PublishAsync(this IMessagePublisher publisher, T message, MessagePublishOptions options) where T : class { + if (options == null) + options = new MessagePublishOptions(); + + if (options.MessageType == null) + options.MessageType = typeof(T); + + return publisher.PublishAsync(message, options); } } } diff --git a/src/Foundatio/Messaging/IMessageStore.cs b/src/Foundatio/Messaging/IMessageStore.cs new file mode 100644 index 000000000..46d2169de --- /dev/null +++ b/src/Foundatio/Messaging/IMessageStore.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Utility; +using Microsoft.Extensions.Logging; + +namespace Foundatio.Messaging { + public interface IMessageStore { + Task AddAsync(PersistedMessage message); + Task RemoveAsync(string[] ids); + Task> GetReadyForDeliveryAsync(); + Task RemoveAllAsync(); + } + + public class PersistedMessage { + public string Id { get; set; } + public DateTime PublishedUtc { get; set; } + public string CorrelationId { get; set; } + public string MessageTypeName { get; set; } + public byte[] Body { get; set; } + public DateTime? ExpiresAtUtc { get; set; } + public DateTime? DeliverAtUtc { get; set; } + public IReadOnlyDictionary Properties { get; set; } + } + + public class InMemoryMessageStore : IMessageStore { + private readonly List _messages = new List(); + private readonly ILogger _logger; + + public InMemoryMessageStore(ILogger logger) { + _logger = logger; + } + + public Task AddAsync(PersistedMessage message) { + _messages.Add(new InMemoryPersistedMessage(message)); + return Task.CompletedTask; + } + + public Task> GetReadyForDeliveryAsync() { + var dueList = new List(); + foreach (var message in _messages) { + if (message.IsProcessing) + continue; + + if (message.Message.DeliverAtUtc > SystemClock.UtcNow) + continue; + + if (!message.MarkProcessing()) + continue; + + dueList.Add(message.Message); + + if (dueList.Count >= 100) + break; + } + + if (_messages.Count <= 0) + _logger.LogTrace("No messages ready for delivery."); + else + _logger.LogTrace("Got {Count} / {Total} messages ready for delivery.", dueList.Count, _messages.Count); + + return Task.FromResult>(dueList); + } + + public Task RemoveAllAsync() { + _messages.Clear(); + return Task.CompletedTask; + } + + public Task RemoveAsync(string[] ids) { + _messages.RemoveAll(m => ids.Contains(m.Message.Id)); + return Task.CompletedTask; + } + + protected class InMemoryPersistedMessage { + public InMemoryPersistedMessage(PersistedMessage message) { + Message = message; + } + + public PersistedMessage Message { get; set; } + public bool IsProcessing { + get { + if (_processing == 0) + return false; + + // check for timeout + if (SystemClock.UtcNow.Subtract(_startedProcessing) > TimeSpan.FromMinutes(1)) { + _processing = 0; + return false; + } + + return true; + } + } + + private int _processing = 0; + private DateTime _startedProcessing = DateTime.MinValue; + + public bool MarkProcessing() { + var result = Interlocked.Exchange(ref _processing, 1); + _startedProcessing = SystemClock.Now; + + return result == 0; + } + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Messaging/IMessageSubscriber.cs b/src/Foundatio/Messaging/IMessageSubscriber.cs index e7517d958..d4a43dff0 100644 --- a/src/Foundatio/Messaging/IMessageSubscriber.cs +++ b/src/Foundatio/Messaging/IMessageSubscriber.cs @@ -1,18 +1,78 @@ using System; using System.Threading; using System.Threading.Tasks; +using Foundatio.Utility; namespace Foundatio.Messaging { - public interface IMessageSubscriber { - Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class; + public interface IHandle where T: class { + Task Handle(IMessageContext context); + } + + public interface IMessageSubscriber : IDisposable { + Task SubscribeAsync(MessageSubscriptionOptions options, Func handler); + Task ReceiveAsync(MessageReceiveOptions options); + } + + public class MessageSubscriptionOptions { + public Type MessageType { get; set; } + public int PrefetchCount { get; set; } = 1; + public CancellationToken CancellationToken { get; set; } + + public MessageSubscriptionOptions WithMessageType(Type messageType) { + MessageType = messageType; + return this; + } + + public MessageSubscriptionOptions WithPrefetchCount(int prefetchCount) { + PrefetchCount = prefetchCount; + return this; + } + + public MessageSubscriptionOptions WithCancellationToken(CancellationToken cancellationToken) { + CancellationToken = cancellationToken; + return this; + } + } + + public class MessageReceiveOptions { + public Type MessageType { get; set; } + public TimeSpan Timeout { get; set; } + public CancellationToken CancellationToken { get; set; } + + public MessageReceiveOptions WithMessageType(Type messageType) { + MessageType = messageType; + return this; + } + + public MessageReceiveOptions WithTimeout(TimeSpan timeout) { + Timeout = timeout; + return this; + } + + public MessageReceiveOptions WithCancellationToken(CancellationToken cancellationToken) { + CancellationToken = cancellationToken; + return this; + } } public static class MessageBusExtensions { - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) where T : class { + public static async Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) where T : class { + if (cancellationToken.IsCancellationRequested) + return new MessageSubscription(typeof(T), () => {}); + + var options = new MessageSubscriptionOptions().WithMessageType(typeof(T)).WithCancellationToken(cancellationToken); + var subscription = await subscriber.SubscribeAsync(options, (msg) => handler((T)msg.GetBody(), msg.CancellationToken)).AnyContext(); + if (cancellationToken != CancellationToken.None) + cancellationToken.Register(() => subscription.Dispose()); + + return subscription; + } + + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) where T : class { return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) where T : class { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) where T : class { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; diff --git a/src/Foundatio/Messaging/IMessageSubscription.cs b/src/Foundatio/Messaging/IMessageSubscription.cs new file mode 100644 index 000000000..2945d4f86 --- /dev/null +++ b/src/Foundatio/Messaging/IMessageSubscription.cs @@ -0,0 +1,39 @@ +using System; + +namespace Foundatio.Messaging { + public interface IMessageSubscription : IDisposable { + string Id { get; } + string MessageBusId { get; } + Type MessageType { get; } + DateTime CreatedUtc { get; } + bool IsCancelled { get; } + } + + public static class MessageSubscriptionExtensions { + public static bool HandlesMessagesType(this IMessageSubscription subscription, Type type) { + return subscription.MessageType.IsAssignableFrom(type); + } + } + + public class MessageSubscription : IMessageSubscription { + private readonly Action _unsubscribeAction; + + public MessageSubscription(Type messageType, Action unsubscribeAction) { + Id = Guid.NewGuid().ToString("N"); + MessageType = messageType; + CreatedUtc = DateTime.UtcNow; + _unsubscribeAction = unsubscribeAction; + } + + public string Id { get; } + public string MessageBusId { get; } + public Type MessageType { get; } + public DateTime CreatedUtc { get; } + public bool IsCancelled { get; private set; } + + public virtual void Dispose() { + IsCancelled = true; + _unsubscribeAction?.Invoke(); + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Messaging/ITypeNameSerializer.cs b/src/Foundatio/Messaging/ITypeNameSerializer.cs new file mode 100644 index 000000000..c8622b3c1 --- /dev/null +++ b/src/Foundatio/Messaging/ITypeNameSerializer.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Messaging { + public interface ITypeNameSerializer { + string Serialize(Type type); + Type Deserialize(string typeName); + } + + public class DefaultTypeNameSerializer : ITypeNameSerializer { + private readonly ILogger _logger; + private readonly Dictionary _typeNameOverrides; + private readonly ConcurrentDictionary _typeNameCache = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _typeCache = new ConcurrentDictionary(); + + public DefaultTypeNameSerializer(ILogger logger = null, IDictionary typeNameOverrides = null) { + _logger = logger ?? NullLogger.Instance; + if (typeNameOverrides != null) + _typeNameOverrides = new Dictionary(typeNameOverrides); + } + + public Type Deserialize(string typeName) { + return _typeCache.GetOrAdd(typeName, newTypeName => { + if (_typeNameOverrides != null && _typeNameOverrides.ContainsKey(newTypeName)) + return _typeNameOverrides[newTypeName]; + + try { + return Type.GetType(newTypeName); + } catch (Exception ex) { + if (_logger.IsEnabled(LogLevel.Warning)) + _logger.LogWarning(ex, "Error getting message type: {MessageType}", newTypeName); + + return null; + } + }); + } + + public string Serialize(Type type) { + return _typeNameCache.GetOrAdd(type, newType => { + if (_typeNameOverrides != null) { + var reversedMap = _typeNameOverrides.ToDictionary(kvp => kvp.Value, kvp => kvp.Key); + if (reversedMap.ContainsKey(newType)) + return reversedMap[newType]; + } + + return String.Concat(type.FullName, ", ", type.Assembly.GetName().Name); + }); + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Messaging/InMemoryMessageBus.cs b/src/Foundatio/Messaging/InMemoryMessageBus.cs index efeb26854..4899cf62a 100644 --- a/src/Foundatio/Messaging/InMemoryMessageBus.cs +++ b/src/Foundatio/Messaging/InMemoryMessageBus.cs @@ -1,18 +1,25 @@ using System; using System.Collections.Concurrent; +using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Foundatio.Utility; +using Foundatio.Serializer; using Microsoft.Extensions.Logging; namespace Foundatio.Messaging { public class InMemoryMessageBus : MessageBusBase { private readonly ConcurrentDictionary _messageCounts = new(); private long _messagesSent; + private readonly TaskFactory _taskFactory; public InMemoryMessageBus() : this(o => o) {} - public InMemoryMessageBus(InMemoryMessageBusOptions options) : base(options) { } + public InMemoryMessageBus(InMemoryMessageBusOptions options) : base(options) { + // limit message processing to 50 at a time + _taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(50)); + } public InMemoryMessageBus(Builder config) : this(config(new InMemoryMessageBusOptionsBuilder()).Build()) { } @@ -20,11 +27,11 @@ public InMemoryMessageBus(Builder _messagesSent; public long GetMessagesSent(Type messageType) { - return _messageCounts.TryGetValue(GetMappedMessageType(messageType), out long count) ? count : 0; + return _messageCounts.TryGetValue(_typeNameSerializer.Serialize(messageType), out var count) ? count : 0; } public long GetMessagesSent() { - return _messageCounts.TryGetValue(GetMappedMessageType(typeof(T)), out long count) ? count : 0; + return _messageCounts.TryGetValue(_typeNameSerializer.Serialize(typeof(T)), out var count) ? count : 0; } public void ResetMessagesSent() { @@ -32,35 +39,79 @@ public void ResetMessagesSent() { _messageCounts.Clear(); } - protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) { + protected override Task PublishImplAsync(byte[] body, MessagePublishOptions options = null) { Interlocked.Increment(ref _messagesSent); - _messageCounts.AddOrUpdate(messageType, t => 1, (t, c) => c + 1); - var mappedType = GetMappedMessageType(messageType); + var typeName = _typeNameSerializer.Serialize(options.MessageType); + _messageCounts.AddOrUpdate(typeName, t => 1, (t, c) => c + 1); - if (_subscribers.IsEmpty) - return; + if (_subscriptions.Count == 0) + return Task.CompletedTask; - bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); - if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) { - if (isTraceLogLevelEnabled) - _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); - SendDelayedMessage(mappedType, message, options.DeliveryDelay.Value); - return; - } + _logger.LogTrace("Message Publish: {MessageType}", options.MessageType.FullName); + + SendMessageToSubscribers(body, options); + return Task.CompletedTask; + } + + protected override Task SubscribeImplAsync(MessageSubscriptionOptions options, Func handler) { + var subscriber = new Subscriber(options.MessageType, handler); + return Task.FromResult(subscriber); + } + + protected void SendMessageToSubscribers(byte[] body, MessagePublishOptions options) { + if (body == null) + throw new ArgumentNullException(nameof(body)); - byte[] body = SerializeMessageBody(messageType, message); - var messageData = new Message(() => DeserializeMessageBody(messageType, body)) { - Type = messageType, - ClrType = mappedType, - Data = body + if (options == null) + throw new ArgumentNullException(nameof(options)); + + var createdUtc = SystemClock.UtcNow; + var messageId = Guid.NewGuid().ToString(); + Func getBody = () => { + return _serializer.Deserialize(body, options.MessageType); }; - try { - await SendMessageToSubscribersAsync(messageData).AnyContext(); - } catch (Exception ex) { - // swallow exceptions from subscriber handlers for the in memory bus - _logger.LogWarning(ex, "Error sending message to subscribers: {ErrorMessage}", ex.Message); + var subscribers = _subscriptions.ToArray().Where(s => s.IsCancelled == false && s.HandlesMessagesType(options.MessageType)).OfType().ToArray(); + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); + if (isTraceLogLevelEnabled) + _logger.LogTrace("Found {SubscriberCount} subscribers for message type {MessageType}.", subscribers.Length, options.MessageType.Name); + + foreach (var subscriber in subscribers) { + _taskFactory.StartNew(async () => { + if (subscriber.IsCancelled) { + if (isTraceLogLevelEnabled) + _logger.LogTrace("The cancelled subscriber action will not be called: {SubscriberId}", subscriber.Id); + + return; + } + + if (isTraceLogLevelEnabled) + _logger.LogTrace("Calling subscriber action: {SubscriberId}", subscriber.Id); + + try { + var message = new Message(getBody, options.MessageType, options.CorrelationId, options.ExpiresAtUtc, options.DeliverAtUtc, options.Properties); + var context = new MessageContext(messageId, subscriber.Id, createdUtc, 1, message, () => Task.CompletedTask, () => Task.CompletedTask, () => {}, options.CancellationToken); + await subscriber.Action(context).AnyContext(); + if (isTraceLogLevelEnabled) + _logger.LogTrace("Finished calling subscriber action: {SubscriberId}", subscriber.Id); + } catch (Exception ex) { + if (_logger.IsEnabled(LogLevel.Warning)) + _logger.LogWarning(ex, "Error sending message to subscriber: {Message}", ex.Message); + } + }); } + + if (isTraceLogLevelEnabled) + _logger.LogTrace("Done enqueueing message to {SubscriberCount} subscribers for message type {MessageType}.", subscribers.Length, options.MessageType.Name); + } + + [DebuggerDisplay("Id: {Id} Type: {MessageType} IsDisposed: {IsDisposed}")] + protected class Subscriber : MessageSubscription { + public Subscriber(Type messageType, Func action) : base(messageType, () => {}) { + Action = action; + } + + public Func Action { get; } } } } \ No newline at end of file diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index ba1d3a104..56d5e4d27 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -1,9 +1,6 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using Foundatio.Serializer; @@ -12,244 +9,122 @@ using Microsoft.Extensions.Logging.Abstractions; namespace Foundatio.Messaging { - public abstract class MessageBusBase : IMessageBus, IDisposable where TOptions : SharedMessageBusOptions { - private readonly CancellationTokenSource _messageBusDisposedCancellationTokenSource; - protected readonly ConcurrentDictionary _subscribers = new(); + public abstract class MessageBusBase : IMessageBus where TOptions : SharedMessageBusOptions { + protected readonly List _subscriptions = new List(); protected readonly TOptions _options; - protected readonly ILogger _logger; protected readonly ISerializer _serializer; + protected readonly ITypeNameSerializer _typeNameSerializer; + protected readonly IMessageStore _store; + protected readonly ILogger _logger; private bool _isDisposed; + protected readonly ISystemClock _clock; + protected readonly ITimer _maintenanceTimer; public MessageBusBase(TOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); - var loggerFactory = options?.LoggerFactory ?? NullLoggerFactory.Instance; - _logger = loggerFactory.CreateLogger(GetType()); _serializer = options.Serializer ?? DefaultSerializer.Instance; - MessageBusId = _options.Topic + Guid.NewGuid().ToString("N").Substring(10); - _messageBusDisposedCancellationTokenSource = new CancellationTokenSource(); - } - - protected virtual Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) => Task.CompletedTask; - protected abstract Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken); - public async Task PublishAsync(Type messageType, object message, MessageOptions options = null, CancellationToken cancellationToken = default) { - if (messageType == null || message == null) - return; - - await EnsureTopicCreatedAsync(cancellationToken).AnyContext(); - await PublishImplAsync(GetMappedMessageType(messageType), message, options ?? new MessageOptions(), cancellationToken).AnyContext(); - } - - private readonly ConcurrentDictionary _mappedMessageTypesCache = new(); - protected string GetMappedMessageType(Type messageType) { - return _mappedMessageTypesCache.GetOrAdd(messageType, type => { - var reversedMap = _options.MessageTypeMappings.ToDictionary(kvp => kvp.Value, kvp => kvp.Key); - if (reversedMap.ContainsKey(type)) - return reversedMap[type]; - - return String.Concat(messageType.FullName, ", ", messageType.Assembly.GetName().Name); - }); - } - - private readonly ConcurrentDictionary _knownMessageTypesCache = new(); - protected virtual Type GetMappedMessageType(string messageType) { - if (String.IsNullOrEmpty(messageType)) - return null; - - return _knownMessageTypesCache.GetOrAdd(messageType, type => { - if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type)) - return _options.MessageTypeMappings[type]; - - try { - return Type.GetType(type); - } catch (Exception) { - try { - string[] typeParts = type.Split(','); - if (typeParts.Length >= 2) - type = String.Join(",", typeParts[0], typeParts[1]); - - // try resolve type without version - return Type.GetType(type); - } catch (Exception ex) { - if (_logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning(ex, "Error getting message body type: {MessageType}", type); - - return null; - } - } - }); - } - - protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; - protected virtual Task SubscribeImplAsync(Func handler, CancellationToken cancellationToken) where T : class { - var subscriber = new Subscriber { - CancellationToken = cancellationToken, - Type = typeof(T), - Action = (message, token) => { - if (message is not T) { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Unable to call subscriber action: {MessageType} cannot be safely casted to {SubscriberType}", message.GetType(), typeof(T)); - return Task.CompletedTask; - } - - return handler((T)message, cancellationToken); - } - }; - - if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error)) - _logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id); - - return Task.CompletedTask; + var loggerFactory = options.LoggerFactory ?? NullLoggerFactory.Instance; + _logger = loggerFactory.CreateLogger(GetType()); + _typeNameSerializer = options.TypeNameSerializer ?? new DefaultTypeNameSerializer(_logger); + _store = options.MessageStore ?? new InMemoryMessageStore(_logger); + MessageBusId = Guid.NewGuid().ToString("N"); + _clock = options.SystemClock ?? SystemClock.Instance; + _maintenanceTimer = _clock.Timer(DoMaintenanceAsync, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); } - public async Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName); - await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext(); - await SubscribeImplAsync(handler, cancellationToken).AnyContext(); - } + public string MessageBusId { get; protected set; } - protected List GetMessageSubscribers(IMessage message) { - return _subscribers.Values.Where(s => SubscriberHandlesMessage(s, message)).ToList(); - } + protected virtual Task ConfigureMessageType(Type messageType, CancellationToken cancellationToken) => Task.CompletedTask; - protected virtual bool SubscriberHandlesMessage(Subscriber subscriber, IMessage message) { - if (subscriber.Type == typeof(IMessage)) - return true; + protected abstract Task PublishImplAsync(byte[] body, MessagePublishOptions options = null); - var clrType = GetMappedMessageType(message.Type); + public async Task PublishAsync(object message, MessagePublishOptions options) { + if (message == null) + return; - if (subscriber.IsAssignableFrom(clrType)) - return true; + if (options.MessageType == null) + options.MessageType = message.GetType(); - return false; - } + if (options.CancellationToken.IsCancellationRequested) + return; - protected virtual byte[] SerializeMessageBody(string messageType, object body) { - if (body == null) - return new byte[0]; + if (options.ExpiresAtUtc.HasValue && options.ExpiresAtUtc.Value < SystemClock.UtcNow) + return; - return _serializer.SerializeToBytes(body); - } - - protected virtual object DeserializeMessageBody(string messageType, byte[] data) { - if (data == null || data.Length == 0) - return null; + await ConfigureMessageType(options.MessageType, options.CancellationToken).AnyContext(); + var body = _serializer.SerializeToBytes(message); + + if (options.DeliverAtUtc.HasValue && options.DeliverAtUtc > SystemClock.UtcNow) { + _logger.LogTrace("Storing message scheduled for delivery at {DeliverAt}.", options.DeliverAtUtc.Value); + var typeName = _typeNameSerializer.Serialize(options.MessageType); + await _store.AddAsync(new PersistedMessage { + Id = Guid.NewGuid().ToString("N"), + PublishedUtc = SystemClock.UtcNow, + CorrelationId = options.CorrelationId, + MessageTypeName = typeName, + Body = body, + ExpiresAtUtc = options.ExpiresAtUtc, + DeliverAtUtc = options.DeliverAtUtc, + Properties = options.Properties + }); - object body; - try { - var clrType = GetMappedMessageType(messageType); - if (clrType != null) - body = _serializer.Deserialize(data, clrType); - else - body = data; - } catch (Exception ex) { - if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Error deserializing message body: {Message}", ex.Message); - - return null; + return; } - return body; + await PublishImplAsync(body, options).AnyContext(); } - protected async Task SendMessageToSubscribersAsync(IMessage message) { - bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); - var subscribers = GetMessageSubscribers(message); + protected abstract Task SubscribeImplAsync(MessageSubscriptionOptions options, Func handler); - if (isTraceLogLevelEnabled) - _logger.LogTrace("Found {SubscriberCount} subscribers for message type {MessageType}.", subscribers.Count, message.Type); + public async Task SubscribeAsync(MessageSubscriptionOptions options, Func handler) { + if (options.MessageType == null) + throw new ArgumentNullException("Options must have a MessageType specified."); - if (subscribers.Count == 0) - return; - - if (message.Data == null || message.Data.Length == 0) { - _logger.LogWarning("Unable to send null message for type {MessageType}", message.Type); - return; - } + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("Adding subscription for {MessageType}.", options.MessageType.FullName); - var body = new Lazy(() => DeserializeMessageBody(message.Type, message.Data)); - - var subscriberHandlers = subscribers.Select(subscriber => { - if (subscriber.CancellationToken.IsCancellationRequested) { - if (_subscribers.TryRemove(subscriber.Id, out _)) { - if (isTraceLogLevelEnabled) - _logger.LogTrace("Removed cancelled subscriber: {SubscriberId}", subscriber.Id); - } else if (isTraceLogLevelEnabled) { - _logger.LogTrace("Unable to remove cancelled subscriber: {SubscriberId}", subscriber.Id); - } - - return Task.CompletedTask; - } - - return Task.Run(async () => { - if (subscriber.CancellationToken.IsCancellationRequested) { - if (isTraceLogLevelEnabled) - _logger.LogTrace("The cancelled subscriber action will not be called: {SubscriberId}", subscriber.Id); - - return; - } - - if (isTraceLogLevelEnabled) - _logger.LogTrace("Calling subscriber action: {SubscriberId}", subscriber.Id); - - if (subscriber.Type == typeof(IMessage)) - await subscriber.Action(message, subscriber.CancellationToken).AnyContext(); - else - await subscriber.Action(body.Value, subscriber.CancellationToken).AnyContext(); - - if (isTraceLogLevelEnabled) - _logger.LogTrace("Finished calling subscriber action: {SubscriberId}", subscriber.Id); - }); - }); - - try { - await Task.WhenAll(subscriberHandlers.ToArray()); - } catch (Exception ex) { - _logger.LogWarning(ex, "Error sending message to subscribers: {ErrorMessage}", ex.Message); + if (options.CancellationToken.IsCancellationRequested) + return null; - throw; - } + await ConfigureMessageType(options.MessageType, options.CancellationToken).AnyContext(); + var subscription = await SubscribeImplAsync(options, handler).AnyContext(); + _subscriptions.Add(subscription); - if (isTraceLogLevelEnabled) - _logger.LogTrace("Done enqueueing message to {SubscriberCount} subscribers for message type {MessageType}.", subscribers.Count, message.Type); + return subscription; } - - protected Task AddDelayedMessageAsync(Type messageType, object message, TimeSpan delay) { - if (message == null) - throw new ArgumentNullException(nameof(message)); - SendDelayedMessage(messageType, message, delay); - - return Task.CompletedTask; + public Task ReceiveAsync(MessageReceiveOptions options) { + throw new NotImplementedException(); } - protected void SendDelayedMessage(Type messageType, object message, TimeSpan delay) { - if (message == null) - throw new ArgumentNullException(nameof(message)); - - if (delay <= TimeSpan.Zero) - throw new ArgumentOutOfRangeException(nameof(delay)); + protected bool MessageTypeHasSubscribers(Type messageType) { + var subscribers = _subscriptions.Where(s => s.MessageType.IsAssignableFrom(messageType)).ToList(); + return subscribers.Count == 0; + } - var sendTime = SystemClock.UtcNow.SafeAdd(delay); - Task.Factory.StartNew(async () => { - await SystemClock.SleepSafeAsync(delay, _messageBusDisposedCancellationTokenSource.Token).AnyContext(); + protected async Task DoMaintenanceAsync() { + _logger.LogTrace("Checking for stored messages that are ready for delivery..."); + var pendingMessages = await _store.GetReadyForDeliveryAsync(); + foreach (var pendingMessage in pendingMessages) { + var messageType = _typeNameSerializer.Deserialize(pendingMessage.MessageTypeName); + var properties = new Dictionary(); + if (pendingMessage.Properties != null) + properties.AddRange(pendingMessage.Properties); - bool isTraceLevelEnabled = _logger.IsEnabled(LogLevel.Trace); - if (_messageBusDisposedCancellationTokenSource.IsCancellationRequested) { - if (isTraceLevelEnabled) - _logger.LogTrace("Discarding delayed message scheduled for {SendTime:O} for type {MessageType}", sendTime, messageType); - return; - } - - if (isTraceLevelEnabled) - _logger.LogTrace("Sending delayed message scheduled for {SendTime:O} for type {MessageType}", sendTime, messageType); + await PublishImplAsync(pendingMessage.Body, new MessagePublishOptions { + CorrelationId = pendingMessage.CorrelationId, + DeliverAtUtc = pendingMessage.DeliverAtUtc, + ExpiresAtUtc = pendingMessage.ExpiresAtUtc, + MessageType = messageType, + Properties = properties + }).AnyContext(); + } - await PublishAsync(messageType, message).AnyContext(); - }); + int removed = _subscriptions.RemoveAll(s => s.IsCancelled); + if (removed > 0) + _logger.LogTrace("Removing {CancelledSubscriptionCount} cancelled subscriptions.", removed); } - public string MessageBusId { get; protected set; } - public virtual void Dispose() { if (_isDisposed) { _logger.LogTrace("MessageBus {0} dispose was already called.", MessageBusId); @@ -259,29 +134,12 @@ public virtual void Dispose() { _isDisposed = true; _logger.LogTrace("MessageBus {0} dispose", MessageBusId); - _subscribers?.Clear(); - _messageBusDisposedCancellationTokenSource?.Cancel(); - _messageBusDisposedCancellationTokenSource?.Dispose(); - } - - [DebuggerDisplay("MessageType: {MessageType} SendTime: {SendTime} Message: {Message}")] - protected class DelayedMessage { - public DateTime SendTime { get; set; } - public Type MessageType { get; set; } - public object Message { get; set; } - } - - [DebuggerDisplay("Id: {Id} Type: {Type} CancellationToken: {CancellationToken}")] - protected class Subscriber { - private readonly ConcurrentDictionary _assignableTypesCache = new(); - public string Id { get; private set; } = Guid.NewGuid().ToString("N"); - public CancellationToken CancellationToken { get; set; } - public Type Type { get; set; } - public Func Action { get; set; } + _maintenanceTimer?.Dispose(); - public bool IsAssignableFrom(Type type) { - return _assignableTypesCache.GetOrAdd(type, t => Type.GetTypeInfo().IsAssignableFrom(t)); + if (_subscriptions != null && _subscriptions.Count > 0) { + foreach (var subscription in _subscriptions) + subscription.Dispose(); } } } diff --git a/src/Foundatio/Messaging/NullMessageBus.cs b/src/Foundatio/Messaging/NullMessageBus.cs index a7bce4390..775ba9382 100644 --- a/src/Foundatio/Messaging/NullMessageBus.cs +++ b/src/Foundatio/Messaging/NullMessageBus.cs @@ -1,17 +1,22 @@ -using System; -using System.Threading; +using System; using System.Threading.Tasks; namespace Foundatio.Messaging { public class NullMessageBus : IMessageBus { public static readonly NullMessageBus Instance = new(); - public Task PublishAsync(Type messageType, object message, MessageOptions options = null, CancellationToken cancellationToken = default) { + public string MessageBusId { get; } = Guid.NewGuid().ToString("N"); + + public Task PublishAsync(object message, MessagePublishOptions options = null) { return Task.CompletedTask; } - public Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { - return Task.CompletedTask; + public Task SubscribeAsync(MessageSubscriptionOptions options, Func handler) { + return Task.FromResult(new MessageSubscription(options.MessageType, () => {})); + } + + public Task ReceiveAsync(MessageReceiveOptions options) { + return Task.FromResult(null); } public void Dispose() {} diff --git a/src/Foundatio/Messaging/SharedMessageBusOptions.cs b/src/Foundatio/Messaging/SharedMessageBusOptions.cs index b7146a1a6..68b5e309c 100644 --- a/src/Foundatio/Messaging/SharedMessageBusOptions.cs +++ b/src/Foundatio/Messaging/SharedMessageBusOptions.cs @@ -1,42 +1,27 @@ -using System; -using System.Collections.Generic; - namespace Foundatio.Messaging { public class SharedMessageBusOptions : SharedOptions { /// - /// The topic name + /// Controls how message types are serialized to/from strings. /// - public string Topic { get; set; } = "messages"; + public ITypeNameSerializer TypeNameSerializer { get; set; } /// - /// Controls which types messages are mapped to. + /// Used to store delayed messages. /// - public Dictionary MessageTypeMappings { get; set; } = new Dictionary(); + public IMessageStore MessageStore { get; set; } } public class SharedMessageBusOptionsBuilder : SharedOptionsBuilder where TOptions : SharedMessageBusOptions, new() where TBuilder : SharedMessageBusOptionsBuilder { - public TBuilder Topic(string topic) { - if (string.IsNullOrEmpty(topic)) - throw new ArgumentNullException(nameof(topic)); - Target.Topic = topic; + + public TBuilder TypeNameSerializer(ITypeNameSerializer typeNameSerializer) { + Target.TypeNameSerializer = typeNameSerializer; return (TBuilder)this; } - - public TBuilder MapMessageType(string name) { - if (Target.MessageTypeMappings == null) - Target.MessageTypeMappings = new Dictionary(); - - Target.MessageTypeMappings[name] = typeof(T); - return (TBuilder)this; - } - - public TBuilder MapMessageTypeToClassName() { - if (Target.MessageTypeMappings == null) - Target.MessageTypeMappings = new Dictionary(); - - Target.MessageTypeMappings[typeof(T).Name] = typeof(T); + + public TBuilder MessageStore(IMessageStore messageStore) { + Target.MessageStore = messageStore; return (TBuilder)this; } } diff --git a/src/Foundatio/Messaging/scenarios.md b/src/Foundatio/Messaging/scenarios.md new file mode 100644 index 000000000..3039bb0e0 --- /dev/null +++ b/src/Foundatio/Messaging/scenarios.md @@ -0,0 +1,18 @@ +- Multiple receivers (pub/sub) + - Fire and forget + - Message acknowledgement +- Worker queues + - Single Worker + - Round robin workers +- Delayed delivery + - Can schedule delivery, messages are persisted to a message store and a background task polls for messages that are due and then sends them out +- Message persistence + - Not all messages need to be persisted and guaranteed delivery +- Message subscriptions are push based with prefetch count setting which should greatly improve throughput +- Can either use generic method overloads or use options to change the message type or topic the message is being published to +- Can subscribe to multiple message types by controlling the message topic instead of using the default topic per .net type +- Request/response + - Publishes message and then does a single message receive on a topic that is for that exact request and waits the specified amount of time +- Receive message (pull model) + - Equivalent of current worker queues pulling a single message at a time + - Ability to receive a batch of messages diff --git a/src/Foundatio/Metrics/IMetricsClient.cs b/src/Foundatio/Metrics/IMetricsClient.cs index 5d2c55b27..44ec6968d 100644 --- a/src/Foundatio/Metrics/IMetricsClient.cs +++ b/src/Foundatio/Metrics/IMetricsClient.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using Foundatio.Utility; namespace Foundatio.Metrics { [Obsolete("IMetricsClient will be removed, use System.Diagnostics.Metrics.Meter instead.")] diff --git a/src/Foundatio/Metrics/InMemoryMetricsClient.cs b/src/Foundatio/Metrics/InMemoryMetricsClient.cs index da60a6563..283fb8efc 100644 --- a/src/Foundatio/Metrics/InMemoryMetricsClient.cs +++ b/src/Foundatio/Metrics/InMemoryMetricsClient.cs @@ -1,5 +1,4 @@ -using System; -using Foundatio.Caching; +using Foundatio.Caching; namespace Foundatio.Metrics { public class InMemoryMetricsClient : CacheBucketMetricsClientBase { diff --git a/src/Foundatio/Metrics/MetricTimer.cs b/src/Foundatio/Metrics/MetricTimer.cs index ff84b479d..aff483d4e 100644 --- a/src/Foundatio/Metrics/MetricTimer.cs +++ b/src/Foundatio/Metrics/MetricTimer.cs @@ -1,7 +1,5 @@ using System; using System.Diagnostics; -using System.Threading.Tasks; -using Foundatio.Utility; namespace Foundatio.Metrics { public class MetricTimer : IDisposable { diff --git a/src/Foundatio/Metrics/NullMetricsClient.cs b/src/Foundatio/Metrics/NullMetricsClient.cs index 409f4b004..de2134e7e 100644 --- a/src/Foundatio/Metrics/NullMetricsClient.cs +++ b/src/Foundatio/Metrics/NullMetricsClient.cs @@ -1,6 +1,4 @@ -using System.Threading.Tasks; - -namespace Foundatio.Metrics { +namespace Foundatio.Metrics { public class NullMetricsClient : IMetricsClient { public static readonly IMetricsClient Instance = new NullMetricsClient(); public void Counter(string name, int value = 1) {} diff --git a/src/Foundatio/Queues/IQueueEntry.cs b/src/Foundatio/Queues/IQueueEntry.cs index 4d178f622..10b2f39d6 100644 --- a/src/Foundatio/Queues/IQueueEntry.cs +++ b/src/Foundatio/Queues/IQueueEntry.cs @@ -1,6 +1,4 @@ -using System; -using System.Threading.Tasks; -using Foundatio.Utility; +using System.Threading.Tasks; namespace Foundatio.Queues { public interface IQueueEntry { diff --git a/src/Foundatio/SystemClock/ISystemClock.cs b/src/Foundatio/SystemClock/ISystemClock.cs new file mode 100644 index 000000000..5315ec43c --- /dev/null +++ b/src/Foundatio/SystemClock/ISystemClock.cs @@ -0,0 +1,21 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Foundatio.Utility { + public interface ISystemClock { + DateTime Now { get; } + DateTime UtcNow { get; } + DateTimeOffset OffsetNow { get; } + DateTimeOffset OffsetUtcNow { get; } + void Sleep(int milliseconds); + Task SleepAsync(int milliseconds, CancellationToken ct = default); + TimeSpan Offset { get; } + void Schedule(Action action, TimeSpan dueTime); + ITimer Timer(Action action, TimeSpan dueTime, TimeSpan period); + } + + public interface ITimer : IDisposable { + bool Change(TimeSpan dueTime, TimeSpan period); + } +} \ No newline at end of file diff --git a/src/Foundatio/SystemClock/ITestSystemClock.cs b/src/Foundatio/SystemClock/ITestSystemClock.cs new file mode 100644 index 000000000..86251e3c7 --- /dev/null +++ b/src/Foundatio/SystemClock/ITestSystemClock.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading; + +namespace Foundatio.Utility { + public interface ITestSystemClock : ISystemClock, IDisposable { + void AddTime(TimeSpan amount); + void SetTime(DateTime time, TimeSpan? offset = null); + WaitHandle NoScheduledWorkItemsDue { get; } + event EventHandler Changed; + } +} \ No newline at end of file diff --git a/src/Foundatio/SystemClock/RealSystemClock.cs b/src/Foundatio/SystemClock/RealSystemClock.cs new file mode 100644 index 000000000..5b5e59ccc --- /dev/null +++ b/src/Foundatio/SystemClock/RealSystemClock.cs @@ -0,0 +1,30 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Utility { + public class RealSystemClock : ISystemClock { + private readonly WorkScheduler _workScheduler; + + public RealSystemClock(ILoggerFactory loggerFactory) { + loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; + var logger = loggerFactory.CreateLogger(); + _workScheduler = new WorkScheduler(this, logger); + } + + public DateTime Now => DateTime.Now; + public DateTime UtcNow => DateTime.UtcNow; + public DateTimeOffset OffsetNow => DateTimeOffset.Now; + public DateTimeOffset OffsetUtcNow => DateTimeOffset.UtcNow; + public void Sleep(int milliseconds) => Thread.Sleep(milliseconds); + public Task SleepAsync(int milliseconds, CancellationToken ct = default) => Task.Delay(milliseconds, ct); + public TimeSpan Offset => DateTimeOffset.Now.Offset; + public void Schedule(Action action, TimeSpan dueTime) + => _workScheduler.Schedule(action, dueTime); + public ITimer Timer(Action action, TimeSpan dueTime, TimeSpan period) + => _workScheduler.Timer(action, dueTime, period); + } +} \ No newline at end of file diff --git a/src/Foundatio/SystemClock/SystemClock.cs b/src/Foundatio/SystemClock/SystemClock.cs new file mode 100644 index 000000000..59d00feae --- /dev/null +++ b/src/Foundatio/SystemClock/SystemClock.cs @@ -0,0 +1,105 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Utility { + public class SystemClock { + private static AsyncLocal _instance; + private static readonly ISystemClock _realClock = new RealSystemClock(null); + + public static ISystemClock Instance => _instance?.Value ?? _realClock; + + public static void SetInstance(ISystemClock clock, ILoggerFactory loggerFactory) { + var logger = loggerFactory?.CreateLogger("SystemClock") ?? NullLogger.Instance; + _instance = new AsyncLocal(e => { + if (e.ThreadContextChanged) + return; + + if (e.PreviousValue != null && e.CurrentValue != null) { + var diff = e.PreviousValue.Now.Subtract(e.CurrentValue.Now); + logger.LogTrace("SystemClock instance is being changed by {ThreadId} from {OldTime} to {NewTime} diff {Difference:g}", Thread.CurrentThread.ManagedThreadId, e.PreviousValue?.Now, e.CurrentValue?.Now, diff); + } + + if (e.PreviousValue == null) + logger.LogTrace("SystemClock instance is being initially set by {ThreadId} to {NewTime}", Thread.CurrentThread.ManagedThreadId, e.CurrentValue?.Now); + + if (e.CurrentValue == null) + logger.LogTrace("SystemClock instance is being removed set by {ThreadId} from {OldTime}", Thread.CurrentThread.ManagedThreadId, e.PreviousValue?.Now); + }); + + if (clock == null || clock is RealSystemClock) { + if (_instance != null) + _instance.Value = null; + _instance = null; + } else { + _instance.Value = clock; + } + } + + public static DateTime Now => Instance.Now; + public static DateTime UtcNow => Instance.UtcNow; + public static DateTimeOffset OffsetNow => Instance.OffsetNow; + public static DateTimeOffset OffsetUtcNow => Instance.OffsetUtcNow; + public static TimeSpan TimeZoneOffset => Instance.Offset; + public static void Sleep(int milliseconds) => Instance.Sleep(milliseconds); + + public static Task SleepAsync(int milliseconds, CancellationToken cancellationToken = default) + => Instance.SleepAsync(milliseconds, cancellationToken); + + public static void Sleep(TimeSpan delay) + => Instance.Sleep(delay); + + public static Task SleepAsync(TimeSpan delay, CancellationToken cancellationToken = default) + => Instance.SleepAsync(delay, cancellationToken); + + public static Task SleepSafeAsync(int milliseconds, CancellationToken cancellationToken = default) { + return Instance.SleepSafeAsync(milliseconds, cancellationToken); + } + + public static Task SleepSafeAsync(TimeSpan delay, CancellationToken cancellationToken = default) + => Instance.SleepSafeAsync(delay, cancellationToken); + + public static void Schedule(Func action, TimeSpan dueTime) + => Instance.Schedule(action, dueTime); + + public static void Schedule(Action action, TimeSpan dueTime) + => Instance.Schedule(action, dueTime); + + public static void Schedule(Func action, DateTime executeAt) + => Instance.Schedule(action, executeAt); + + public static void Schedule(Action action, DateTime executeAt) + => Instance.Schedule(action, executeAt); + } + + public static class SystemClockExtensions { + public static void Sleep(this ISystemClock clock, TimeSpan delay) + => clock.Sleep((int)delay.TotalMilliseconds); + + public static Task SleepAsync(this ISystemClock clock, TimeSpan delay, CancellationToken cancellationToken = default) + => clock.SleepAsync((int)delay.TotalMilliseconds, cancellationToken); + + public static async Task SleepSafeAsync(this ISystemClock clock, int milliseconds, CancellationToken cancellationToken = default) { + try { + await clock.SleepAsync(milliseconds, cancellationToken).AnyContext(); + } catch (OperationCanceledException) {} + } + + public static Task SleepSafeAsync(this ISystemClock clock, TimeSpan dueTime, CancellationToken cancellationToken = default) + => clock.SleepSafeAsync((int)dueTime.TotalMilliseconds, cancellationToken); + + public static void Schedule(this ISystemClock clock, Action action, DateTime executeAt) => + clock.Schedule(action, executeAt.Subtract(clock.UtcNow)); + + public static void Schedule(this ISystemClock clock, Func action, TimeSpan dueTime) => + clock.Schedule(() => { _ = action(); }, dueTime); + + public static void Schedule(this ISystemClock clock, Func action, DateTime executeAt) => + clock.Schedule(() => { _ = action(); }, executeAt); + + public static ITimer Timer(this ISystemClock clock, Func action, TimeSpan dueTime, TimeSpan period) => + clock.Timer(() => { _ = action(); }, dueTime, period); + } +} \ No newline at end of file diff --git a/src/Foundatio/SystemClock/TestSystemClock.cs b/src/Foundatio/SystemClock/TestSystemClock.cs new file mode 100644 index 000000000..3984f6343 --- /dev/null +++ b/src/Foundatio/SystemClock/TestSystemClock.cs @@ -0,0 +1,84 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Utility { + internal class TestSystemClock : ITestSystemClock { + private DateTime _utcTime = DateTime.UtcNow; + private TimeSpan _offset = DateTimeOffset.Now.Offset; + private readonly ISystemClock _originalClock; + private readonly WorkScheduler _workScheduler; + + public TestSystemClock(ILoggerFactory loggerFactory) { + loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; + var logger = loggerFactory.CreateLogger("Foundatio.Utility.SystemClock"); + _workScheduler = new WorkScheduler(this, logger); + } + + public TestSystemClock(ISystemClock originalTime, ILoggerFactory loggerFactory) : this(loggerFactory) { + _originalClock = originalTime; + } + + public DateTime UtcNow => _utcTime; + public DateTime Now => new DateTime(_utcTime.Add(_offset).Ticks, DateTimeKind.Local); + public DateTimeOffset OffsetNow => new DateTimeOffset(Now.Ticks, _offset); + public DateTimeOffset OffsetUtcNow => new DateTimeOffset(_utcTime); + public TimeSpan Offset => _offset; + public void Schedule(Action action, TimeSpan dueTime) + => _workScheduler.Schedule(action, dueTime); + public ITimer Timer(Action action, TimeSpan dueTime, TimeSpan period) + => _workScheduler.Timer(action, dueTime, period); + + public void AddTime(TimeSpan amount) { + _utcTime = _utcTime.Add(amount); + OnChanged(); + } + + public void SetTime(DateTime time, TimeSpan? offset = null) { + if (time.Kind == DateTimeKind.Local) + _utcTime = time.ToUniversalTime(); + else if (time.Kind == DateTimeKind.Unspecified) + _utcTime = new DateTime(time.Ticks, DateTimeKind.Utc); + else + _utcTime = time; + + if (offset.HasValue) + _offset = offset.Value; + + OnChanged(); + } + + public WaitHandle NoScheduledWorkItemsDue => _workScheduler.NoWorkItemsDue; + + public void Sleep(int milliseconds) { + AddTime(TimeSpan.FromMilliseconds(milliseconds)); + Thread.Sleep(1); + } + + public Task SleepAsync(int milliseconds, CancellationToken ct = default) { + Sleep(milliseconds); + return Task.CompletedTask; + } + + public event EventHandler Changed; + public WorkScheduler Scheduler => _workScheduler; + + private void OnChanged() { + Changed?.Invoke(this, EventArgs.Empty); + } + + public void Dispose() { + if (_originalClock != null) + SystemClock.SetInstance(_originalClock, null); + } + + public static ITestSystemClock Install(ILoggerFactory loggerFactory = null) { + var testClock = new TestSystemClock(SystemClock.Instance, loggerFactory); + SystemClock.SetInstance(testClock, loggerFactory); + + return testClock; + } + } +} \ No newline at end of file diff --git a/src/Foundatio/SystemClock/WorkScheduler.cs b/src/Foundatio/SystemClock/WorkScheduler.cs new file mode 100644 index 000000000..96f42b9e5 --- /dev/null +++ b/src/Foundatio/SystemClock/WorkScheduler.cs @@ -0,0 +1,140 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Utility { + /// + /// Used for scheduling tasks to be completed in the future. Uses the SystemClock so that making use of this makes it easy to test time sensitive code. + /// This is the same as using the thread pool. Long running tasks should not be scheduled on this. Tasks should generally last no longer than a few seconds. + /// + public class WorkScheduler : IDisposable { + private readonly ILogger _logger; + private bool _isDisposed = false; + private readonly SortedQueue _workItems = new SortedQueue(); + private readonly TaskFactory _taskFactory; + private Task _workLoopTask; + private readonly object _lock = new object(); + private readonly AutoResetEvent _workItemScheduled = new AutoResetEvent(false); + private readonly ISystemClock _clock; + private readonly AutoResetEvent _noWorkItemsDue = new AutoResetEvent(false); + + public WorkScheduler(ISystemClock clock, ILogger logger = null) { + _clock = clock; + _logger = logger ?? NullLogger.Instance; + // limit scheduled task processing to 50 at a time + _taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(50)); + } + + public WaitHandle NoWorkItemsDue => _noWorkItemsDue; + + public ITimer Timer(Action action, TimeSpan dueTime, TimeSpan period) { + var executeAt = _clock.UtcNow.Add(dueTime); + if (executeAt.Kind != DateTimeKind.Utc) + executeAt = executeAt.ToUniversalTime(); + + _logger.LogTrace("Scheduling work due at {ExecuteAt} ({DueTime:g} from now)", executeAt, dueTime); + var workItem = new WorkItem(this) { Action = action, ExecuteAtUtc = executeAt, Period = period }; + _workItems.Enqueue(executeAt, workItem); + + EnsureWorkLoopRunning(); + _workItemScheduled.Set(); + + return workItem; + } + + public void Schedule(Action action, TimeSpan dueTime) { + var executeAt = _clock.UtcNow.Add(dueTime); + if (executeAt.Kind != DateTimeKind.Utc) + executeAt = executeAt.ToUniversalTime(); + + _logger.LogTrace("Scheduling work due at {ExecuteAt} ({DueTime:g} from now)", executeAt, dueTime); + _workItems.Enqueue(executeAt, new WorkItem(this) { + Action = action, + ExecuteAtUtc = executeAt + }); + + EnsureWorkLoopRunning(); + _workItemScheduled.Set(); + } + + private void EnsureWorkLoopRunning() { + if (_workLoopTask != null) + return; + + lock (_lock) { + if (_workLoopTask != null) + return; + + _logger.LogTrace("Starting work loop"); + if (_clock is ITestSystemClock testClock) + testClock.Changed += (s, e) => { _workItemScheduled.Set(); }; + + _workLoopTask = Task.Factory.StartNew(WorkLoop, TaskCreationOptions.LongRunning); + } + } + + private void WorkLoop() { + _logger.LogTrace("Work loop started"); + while (!_isDisposed) { + if (_workItems.TryDequeueIf(out var kvp, i => i.ExecuteAtUtc < _clock.UtcNow)) { + _logger.LogTrace("Starting work item due at {DueTime} current time {CurrentTime}", kvp.Key, _clock.UtcNow); + _ = _taskFactory.StartNew(() => { + var startTime = _clock.UtcNow; + kvp.Value.Action(); + if (kvp.Value.Period.HasValue) + Schedule(kvp.Value.Action, kvp.Value.Period.Value); + }); + continue; + } + + _noWorkItemsDue.Set(); + + if (kvp.Key != default) { + var delay = kvp.Key.Subtract(_clock.UtcNow); + _logger.LogTrace("No work items due, next due at {DueTime} ({Delay:g} from now)", kvp.Key, delay); + _workItemScheduled.WaitOne(delay); + } else { + _logger.LogTrace("No work items scheduled"); + _workItemScheduled.WaitOne(TimeSpan.FromMinutes(1)); + } + } + _logger.LogTrace("Work loop stopped"); + } + + public void Dispose() { + _isDisposed = true; + _workLoopTask.Wait(); + _workLoopTask = null; + } + + private class WorkItem : ITimer { + private readonly WorkScheduler _workScheduler; + + public WorkItem(WorkScheduler scheduler) { + _workScheduler = scheduler; + } + + public DateTime ExecuteAtUtc { get; set; } + public Action Action { get; set; } + public TimeSpan? Period { get; set; } + public bool IsCancelled { get; set; } + + public bool Change(TimeSpan dueTime, TimeSpan period) { + if (IsCancelled) + return false; + + IsCancelled = true; + + var workItem = _workScheduler.Timer(Action, dueTime, period); + // TODO: Figure out how to make it so the original ITimer instance can still have access to the currently scheduled workitem + return true; + } + + public void Dispose() { + IsCancelled = true; + } + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Utility/ConnectionStringParser.cs b/src/Foundatio/Utility/ConnectionStringParser.cs index a4ef473f0..4d6dc7e12 100644 --- a/src/Foundatio/Utility/ConnectionStringParser.cs +++ b/src/Foundatio/Utility/ConnectionStringParser.cs @@ -1,11 +1,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using System.Globalization; -using System.Linq; using System.Text; using System.Text.RegularExpressions; -using Foundatio.Utility; namespace Foundatio.Utility { public static class ConnectionStringParser { diff --git a/src/Foundatio/Utility/IAsyncLifetime.cs b/src/Foundatio/Utility/IAsyncLifetime.cs index dce9921e2..18c88b540 100644 --- a/src/Foundatio/Utility/IAsyncLifetime.cs +++ b/src/Foundatio/Utility/IAsyncLifetime.cs @@ -1,5 +1,3 @@ -using System; -using System.Runtime.ExceptionServices; using System.Threading.Tasks; namespace Foundatio.Utility { diff --git a/src/Foundatio/Utility/LimitedConcurrencyLevelTaskScheduler.cs b/src/Foundatio/Utility/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 000000000..c63255e69 --- /dev/null +++ b/src/Foundatio/Utility/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,112 @@ +// https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Foundatio.Utility { + // Provides a task scheduler that ensures a maximum concurrency level while + // running on top of the thread pool. + public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler { + // Indicates whether the current thread is processing work items. + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + + // The list of tasks to be executed + private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) + + // The maximum concurrency level allowed by this scheduler. + private readonly int _maxDegreeOfParallelism; + + // Indicates whether the scheduler is currently processing work items. + private int _delegatesQueuedOrRunning = 0; + + // Creates a new instance with the specified degree of parallelism. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) { + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + _maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + // Queues a task to the scheduler. + protected sealed override void QueueTask(Task task) { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (_tasks) { + _tasks.AddLast(task); + if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) { + ++_delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + // Inform the ThreadPool that there's work to be executed for this scheduler. + private void NotifyThreadPoolOfPendingWork() { + ThreadPool.UnsafeQueueUserWorkItem(_ => { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + _currentThreadIsProcessingItems = true; + try { + // Process all available items in the queue. + while (true) { + Task item; + lock (_tasks) { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) { + --_delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = _tasks.First.Value; + _tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally { _currentThreadIsProcessingItems = false; } + }, null); + } + + // Attempts to execute the specified task on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) + // Try to run the task. + if (TryDequeue(task)) + return TryExecuteTask(task); + else + return false; + else + return TryExecuteTask(task); + } + + // Attempt to remove a previously scheduled task from the scheduler. + protected sealed override bool TryDequeue(Task task) { + lock (_tasks) return _tasks.Remove(task); + } + + // Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } + + // Gets an enumerable of the tasks currently scheduled on this scheduler. + protected sealed override IEnumerable GetScheduledTasks() { + bool lockTaken = false; + try { + Monitor.TryEnter(_tasks, ref lockTaken); + if (lockTaken) return _tasks; + else throw new NotSupportedException(); + } finally { + if (lockTaken) Monitor.Exit(_tasks); + } + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Utility/MaintenanceBase.cs b/src/Foundatio/Utility/MaintenanceBase.cs index 44e6f8237..ec185d3b2 100644 --- a/src/Foundatio/Utility/MaintenanceBase.cs +++ b/src/Foundatio/Utility/MaintenanceBase.cs @@ -19,6 +19,9 @@ protected void InitializeMaintenance(TimeSpan? dueTime = null, TimeSpan? interva } protected void ScheduleNextMaintenance(DateTime utcDate) { + if (_maintenanceTimer == null) + return; + _maintenanceTimer.ScheduleNext(utcDate); } diff --git a/src/Foundatio/Utility/OptionsBuilder.cs b/src/Foundatio/Utility/OptionsBuilder.cs index 3d4399d54..ec79cfa43 100644 --- a/src/Foundatio/Utility/OptionsBuilder.cs +++ b/src/Foundatio/Utility/OptionsBuilder.cs @@ -1,6 +1,4 @@ -using System; - -namespace Foundatio { +namespace Foundatio { public interface IOptionsBuilder { object Target { get; } } diff --git a/src/Foundatio/Utility/SharedOptions.cs b/src/Foundatio/Utility/SharedOptions.cs index 37b564bd1..e8490e040 100644 --- a/src/Foundatio/Utility/SharedOptions.cs +++ b/src/Foundatio/Utility/SharedOptions.cs @@ -1,11 +1,12 @@ -using System; using Foundatio.Serializer; +using Foundatio.Utility; using Microsoft.Extensions.Logging; namespace Foundatio { public class SharedOptions { public ISerializer Serializer { get; set; } public ILoggerFactory LoggerFactory { get; set; } + public ISystemClock SystemClock { get; set; } } public class SharedOptionsBuilder : OptionsBuilder @@ -20,5 +21,10 @@ public TBuilder LoggerFactory(ILoggerFactory loggerFactory) { Target.LoggerFactory = loggerFactory; return (TBuilder)this; } + + public TBuilder SystemClock(ISystemClock systemClock) { + Target.SystemClock = systemClock; + return (TBuilder)this; + } } } diff --git a/src/Foundatio/Utility/SortedQueue.cs b/src/Foundatio/Utility/SortedQueue.cs new file mode 100644 index 000000000..04917bc18 --- /dev/null +++ b/src/Foundatio/Utility/SortedQueue.cs @@ -0,0 +1,126 @@ +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; + +namespace Foundatio.Utility { + public class SortedQueue : IProducerConsumerCollection> + where TKey : IComparable { + + private readonly object _lock = new object(); + private readonly SortedDictionary _sortedDictionary = new SortedDictionary(); + + public SortedQueue() { } + + public SortedQueue(IEnumerable> collection) { + if (collection == null) + throw new ArgumentNullException(nameof(collection)); + + foreach (var kvp in collection) + _sortedDictionary.Add(kvp.Key, kvp.Value); + } + + public void Enqueue(TKey key, TValue value) { + Enqueue(new KeyValuePair(key, value)); + } + + public void Enqueue(KeyValuePair item) { + lock (_lock) + _sortedDictionary.Add(item.Key, item.Value); + } + + public bool TryDequeue(out KeyValuePair item) { + item = default; + + lock (_lock) { + if (_sortedDictionary.Count > 0) { + item = _sortedDictionary.First(); + return _sortedDictionary.Remove(item.Key); + } + } + + return false; + } + + public bool TryDequeueIf(out KeyValuePair item, Predicate condition) { + item = default; + + lock (_lock) { + if (_sortedDictionary.Count > 0) { + item = _sortedDictionary.First(); + if (!condition(item.Value)) + return false; + + return _sortedDictionary.Remove(item.Key); + } + } + + return false; + } + + public bool TryPeek(out KeyValuePair item) { + item = default; + + lock (_lock) { + if (_sortedDictionary.Count > 0) { + item = _sortedDictionary.First(); + return true; + } + } + + return false; + } + + public void Clear() { + lock (_lock) + _sortedDictionary.Clear(); + } + + public bool IsEmpty => Count == 0; + + public IEnumerator> GetEnumerator() { + var array = ToArray(); + return array.AsEnumerable().GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + void ICollection.CopyTo(Array array, int index) { + lock (_lock) + ((ICollection)_sortedDictionary).CopyTo(array, index); + } + + public int Count { + get { + lock (_lock) + return _sortedDictionary.Count; + } + } + + object ICollection.SyncRoot => _lock; + + bool ICollection.IsSynchronized => true; + + public void CopyTo(KeyValuePair[] array, int index) { + lock (_lock) + _sortedDictionary.CopyTo(array, index); + } + + bool IProducerConsumerCollection>.TryAdd(KeyValuePair item) { + Enqueue(item); + return true; + } + + bool IProducerConsumerCollection>.TryTake(out KeyValuePair item) { + return TryDequeue(out item); + } + + public KeyValuePair[] ToArray() { + lock (_lock) + return _sortedDictionary.ToArray(); + } + } +} \ No newline at end of file diff --git a/src/Foundatio/Utility/SystemClock.cs b/src/Foundatio/Utility/SystemClock.cs deleted file mode 100644 index 9b92ddc61..000000000 --- a/src/Foundatio/Utility/SystemClock.cs +++ /dev/null @@ -1,202 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Foundatio.Utility { - public interface ISystemClock { - DateTime Now(); - DateTime UtcNow(); - DateTimeOffset OffsetNow(); - DateTimeOffset OffsetUtcNow(); - void Sleep(int milliseconds); - Task SleepAsync(int milliseconds, CancellationToken ct); - TimeSpan TimeZoneOffset(); - } - - public class RealSystemClock : ISystemClock { - public static readonly RealSystemClock Instance = new(); - - public DateTime Now() => DateTime.Now; - public DateTime UtcNow() => DateTime.UtcNow; - public DateTimeOffset OffsetNow() => DateTimeOffset.Now; - public DateTimeOffset OffsetUtcNow() => DateTimeOffset.UtcNow; - public void Sleep(int milliseconds) => Thread.Sleep(milliseconds); - public Task SleepAsync(int milliseconds, CancellationToken ct) => Task.Delay(milliseconds, ct); - public TimeSpan TimeZoneOffset() => DateTimeOffset.Now.Offset; - } - - internal class TestSystemClockImpl : ISystemClock, IDisposable { - private DateTime? _fixedUtc = null; - private TimeSpan _offset = TimeSpan.Zero; - private TimeSpan _timeZoneOffset = DateTimeOffset.Now.Offset; - private bool _fakeSleep = false; - private ISystemClock _originalClock; - - public TestSystemClockImpl() {} - - public TestSystemClockImpl(ISystemClock originalTime) { - _originalClock = originalTime; - } - - public DateTime UtcNow() => (_fixedUtc ?? DateTime.UtcNow).Add(_offset); - public DateTime Now() => new(UtcNow().Ticks + TimeZoneOffset().Ticks, DateTimeKind.Local); - public DateTimeOffset OffsetNow() => new(UtcNow().Ticks + TimeZoneOffset().Ticks, TimeZoneOffset()); - public DateTimeOffset OffsetUtcNow() => new(UtcNow().Ticks, TimeSpan.Zero); - public TimeSpan TimeZoneOffset() => _timeZoneOffset; - - public void SetTimeZoneOffset(TimeSpan offset) => _timeZoneOffset = offset; - public void AddTime(TimeSpan amount) => _offset = _offset.Add(amount); - public void SubtractTime(TimeSpan amount) => _offset = _offset.Subtract(amount); - public void UseFakeSleep() => _fakeSleep = true; - public void UseRealSleep() => _fakeSleep = false; - - public void Sleep(int milliseconds) { - if (!_fakeSleep) { - Thread.Sleep(milliseconds); - return; - } - - AddTime(TimeSpan.FromMilliseconds(milliseconds)); - Thread.Sleep(1); - } - - public Task SleepAsync(int milliseconds, CancellationToken ct) { - if (!_fakeSleep) - return Task.Delay(milliseconds, ct); - - Sleep(milliseconds); - return Task.CompletedTask; - } - - public void Freeze() { - SetFrozenTime(Now()); - } - - public void Unfreeze() { - SetTime(Now()); - } - - public void SetFrozenTime(DateTime time) { - SetTime(time, true); - } - - public void SetTime(DateTime time, bool freeze = false) { - var now = DateTime.Now; - if (freeze) { - if (time.Kind == DateTimeKind.Unspecified) - time = time.ToUniversalTime(); - - if (time.Kind == DateTimeKind.Utc) { - _fixedUtc = time; - } else if (time.Kind == DateTimeKind.Local) { - _fixedUtc = new DateTime(time.Ticks - TimeZoneOffset().Ticks, DateTimeKind.Utc); - } - } else { - _fixedUtc = null; - - if (time.Kind == DateTimeKind.Unspecified) - time = time.ToUniversalTime(); - - if (time.Kind == DateTimeKind.Utc) { - _offset = now.ToUniversalTime().Subtract(time); - } else if (time.Kind == DateTimeKind.Local) { - _offset = now.Subtract(time); - } - } - } - - public void Dispose() { - if (_originalClock == null) - return; - - var originalClock = Interlocked.Exchange(ref _originalClock, null); - if (originalClock != null) - SystemClock.Instance = originalClock; - } - - public static TestSystemClockImpl Instance { - get { - if (!(SystemClock.Instance is TestSystemClockImpl testClock)) - throw new ArgumentException("You must first install TestSystemClock using TestSystemClock.Install"); - - return testClock; - } - } - } - - public class TestSystemClock { - public static void SetTimeZoneOffset(TimeSpan offset) => TestSystemClockImpl.Instance.SetTimeZoneOffset(offset); - public static void AddTime(TimeSpan amount) => TestSystemClockImpl.Instance.AddTime(amount); - public static void SubtractTime(TimeSpan amount) => TestSystemClockImpl.Instance.SubtractTime(amount); - public static void UseFakeSleep() => TestSystemClockImpl.Instance.UseFakeSleep(); - public static void UseRealSleep() => TestSystemClockImpl.Instance.UseRealSleep(); - public static void Freeze() => TestSystemClockImpl.Instance.Freeze(); - public static void Unfreeze() => TestSystemClockImpl.Instance.Unfreeze(); - public static void SetFrozenTime(DateTime time) => TestSystemClockImpl.Instance.SetFrozenTime(time); - public static void SetTime(DateTime time, bool freeze = false) => TestSystemClockImpl.Instance.SetTime(time, freeze); - - public static IDisposable Install() { - var testClock = new TestSystemClockImpl(SystemClock.Instance); - SystemClock.Instance = testClock; - - return testClock; - } - } - - public static class SystemClock { - private static AsyncLocal _instance; - - public static ISystemClock Instance { - get => _instance?.Value ?? RealSystemClock.Instance; - set { - if (_instance == null) - _instance = new AsyncLocal(); - - _instance.Value = value; - } - } - - public static DateTime Now => Instance.Now(); - public static DateTime UtcNow => Instance.UtcNow(); - public static DateTimeOffset OffsetNow => Instance.OffsetNow(); - public static DateTimeOffset OffsetUtcNow => Instance.OffsetUtcNow(); - public static TimeSpan TimeZoneOffset => Instance.TimeZoneOffset(); - public static void Sleep(int milliseconds) => Instance.Sleep(milliseconds); - public static Task SleepAsync(int milliseconds, CancellationToken cancellationToken = default) - => Instance.SleepAsync(milliseconds, cancellationToken); - - #region Extensions - - public static void Sleep(TimeSpan delay) - => Instance.Sleep(delay); - - public static Task SleepAsync(TimeSpan delay, CancellationToken cancellationToken = default) - => Instance.SleepAsync(delay, cancellationToken); - - public static Task SleepSafeAsync(int milliseconds, CancellationToken cancellationToken = default) { - return Instance.SleepSafeAsync(milliseconds, cancellationToken); - } - - public static Task SleepSafeAsync(TimeSpan delay, CancellationToken cancellationToken = default) - => Instance.SleepSafeAsync(delay, cancellationToken); - - #endregion - } - - public static class TimeExtensions { - public static void Sleep(this ISystemClock time, TimeSpan delay) - => time.Sleep((int)delay.TotalMilliseconds); - - public static Task SleepAsync(this ISystemClock time, TimeSpan delay, CancellationToken cancellationToken = default) - => time.SleepAsync((int)delay.TotalMilliseconds, cancellationToken); - - public static async Task SleepSafeAsync(this ISystemClock time, int milliseconds, CancellationToken cancellationToken = default) { - try { - await time.SleepAsync(milliseconds, cancellationToken).AnyContext(); - } catch (OperationCanceledException) {} - } - - public static Task SleepSafeAsync(this ISystemClock time, TimeSpan delay, CancellationToken cancellationToken = default) - => time.SleepSafeAsync((int)delay.TotalMilliseconds, cancellationToken); - } -} \ No newline at end of file diff --git a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs index da4c4cec6..e56c19e87 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading.Tasks; +using System.Threading.Tasks; using Foundatio.Caching; using Foundatio.Messaging; using Microsoft.Extensions.Logging; diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index ec352e7c9..746bd42ec 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -92,7 +92,7 @@ public async Task CanRunMultipleInstances() { [Fact] public async Task CanCancelContinuousJobs() { - using (TestSystemClock.Install()) { + using (var clock = TestSystemClock.Install()) { var job = new HelloWorldJob(Log); var timeoutCancellationTokenSource = new CancellationTokenSource(100); await job.RunContinuousAsync(TimeSpan.FromSeconds(1), 5, timeoutCancellationTokenSource.Token); @@ -101,7 +101,7 @@ public async Task CanCancelContinuousJobs() { timeoutCancellationTokenSource = new CancellationTokenSource(500); var runnerTask = new JobRunner(job, Log, instanceCount: 5, iterationLimit: 10000, interval: TimeSpan.FromMilliseconds(1)).RunAsync(timeoutCancellationTokenSource.Token); - await SystemClock.SleepAsync(TimeSpan.FromSeconds(1)); + await clock.SleepAsync(TimeSpan.FromSeconds(1)); await runnerTask; } } @@ -137,11 +137,9 @@ public async Task CanRunThrottledJobs() { [Fact] public async Task CanRunJobsWithInterval() { - using (TestSystemClock.Install()) { + using (var clock = TestSystemClock.Install()) { var time = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - - TestSystemClock.SetFrozenTime(time); - TestSystemClock.UseFakeSleep(); + clock.SetTime(time); var job = new HelloWorldJob(Log); var interval = TimeSpan.FromHours(.75); @@ -155,11 +153,9 @@ public async Task CanRunJobsWithInterval() { [Fact] public async Task CanRunJobsWithIntervalBetweenFailingJob() { - using (TestSystemClock.Install()) { + using (var clock = TestSystemClock.Install()) { var time = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - - TestSystemClock.SetFrozenTime(time); - TestSystemClock.UseFakeSleep(); + clock.SetTime(time); var job = new FailingJob(Log); var interval = TimeSpan.FromHours(.75); diff --git a/tests/Foundatio.Tests/Messaging/InMemoryMessageBusTests.cs b/tests/Foundatio.Tests/Messaging/InMemoryMessageBusTests.cs index d71668849..9566a58a3 100644 --- a/tests/Foundatio.Tests/Messaging/InMemoryMessageBusTests.cs +++ b/tests/Foundatio.Tests/Messaging/InMemoryMessageBusTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using Foundatio.Messaging; using Xunit; @@ -64,6 +64,11 @@ public override Task CanSendDelayedMessageAsync() { return base.CanSendDelayedMessageAsync(); } + [Fact] + public override Task CanSendParallelDelayedMessagesAsync() { + return base.CanSendParallelDelayedMessagesAsync(); + } + [Fact] public override Task CanSubscribeConcurrentlyAsync() { return base.CanSubscribeConcurrentlyAsync(); diff --git a/tests/Foundatio.Tests/Metrics/StatsDMetricsTests.cs b/tests/Foundatio.Tests/Metrics/StatsDMetricsTests.cs index ce9adcdef..d7d53bfdb 100644 --- a/tests/Foundatio.Tests/Metrics/StatsDMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/StatsDMetricsTests.cs @@ -1,14 +1,10 @@ using System; -using System.Collections.Generic; -using System.Diagnostics; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Foundatio.Xunit; using Foundatio.Metrics; using Foundatio.Tests.Utility; using Foundatio.Utility; -using Microsoft.Extensions.Logging; using Xunit; using Xunit.Abstractions; diff --git a/tests/Foundatio.Tests/Serializer/JsonNetSerializerTests.cs b/tests/Foundatio.Tests/Serializer/JsonNetSerializerTests.cs index e20dda87e..1047ed899 100644 --- a/tests/Foundatio.Tests/Serializer/JsonNetSerializerTests.cs +++ b/tests/Foundatio.Tests/Serializer/JsonNetSerializerTests.cs @@ -1,5 +1,4 @@ -using System; -using Foundatio.Serializer; +using Foundatio.Serializer; using Foundatio.TestHarness.Utility; using Microsoft.Extensions.Logging; using Xunit; diff --git a/tests/Foundatio.Tests/Serializer/MessagePackSerializerTests.cs b/tests/Foundatio.Tests/Serializer/MessagePackSerializerTests.cs index 3c072aeff..554c085aa 100644 --- a/tests/Foundatio.Tests/Serializer/MessagePackSerializerTests.cs +++ b/tests/Foundatio.Tests/Serializer/MessagePackSerializerTests.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Exporters.Json; -using BenchmarkDotNet.Loggers; -using BenchmarkDotNet.Reports; -using Foundatio.Serializer; +using Foundatio.Serializer; using Foundatio.TestHarness.Utility; using Microsoft.Extensions.Logging; using Xunit; diff --git a/tests/Foundatio.Tests/Serializer/Utf8JsonSerializerTests.cs b/tests/Foundatio.Tests/Serializer/Utf8JsonSerializerTests.cs index 1c25b2773..96aa6992d 100644 --- a/tests/Foundatio.Tests/Serializer/Utf8JsonSerializerTests.cs +++ b/tests/Foundatio.Tests/Serializer/Utf8JsonSerializerTests.cs @@ -1,5 +1,4 @@ -using System; -using Foundatio.Serializer; +using Foundatio.Serializer; using Foundatio.TestHarness.Utility; using Microsoft.Extensions.Logging; using Xunit; diff --git a/tests/Foundatio.Tests/Utility/ConnectionStringParserTests.cs b/tests/Foundatio.Tests/Utility/ConnectionStringParserTests.cs index 1cf3a6e75..08f7956ba 100644 --- a/tests/Foundatio.Tests/Utility/ConnectionStringParserTests.cs +++ b/tests/Foundatio.Tests/Utility/ConnectionStringParserTests.cs @@ -1,8 +1,5 @@ using System; -using System.Collections.Generic; -using Foundatio.Xunit; using Xunit; -using Xunit.Abstractions; using Foundatio.Utility; namespace Foundatio.Tests.Utility { diff --git a/tests/Foundatio.Tests/Utility/SystemClockTests.cs b/tests/Foundatio.Tests/Utility/SystemClockTests.cs index 12513ba01..6fa845e4d 100644 --- a/tests/Foundatio.Tests/Utility/SystemClockTests.cs +++ b/tests/Foundatio.Tests/Utility/SystemClockTests.cs @@ -11,106 +11,87 @@ public class SystemClockTests : TestWithLoggingBase { public SystemClockTests(ITestOutputHelper output) : base(output) {} [Fact] - public void CanGetTime() { - using (TestSystemClock.Install()) { - var now = DateTime.UtcNow; - TestSystemClock.SetFrozenTime(now); - Assert.Equal(now, SystemClock.UtcNow); - Assert.Equal(now.ToLocalTime(), SystemClock.Now); - Assert.Equal(now, SystemClock.OffsetUtcNow); - Assert.Equal(now.ToLocalTime(), SystemClock.OffsetNow); - Assert.Equal(DateTimeOffset.Now.Offset, SystemClock.TimeZoneOffset); + public void CanSetTime() { + using (var clock = TestSystemClock.Install()) { + var now = DateTime.Now; + clock.SetTime(now); + Assert.Equal(now, clock.Now); + Assert.Equal(DateTimeOffset.Now.Offset, clock.Offset); + Assert.Equal(now.ToUniversalTime(), clock.UtcNow); + Assert.Equal(now.ToLocalTime(), clock.Now); + Assert.Equal(now.ToUniversalTime(), clock.OffsetUtcNow); + + // set using utc + now = DateTime.UtcNow; + clock.SetTime(now); + Assert.Equal(now, clock.UtcNow); + Assert.Equal(DateTimeOffset.Now.Offset, clock.Offset); + Assert.Equal(now.ToUniversalTime(), clock.UtcNow); + Assert.Equal(now.ToLocalTime(), clock.Now); + Assert.Equal(now.ToUniversalTime(), clock.OffsetUtcNow); } } [Fact] - public void CanSleep() { - using (TestSystemClock.Install()) { - var sw = Stopwatch.StartNew(); - SystemClock.Sleep(250); - sw.Stop(); - Assert.InRange(sw.ElapsedMilliseconds, 225, 400); - - TestSystemClock.UseFakeSleep(); - - var now = SystemClock.UtcNow; - sw.Restart(); - SystemClock.Sleep(1000); - sw.Stop(); - var afterSleepNow = SystemClock.UtcNow; - - Assert.InRange(sw.ElapsedMilliseconds, 0, 30); - Assert.True(afterSleepNow > now); - Assert.InRange(afterSleepNow.Subtract(now).TotalMilliseconds, 950, 1100); + public void CanSetTimeWithOffset() { + using (var clock = TestSystemClock.Install()) { + var now = DateTimeOffset.Now; + clock.SetTime(now.LocalDateTime, now.Offset); + Assert.Equal(now, clock.OffsetNow); + Assert.Equal(now.Offset, clock.Offset); + Assert.Equal(now.UtcDateTime, clock.UtcNow); + Assert.Equal(now.DateTime, clock.Now); + Assert.Equal(now.ToUniversalTime(), clock.OffsetUtcNow); + + clock.SetTime(now.UtcDateTime, now.Offset); + Assert.Equal(now, clock.OffsetNow); + Assert.Equal(now.Offset, clock.Offset); + Assert.Equal(now.UtcDateTime, clock.UtcNow); + Assert.Equal(now.DateTime, clock.Now); + Assert.Equal(now.ToUniversalTime(), clock.OffsetUtcNow); + + now = new DateTimeOffset(now.DateTime, TimeSpan.FromHours(1)); + clock.SetTime(now.LocalDateTime, now.Offset); + Assert.Equal(now, clock.OffsetNow); + Assert.Equal(now.Offset, clock.Offset); + Assert.Equal(now.UtcDateTime, clock.UtcNow); + Assert.Equal(now.DateTime, clock.Now); + Assert.Equal(now.ToUniversalTime(), clock.OffsetUtcNow); } } [Fact] - public async Task CanSleepAsync() { - using (TestSystemClock.Install()) { - var sw = Stopwatch.StartNew(); - await SystemClock.SleepAsync(250); - sw.Stop(); - - Assert.InRange(sw.ElapsedMilliseconds, 225, 3000); - - TestSystemClock.UseFakeSleep(); - - var now = SystemClock.UtcNow; - sw.Restart(); - await SystemClock.SleepAsync(1000); - sw.Stop(); - var afterSleepNow = SystemClock.UtcNow; - - Assert.InRange(sw.ElapsedMilliseconds, 0, 30); - Assert.True(afterSleepNow > now); - Assert.InRange(afterSleepNow.Subtract(now).TotalMilliseconds, 950, 5000); - } + public void CanRealSleep() { + var clock = new RealSystemClock(Log); + var sw = Stopwatch.StartNew(); + clock.Sleep(250); + sw.Stop(); + Assert.InRange(sw.ElapsedMilliseconds, 100, 500); } [Fact] - public void CanSetTimeZone() { - using (TestSystemClock.Install()) { - var utcNow = DateTime.UtcNow; - var now = new DateTime(utcNow.AddHours(1).Ticks, DateTimeKind.Local); - TestSystemClock.SetFrozenTime(utcNow); - TestSystemClock.SetTimeZoneOffset(TimeSpan.FromHours(1)); - - Assert.Equal(utcNow, SystemClock.UtcNow); - Assert.Equal(utcNow, SystemClock.OffsetUtcNow); - Assert.Equal(now, SystemClock.Now); - Assert.Equal(new DateTimeOffset(now.Ticks, TimeSpan.FromHours(1)), SystemClock.OffsetNow); - Assert.Equal(TimeSpan.FromHours(1), SystemClock.TimeZoneOffset); + public void CanTestSleep() { + using (var clock = TestSystemClock.Install(Log)) { + var startTime = clock.UtcNow; + clock.Sleep(250); + Assert.Equal(250, clock.UtcNow.Subtract(startTime).TotalMilliseconds); } } - [Fact] - public void CanSetLocalFixedTime() { - using (TestSystemClock.Install()) { - var now = DateTime.Now; - var utcNow = now.ToUniversalTime(); - TestSystemClock.SetFrozenTime(now); - - Assert.Equal(now, SystemClock.Now); - Assert.Equal(now, SystemClock.OffsetNow); - Assert.Equal(utcNow, SystemClock.UtcNow); - Assert.Equal(utcNow, SystemClock.OffsetUtcNow); - Assert.Equal(DateTimeOffset.Now.Offset, SystemClock.TimeZoneOffset); - } + public async Task CanRealSleepAsync() { + var clock = new RealSystemClock(Log); + var sw = Stopwatch.StartNew(); + await clock.SleepAsync(250); + sw.Stop(); + Assert.InRange(sw.ElapsedMilliseconds, 100, 500); } [Fact] - public void CanSetUtcFixedTime() { - using (TestSystemClock.Install()) { - var utcNow = DateTime.UtcNow; - var now = utcNow.ToLocalTime(); - TestSystemClock.SetFrozenTime(utcNow); - - Assert.Equal(now, SystemClock.Now); - Assert.Equal(now, SystemClock.OffsetNow); - Assert.Equal(utcNow, SystemClock.UtcNow); - Assert.Equal(utcNow, SystemClock.OffsetUtcNow); - Assert.Equal(DateTimeOffset.Now.Offset, SystemClock.TimeZoneOffset); + public async Task CanTestSleepAsync() { + using (var clock = TestSystemClock.Install(Log)) { + var startTime = clock.UtcNow; + await clock.SleepAsync(250); + Assert.Equal(250, clock.UtcNow.Subtract(startTime).TotalMilliseconds); } } } diff --git a/tests/Foundatio.Tests/Utility/WorkSchedulerTests.cs b/tests/Foundatio.Tests/Utility/WorkSchedulerTests.cs new file mode 100644 index 000000000..d98582d5f --- /dev/null +++ b/tests/Foundatio.Tests/Utility/WorkSchedulerTests.cs @@ -0,0 +1,96 @@ +using System; +using System.Threading; +using Foundatio.AsyncEx; +using Foundatio.Logging.Xunit; +using Foundatio.Utility; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace Foundatio.Tests.Utility { + public class WorkSchedulerTests : TestWithLoggingBase { + public WorkSchedulerTests(ITestOutputHelper output) : base(output) { } + + [Fact] + public void CanScheduleWork() { + Log.MinimumLevel = LogLevel.Trace; + _logger.LogTrace("Starting test on thread {ThreadId} time {Time}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); + using (var clock = TestSystemClock.Install(Log)) { + var countdown = new CountdownEvent(1); + SystemClock.Schedule(() => { + _logger.LogTrace("Doing work"); + countdown.Signal(); + }, TimeSpan.FromMinutes(5)); + clock.NoScheduledWorkItemsDue.WaitOne(TimeSpan.FromMilliseconds(100)); + Assert.Equal(1, countdown.CurrentCount); + _logger.LogTrace("Adding 6 minutes to current time."); + SystemClock.Sleep(TimeSpan.FromMinutes(6)); + clock.NoScheduledWorkItemsDue.WaitOne(TimeSpan.FromMilliseconds(100)); + countdown.Wait(); + Assert.Equal(0, countdown.CurrentCount); + } + _logger.LogTrace("Ending test on thread {ThreadId} time {Time}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); + } + + [Fact] + public void CanScheduleMultipleUnorderedWorkItems() { + Log.MinimumLevel = LogLevel.Trace; + _logger.LogTrace("Starting test on thread {ThreadId} time {Time}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); + using (var clock = TestSystemClock.Install(Log)) { + var work1Event = new ManualResetEvent(false); + var work2Event = new ManualResetEvent(false); + var work3Event = new ManualResetEvent(false); + + // schedule work due in 5 minutes + clock.Schedule(() => { + _logger.LogTrace("Doing 5 minute work"); + work1Event.Set(); + }, TimeSpan.FromMinutes(5)); + + // schedule work due in 1 second + clock.Schedule(() => { + _logger.LogTrace("Doing 1 second work"); + work2Event.Set(); + }, TimeSpan.FromSeconds(1)); + + // schedule work that is already past due + clock.Schedule(() => { + _logger.LogTrace("Doing past due work"); + work3Event.Set(); + }, TimeSpan.FromSeconds(-1)); + + // wait until we get signal that no items are currently due + _logger.LogTrace("Waiting for past due items to be started"); + Assert.True(clock.NoScheduledWorkItemsDue.WaitOne(), "Wait for all due work items to be scheduled"); + _logger.LogTrace("Waiting for past due work to be completed"); + // work can be done before we even get here, but wait one to be sure it's done + Assert.True(work3Event.WaitOne(TimeSpan.FromSeconds(1))); + + // verify additional work will not happen until time changes + Assert.False(work2Event.WaitOne(TimeSpan.FromMilliseconds(100))); + + _logger.LogTrace("Adding 1 minute to current time"); + // sleeping for a minute to make 1 second work due + clock.Sleep(TimeSpan.FromMinutes(1)); + Assert.True(clock.NoScheduledWorkItemsDue.WaitOne()); + Assert.True(work2Event.WaitOne(TimeSpan.FromSeconds(1))); + + _logger.LogTrace("Adding 5 minutes to current time"); + // sleeping for 5 minutes to make 5 minute work due + clock.Sleep(TimeSpan.FromMinutes(5)); + _logger.LogTrace("Waiting for no work items due"); + Assert.True(clock.NoScheduledWorkItemsDue.WaitOne()); + _logger.LogTrace("Waiting for 5 minute work to be completed"); + Assert.True(work1Event.WaitOne(TimeSpan.FromSeconds(1))); + } + _logger.LogTrace("Ending test on thread {ThreadId} time {Time}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); + } + + // long running work item won't block other work items from running, this is bad usage, but we should make sure it works. + // can run with no work scheduled + // work items that throw don't affect other work items + // do we need to do anything for unhandled exceptions or would users just use the normal unhandled exception handler since the tasks are just being run on the normal thread pool + // test overall performance, what is the throughput of this scheduler? Should be good since it's just using the normal thread pool, but we may want to increase max concurrent or base it on ThreadPool.GetMaxThreads to avoid thread starvation solely coming from Foundatio. + // verify multiple tests manipulating the systemclock don't affect each other + } +}