Skip to content

Commit

Permalink
Merge pull request #41 from Tolyandre/hosted-service
Browse files Browse the repository at this point in the history
Access MongoDB lasily. Wait jobs to stop gracefully.
  • Loading branch information
Bobreshovr authored Oct 23, 2020
2 parents 99858b7 + ca57a86 commit 8424c6b
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 42 deletions.
13 changes: 1 addition & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Add nuget-package Horarium.AspNetCore
dotnet add package Horarium.AspNetCore
```

Add ```Horarium``` in Asp.NET Core DI
Add ```Horarium Server```. This regiters Horarium as a [hosted service](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services), so .Net core runtime automatically starts and gracefully stops Horarium.

```csharp
public void ConfigureServices(IServiceCollection services)
Expand All @@ -69,17 +69,6 @@ public void ConfigureServices(IServiceCollection services)
}
```

Start HorariumServer in Asp.NET Core application

```csharp
public void Configure(IApplicationBuilder app)
{
//...
app.ApplicationServices.StartHorariumServer();
//...
}
```

Inject interface ```IHorarium``` into Controller

```csharp
Expand Down
1 change: 1 addition & 0 deletions src/Horarium.AspNetCore/Horarium.AspNetCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
29 changes: 29 additions & 0 deletions src/Horarium.AspNetCore/HorariumServerHostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Threading;
using System.Threading.Tasks;
using Horarium.Interfaces;
using Microsoft.Extensions.Hosting;

namespace Horarium.AspNetCore
{
public class HorariumServerHostedService : IHostedService
{
private readonly HorariumServer _horariumServer;

public HorariumServerHostedService(IHorarium horarium)
{
_horariumServer = (HorariumServer) horarium;
}

public Task StartAsync(CancellationToken cancellationToken)
{
_horariumServer.Start();

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return _horariumServer.Stop();
}
}
}
8 changes: 2 additions & 6 deletions src/Horarium.AspNetCore/RegistrationHorariumExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public static IServiceCollection AddHorariumServer(this IServiceCollection servi
return new HorariumServer(jobRepository, settings);
});

service.AddHostedService<HorariumServerHostedService>();

return service;
}

Expand All @@ -52,12 +54,6 @@ public static IServiceCollection AddHorariumClient(this IServiceCollection servi
return service;
}

public static void StartHorariumServer(this IServiceProvider serviceProvider)
{
var server = (HorariumServer)serviceProvider.GetService<IHorarium>();
server.Start();
}

private static void PrepareSettings(HorariumSettings settings, IServiceProvider serviceProvider)
{
if (settings.JobScopeFactory is DefaultJobScopeFactory)
Expand Down
26 changes: 22 additions & 4 deletions src/Horarium.Mongo/MongoClientProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ public sealed class MongoClientProvider : IMongoClientProvider
{
private readonly ConcurrentDictionary<Type, string> _collectionNameCache = new ConcurrentDictionary<Type, string>();

private readonly Lazy<MongoClient> _mongoClient;
private readonly MongoClient _mongoClient;
private readonly string _databaseName;
private bool _initialized;
private object _lockObject = new object();

public MongoClientProvider(MongoUrl mongoUrl)
{
_databaseName = mongoUrl.DatabaseName;
_mongoClient = new Lazy<MongoClient>(() => new MongoClient(mongoUrl));
CreateIndexes();
_mongoClient = new MongoClient(mongoUrl);
}

public MongoClientProvider(string mongoConnectionString): this (new MongoUrl(mongoConnectionString))
Expand All @@ -35,8 +36,25 @@ private string GetCollectionName(Type entityType)

public IMongoCollection<TEntity> GetCollection<TEntity>()
{
EnsureInitialized();

var collectionName = _collectionNameCache.GetOrAdd(typeof(TEntity), GetCollectionName);
return _mongoClient.Value.GetDatabase(_databaseName).GetCollection<TEntity>(collectionName);
return _mongoClient.GetDatabase(_databaseName).GetCollection<TEntity>(collectionName);
}

private void EnsureInitialized()
{
if (_initialized)
return;

lock (_lockObject)
{
if (_initialized)
return;

_initialized = true;
CreateIndexes();
}
}

private void CreateIndexes()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using Horarium.AspNetCore;
using Horarium.Interfaces;
using Horarium.Repository;
Expand All @@ -13,27 +14,23 @@ public class RegistrationHorariumExtensionTest
[Fact]
public void AddHorariumServer_DefaultSettings_ReplaceForAspNetCore()
{
var serviceMock = new Mock<IServiceCollection>();

var service = serviceMock.Object;

ServiceDescriptor descriptor = null;
var service = new ServiceCollection();

var settings = new HorariumSettings();

serviceMock.Setup(x => x.Add(It.IsAny<ServiceDescriptor>()))
.Callback<ServiceDescriptor>(x => descriptor = x);

service.AddHorariumServer(Mock.Of<IJobRepository>(),
provider => settings);

var descriptor = service.Single(x => x.ServiceType == typeof(IHorarium));
var horarium = descriptor.ImplementationFactory(Mock.Of<IServiceProvider>());

Assert.Equal(ServiceLifetime.Singleton, descriptor.Lifetime);
Assert.Equal(typeof(IHorarium), descriptor.ServiceType);
Assert.Equal(typeof(JobScopeFactory), settings.JobScopeFactory.GetType());
Assert.Equal(typeof(HorariumLogger), settings.Logger.GetType());
Assert.Equal(typeof(HorariumServer), horarium.GetType());

Assert.Contains(service, x => x.ImplementationType == typeof(HorariumServerHostedService));
}

[Fact]
Expand Down
11 changes: 11 additions & 0 deletions src/Horarium.Test/Mongo/MongoRepositoryFactoryTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using Horarium.Mongo;
using MongoDB.Driver;
using Xunit;
Expand All @@ -22,5 +23,15 @@ public void Create_NullMongoUrl_Exception()

Assert.Throws<ArgumentNullException>(() => MongoRepositoryFactory.Create(mongoUrl));
}

[Fact]
public async Task Create_WellFormedUrl_AccessMongoLazily()
{
const string stubMongoUrl = "mongodb://fake-url:27017/fake_database_name/?serverSelectionTimeoutMs=100";

var mongoRepository = MongoRepositoryFactory.Create(stubMongoUrl);

await Assert.ThrowsAsync<TimeoutException>(() => mongoRepository.GetJobStatistic());
}
}
}
73 changes: 70 additions & 3 deletions src/Horarium.Test/RunnerJobTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public async Task Start_Stop()
new HorariumSettings(),
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>());
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
Expand Down Expand Up @@ -55,7 +56,8 @@ public async Task Start_RecoverAfterIntervalTimeout_AfterFailedDB()
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>());
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

