Skip to content

Commit

Permalink
Merge pull request #30 from TinkoffCreditSystems/feature/5
Browse files Browse the repository at this point in the history
Feature/5
  • Loading branch information
Bobreshovr authored Sep 6, 2019
2 parents 07cde64 + 0dc39bc commit c5188cd
Show file tree
Hide file tree
Showing 27 changed files with 690 additions and 8 deletions.
1 change: 1 addition & 0 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ steps:
- dotnet build Horarium.sln -c Release
- dotnet test Horarium.Test/Horarium.Test.csproj --no-restore /p:CollectCoverage=true /p:CoverletOutputFormat=opencover
- DataBase=MongoDB dotnet test Horarium.IntegrationTest/Horarium.IntegrationTest.csproj --no-restore
- DataBase=Memory dotnet test Horarium.IntegrationTest/Horarium.IntegrationTest.csproj --no-restore

- name: coverage
image: plugins/codecov
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Support Databases
| Database | Support |
| ---------- | ----------------------------------------------------------------------- |
| MongoDB | Yes |
| In Memory | Not yet [#5](https://github.com/TinkoffCreditSystems/Horarium/issues/5) |
| In Memory | Yes |
| PostgreSQL | Not yet [#6](https://github.com/TinkoffCreditSystems/Horarium/issues/6) |

## Getting started
Expand Down Expand Up @@ -44,7 +44,7 @@ public class TestJob : IJob<int>
Create ```HorariumServer``` and schedule ```TestJob```

```csharp
var horarium = new HorariumServer(MongoRepositoryFactory.Create("mongodb://localhost:27017/horarium"));
var horarium = new HorariumServer(new InMemoryRepository());
horarium.Start();
await horarium.Create<TestJob, int>(666)
.Schedule();
Expand Down Expand Up @@ -258,3 +258,4 @@ new HorariumSettings
MaxRepeatCount = 1
});
```

11 changes: 11 additions & 0 deletions src/Horarium.InMemory/Horarium.InMemory.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Horarium\Horarium.csproj" />
</ItemGroup>

</Project>
118 changes: 118 additions & 0 deletions src/Horarium.InMemory/InMemoryRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Horarium.Repository;

namespace Horarium.InMemory
{
public class InMemoryRepository : IJobRepository
{
private readonly OperationsProcessor _processor = new OperationsProcessor();

private readonly JobsStorage _storage = new JobsStorage();

private readonly ConcurrentDictionary<string, RecurrentJobSettings> _settingsStorage =
new ConcurrentDictionary<string, RecurrentJobSettings>();

public Task<JobDb> GetReadyJob(string machineName, TimeSpan obsoleteTime)
{
JobDb Query()
{
var job = _storage.FindReadyJob(obsoleteTime);
if (job == null) return null;

_storage.Remove(job.JobId);

job.Status = JobStatus.Executing;
job.ExecutedMachine = machineName;
job.StartedExecuting = DateTime.UtcNow;
job.CountStarted++;

_storage.Add(job);

return job;
}

return _processor.Execute(Query);
}

public Task AddJob(JobDb job)
{
return _processor.Execute(() => _storage.Add(job.Copy()));
}

public Task FailedJob(string jobId, Exception error)
{
return _processor.Execute(() =>
{
var job = _storage.GetById(jobId);
if (job == null) return;

_storage.Remove(job);

job.Status = JobStatus.Failed;
job.Error = error.Message + error.StackTrace;

_storage.Add(job);
});
}

public Task RemoveJob(string jobId)
{
return _processor.Execute(() => _storage.Remove(jobId));
}

public Task RepeatJob(string jobId, DateTime startAt, Exception error)
{
return _processor.Execute(() =>
{
var job = _storage.GetById(jobId);
if (job == null) return;

_storage.Remove(job);

job.Status = JobStatus.RepeatJob;
job.StartAt = startAt;
job.Error = error.Message + error.StackTrace;

_storage.Add(job);
});
}

public Task AddRecurrentJob(JobDb job)
{
return _processor.Execute(() =>
{
var foundJob = _storage.FindRecurrentJobToUpdate(job.JobKey) ?? job.Copy();

_storage.Remove(foundJob);

foundJob.Cron = job.Cron;
foundJob.StartAt = job.StartAt;

_storage.Add(foundJob);
});
}

public Task AddRecurrentJobSettings(RecurrentJobSettings settings)
{
_settingsStorage.AddOrUpdate(settings.JobKey, settings, (_, __) => settings);

return Task.CompletedTask;
}

public Task<string> GetCronForRecurrentJob(string jobKey)
{
if (!_settingsStorage.TryGetValue(jobKey, out var settings))
throw new Exception($"Settings for recurrent job (jobKey = {jobKey}) aren't found");

return Task.FromResult(settings.Cron);
}

public Task<Dictionary<JobStatus, int>> GetJobStatistic()
{
return Task.FromResult(_storage.GetStatistics());
}
}
}
13 changes: 13 additions & 0 deletions src/Horarium.InMemory/Indexes/Comparers/JobIdComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Collections.Generic;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes.Comparers
{
internal class JobIdComparer : IComparer<JobDb>
{
public int Compare(JobDb x, JobDb y)
{
return StaticJobIdComparer.Compare(x, y);
}
}
}
16 changes: 16 additions & 0 deletions src/Horarium.InMemory/Indexes/Comparers/JobKeyComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes.Comparers
{
internal class JobKeyComparer : IComparer<JobDb>
{
public int Compare(JobDb x, JobDb y)
{
var result = string.Compare(x.JobKey, y.JobKey, StringComparison.Ordinal);

return result == 0 ? StaticJobIdComparer.Compare(x, y) : result;
}
}
}
16 changes: 16 additions & 0 deletions src/Horarium.InMemory/Indexes/Comparers/StartAtComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.Generic;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes.Comparers
{
internal class StartAtComparer : IComparer<JobDb>
{
public int Compare(JobDb x, JobDb y)
{
if (x.StartAt < y.StartAt) return -1;
if (x.StartAt > y.StartAt) return 1;

return StaticJobIdComparer.Compare(x, y);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.Generic;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes.Comparers
{
internal class StartedExecutingComparer : IComparer<JobDb>
{
public int Compare(JobDb x, JobDb y)
{
if (x.StartedExecuting < y.StartedExecuting) return -1;
if (x.StartedExecuting > y.StartedExecuting) return 1;

return StaticJobIdComparer.Compare(x, y);
}
}
}
13 changes: 13 additions & 0 deletions src/Horarium.InMemory/Indexes/Comparers/StaticJobIdComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes.Comparers
{
internal static class StaticJobIdComparer
{
public static int Compare(JobDb x, JobDb y)
{
return string.Compare(x.JobId, y.JobId, StringComparison.Ordinal);
}
}
}
46 changes: 46 additions & 0 deletions src/Horarium.InMemory/Indexes/ExecutingJobIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using Horarium.InMemory.Indexes.Comparers;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes
{
internal class ExecutingJobIndex : IAddRemoveIndex
{
private readonly SortedSet<JobDb> _startedExecutingIndex = new SortedSet<JobDb>(new StartedExecutingComparer());

private readonly JobKeyIndex _jobKeyIndex = new JobKeyIndex();

public void Add(JobDb job)
{
if (job.Status != JobStatus.Executing) return;

_startedExecutingIndex.Add(job);
_jobKeyIndex.Add(job);
}

public void Remove(JobDb job)
{
_startedExecutingIndex.Remove(job);
_jobKeyIndex.Remove(job);
}

public int Count()
{
return _startedExecutingIndex.Count;
}

public JobDb GetJobKeyEqual(string jobKey)
{
return _jobKeyIndex.Get(jobKey);
}

public JobDb GetStartedExecutingLessThan(DateTime startedExecuting)
{
if (_startedExecutingIndex.Count != 0 && _startedExecutingIndex.Min.StartAt < startedExecuting)
return _startedExecutingIndex.Min;

return null;
}
}
}
27 changes: 27 additions & 0 deletions src/Horarium.InMemory/Indexes/FailedJobIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Collections.Generic;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes
{
internal class FailedJobIndex : IAddRemoveIndex
{
private readonly Dictionary<string, JobDb> _index = new Dictionary<string, JobDb>();

public void Add(JobDb job)
{
if (job.Status != JobStatus.Failed) return;

_index.Add(job.JobId, job);
}

public void Remove(JobDb job)
{
_index.Remove(job.JobId);
}

public int Count()
{
return _index.Count;
}
}
}
13 changes: 13 additions & 0 deletions src/Horarium.InMemory/Indexes/IAddRemoveIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Horarium.Repository;

namespace Horarium.InMemory.Indexes
{
internal interface IAddRemoveIndex
{
void Add(JobDb job);

void Remove(JobDb job);

int Count();
}
}
45 changes: 45 additions & 0 deletions src/Horarium.InMemory/Indexes/JobKeyIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using Horarium.InMemory.Indexes.Comparers;
using Horarium.Repository;

namespace Horarium.InMemory.Indexes
{
internal class JobKeyIndex : IAddRemoveIndex
{
private readonly Dictionary<string, SortedSet<JobDb>> _index = new Dictionary<string, SortedSet<JobDb>>();

public void Add(JobDb job)
{
if (string.IsNullOrEmpty(job.JobKey)) return;

if (!_index.TryGetValue(job.JobKey, out var set))
_index[job.JobKey] = new SortedSet<JobDb>(new JobIdComparer()) {job};
else
set.Add(job);
}

public void Remove(JobDb job)
{
if (string.IsNullOrEmpty(job.JobKey)) return;

if (!_index.TryGetValue(job.JobKey, out var set)) return;

set.Remove(job);
}

public int Count()
{
throw new NotImplementedException();
}

public JobDb Get(string jobKey)
{
if (!_index.TryGetValue(jobKey, out var set)) return null;

if (set.Count != 0) return set.Min;

return null;
}
}
}
Loading

0 comments on commit c5188cd

Please sign in to comment.