jobRepositoryMock.SetupSequence(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ThrowsAsync(new Exception())
Expand Down Expand Up @@ -84,7 +86,8 @@ public async Task Start_WontRecoverBeforeIntervalTimeout_AfterFailedDB()
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>());
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

jobRepositoryMock.SetupSequence(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ThrowsAsync(new Exception())
Expand All @@ -97,5 +100,69 @@ public async Task Start_WontRecoverBeforeIntervalTimeout_AfterFailedDB()
// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Once);
}

[Fact]
public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();
var uncompletedTaskList = new Mock<IUncompletedTaskList>();

uncompletedTaskList.Setup(x => x.Add(It.IsAny<Task>()));

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ReturnsAsync(new JobDb
{
JobType = typeof(object).ToString(),
});

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
new HorariumSettings
{
IntervalStartJob = TimeSpan.FromHours(1), // prevent second job from starting
},
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
uncompletedTaskList.Object);

// Act
runnerJobs.Start();
await Task.Delay(TimeSpan.FromSeconds(5));
await runnerJobs.Stop();

// Assert
uncompletedTaskList.Verify(x=>x.Add(It.IsAny<Task>()), Times.Once);
}

[Fact]
public async Task StopAsync_AwaitsWhenAllCompleted()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();
var uncompletedTaskList = new Mock<IUncompletedTaskList>();

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(2),
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
uncompletedTaskList.Object);

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()));

// Act
runnerJobs.Start();
await Task.Delay(TimeSpan.FromSeconds(1));
await runnerJobs.Stop();

// Assert
uncompletedTaskList.Verify(x => x.WhenAllCompleted(), Times.Once);
}
}
}
82 changes: 82 additions & 0 deletions src/Horarium.Test/UncompletedTaskListTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Threading.Tasks;
using Horarium.Handlers;
using Xunit;

namespace Horarium.Test
{
public class UncompletedTaskListTests
{
private readonly UncompletedTaskList _uncompletedTaskList = new UncompletedTaskList();

[Fact]
public async Task Add_TaskWithAnyResult_KeepsTaskUntilCompleted()
{
var tcs1 = new TaskCompletionSource<bool>();
var tcs2 = new TaskCompletionSource<bool>();
var tcs3 = new TaskCompletionSource<bool>();

_uncompletedTaskList.Add(tcs1.Task);
_uncompletedTaskList.Add(tcs2.Task);
_uncompletedTaskList.Add(tcs3.Task);

Assert.Equal(3, _uncompletedTaskList.Count);

tcs1.SetResult(false);
await Task.Delay(TimeSpan.FromSeconds(1)); // give a chance to finish continuations
Assert.Equal(2, _uncompletedTaskList.Count);

tcs2.SetException(new ApplicationException());
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal(1, _uncompletedTaskList.Count);

tcs3.SetCanceled();
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal(0, _uncompletedTaskList.Count);
}

[Fact]
public async Task WhenAllCompleted_NoTasks_ReturnsCompletedTask()
{
// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();

// Assert
Assert.True(whenAll.IsCompletedSuccessfully);
await whenAll;
}

[Fact]
public async Task WhenAllCompleted_TaskNotCompleted_AwaitsUntilTaskCompleted()
{
// Arrange
var tcs = new TaskCompletionSource<bool>();
_uncompletedTaskList.Add(tcs.Task);

// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();

// Assert
await Task.Delay(TimeSpan.FromSeconds(1)); // give a chance to finish any running tasks
Assert.False(whenAll.IsCompleted);

tcs.SetResult(false);
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.True(whenAll.IsCompletedSuccessfully);

await whenAll;
}

[Fact]
public async Task WhenAllCompleted_TaskFaulted_DoesNotThrow()
{
// Arrange
_uncompletedTaskList.Add(Task.FromException(new ApplicationException()));

// Act
var whenAll = _uncompletedTaskList.WhenAllCompleted();

await whenAll;
}
}
}
Loading

0 comments on commit 8424c6b

Please sign in to comment.