diff --git a/.drone.yml b/.drone.yml index 388fd61..c408044 100644 --- a/.drone.yml +++ b/.drone.yml @@ -17,6 +17,7 @@ steps: - dotnet build Horarium.sln -c Release - dotnet test Horarium.Test/Horarium.Test.csproj --no-restore /p:CollectCoverage=true /p:CoverletOutputFormat=opencover - DataBase=MongoDB dotnet test Horarium.IntegrationTest/Horarium.IntegrationTest.csproj --no-restore + - DataBase=Memory dotnet test Horarium.IntegrationTest/Horarium.IntegrationTest.csproj --no-restore - name: coverage image: plugins/codecov diff --git a/README.md b/README.md index e9263ed..c0eed03 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Support Databases | Database | Support | | ---------- | ----------------------------------------------------------------------- | | MongoDB | Yes | -| In Memory | Not yet [#5](https://github.com/TinkoffCreditSystems/Horarium/issues/5) | +| In Memory | Yes | | PostgreSQL | Not yet [#6](https://github.com/TinkoffCreditSystems/Horarium/issues/6) | ## Getting started @@ -44,7 +44,7 @@ public class TestJob : IJob Create ```HorariumServer``` and schedule ```TestJob``` ```csharp -var horarium = new HorariumServer(MongoRepositoryFactory.Create("mongodb://localhost:27017/horarium")); +var horarium = new HorariumServer(new InMemoryRepository()); horarium.Start(); await horarium.Create(666) .Schedule(); @@ -258,3 +258,4 @@ new HorariumSettings MaxRepeatCount = 1 }); ``` + diff --git a/src/Horarium.InMemory/Horarium.InMemory.csproj b/src/Horarium.InMemory/Horarium.InMemory.csproj new file mode 100644 index 0000000..402f53c --- /dev/null +++ b/src/Horarium.InMemory/Horarium.InMemory.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/src/Horarium.InMemory/InMemoryRepository.cs b/src/Horarium.InMemory/InMemoryRepository.cs new file mode 100644 index 0000000..639e239 --- /dev/null +++ b/src/Horarium.InMemory/InMemoryRepository.cs @@ -0,0 +1,118 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using Horarium.Repository; + +namespace Horarium.InMemory +{ + public class InMemoryRepository : IJobRepository + { + private readonly OperationsProcessor _processor = new OperationsProcessor(); + + private readonly JobsStorage _storage = new JobsStorage(); + + private readonly ConcurrentDictionary _settingsStorage = + new ConcurrentDictionary(); + + public Task GetReadyJob(string machineName, TimeSpan obsoleteTime) + { + JobDb Query() + { + var job = _storage.FindReadyJob(obsoleteTime); + if (job == null) return null; + + _storage.Remove(job.JobId); + + job.Status = JobStatus.Executing; + job.ExecutedMachine = machineName; + job.StartedExecuting = DateTime.UtcNow; + job.CountStarted++; + + _storage.Add(job); + + return job; + } + + return _processor.Execute(Query); + } + + public Task AddJob(JobDb job) + { + return _processor.Execute(() => _storage.Add(job.Copy())); + } + + public Task FailedJob(string jobId, Exception error) + { + return _processor.Execute(() => + { + var job = _storage.GetById(jobId); + if (job == null) return; + + _storage.Remove(job); + + job.Status = JobStatus.Failed; + job.Error = error.Message + error.StackTrace; + + _storage.Add(job); + }); + } + + public Task RemoveJob(string jobId) + { + return _processor.Execute(() => _storage.Remove(jobId)); + } + + public Task RepeatJob(string jobId, DateTime startAt, Exception error) + { + return _processor.Execute(() => + { + var job = _storage.GetById(jobId); + if (job == null) return; + + _storage.Remove(job); + + job.Status = JobStatus.RepeatJob; + job.StartAt = startAt; + job.Error = error.Message + error.StackTrace; + + _storage.Add(job); + }); + } + + public Task AddRecurrentJob(JobDb job) + { + return _processor.Execute(() => + { + var foundJob = _storage.FindRecurrentJobToUpdate(job.JobKey) ?? job.Copy(); + + _storage.Remove(foundJob); + + foundJob.Cron = job.Cron; + foundJob.StartAt = job.StartAt; + + _storage.Add(foundJob); + }); + } + + public Task AddRecurrentJobSettings(RecurrentJobSettings settings) + { + _settingsStorage.AddOrUpdate(settings.JobKey, settings, (_, __) => settings); + + return Task.CompletedTask; + } + + public Task GetCronForRecurrentJob(string jobKey) + { + if (!_settingsStorage.TryGetValue(jobKey, out var settings)) + throw new Exception($"Settings for recurrent job (jobKey = {jobKey}) aren't found"); + + return Task.FromResult(settings.Cron); + } + + public Task> GetJobStatistic() + { + return Task.FromResult(_storage.GetStatistics()); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/Comparers/JobIdComparer.cs b/src/Horarium.InMemory/Indexes/Comparers/JobIdComparer.cs new file mode 100644 index 0000000..04bc9ca --- /dev/null +++ b/src/Horarium.InMemory/Indexes/Comparers/JobIdComparer.cs @@ -0,0 +1,13 @@ +using System.Collections.Generic; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes.Comparers +{ + internal class JobIdComparer : IComparer + { + public int Compare(JobDb x, JobDb y) + { + return StaticJobIdComparer.Compare(x, y); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/Comparers/JobKeyComparer.cs b/src/Horarium.InMemory/Indexes/Comparers/JobKeyComparer.cs new file mode 100644 index 0000000..058bc1c --- /dev/null +++ b/src/Horarium.InMemory/Indexes/Comparers/JobKeyComparer.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes.Comparers +{ + internal class JobKeyComparer : IComparer + { + public int Compare(JobDb x, JobDb y) + { + var result = string.Compare(x.JobKey, y.JobKey, StringComparison.Ordinal); + + return result == 0 ? StaticJobIdComparer.Compare(x, y) : result; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/Comparers/StartAtComparer.cs b/src/Horarium.InMemory/Indexes/Comparers/StartAtComparer.cs new file mode 100644 index 0000000..c126091 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/Comparers/StartAtComparer.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes.Comparers +{ + internal class StartAtComparer : IComparer + { + public int Compare(JobDb x, JobDb y) + { + if (x.StartAt < y.StartAt) return -1; + if (x.StartAt > y.StartAt) return 1; + + return StaticJobIdComparer.Compare(x, y); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/Comparers/StartedExecutingComparer.cs b/src/Horarium.InMemory/Indexes/Comparers/StartedExecutingComparer.cs new file mode 100644 index 0000000..b1b0ad6 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/Comparers/StartedExecutingComparer.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes.Comparers +{ + internal class StartedExecutingComparer : IComparer + { + public int Compare(JobDb x, JobDb y) + { + if (x.StartedExecuting < y.StartedExecuting) return -1; + if (x.StartedExecuting > y.StartedExecuting) return 1; + + return StaticJobIdComparer.Compare(x, y); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/Comparers/StaticJobIdComparer.cs b/src/Horarium.InMemory/Indexes/Comparers/StaticJobIdComparer.cs new file mode 100644 index 0000000..8f842fd --- /dev/null +++ b/src/Horarium.InMemory/Indexes/Comparers/StaticJobIdComparer.cs @@ -0,0 +1,13 @@ +using System; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes.Comparers +{ + internal static class StaticJobIdComparer + { + public static int Compare(JobDb x, JobDb y) + { + return string.Compare(x.JobId, y.JobId, StringComparison.Ordinal); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/ExecutingJobIndex.cs b/src/Horarium.InMemory/Indexes/ExecutingJobIndex.cs new file mode 100644 index 0000000..d0740cb --- /dev/null +++ b/src/Horarium.InMemory/Indexes/ExecutingJobIndex.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using Horarium.InMemory.Indexes.Comparers; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal class ExecutingJobIndex : IAddRemoveIndex + { + private readonly SortedSet _startedExecutingIndex = new SortedSet(new StartedExecutingComparer()); + + private readonly JobKeyIndex _jobKeyIndex = new JobKeyIndex(); + + public void Add(JobDb job) + { + if (job.Status != JobStatus.Executing) return; + + _startedExecutingIndex.Add(job); + _jobKeyIndex.Add(job); + } + + public void Remove(JobDb job) + { + _startedExecutingIndex.Remove(job); + _jobKeyIndex.Remove(job); + } + + public int Count() + { + return _startedExecutingIndex.Count; + } + + public JobDb GetJobKeyEqual(string jobKey) + { + return _jobKeyIndex.Get(jobKey); + } + + public JobDb GetStartedExecutingLessThan(DateTime startedExecuting) + { + if (_startedExecutingIndex.Count != 0 && _startedExecutingIndex.Min.StartAt < startedExecuting) + return _startedExecutingIndex.Min; + + return null; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/FailedJobIndex.cs b/src/Horarium.InMemory/Indexes/FailedJobIndex.cs new file mode 100644 index 0000000..03d0dcd --- /dev/null +++ b/src/Horarium.InMemory/Indexes/FailedJobIndex.cs @@ -0,0 +1,27 @@ +using System.Collections.Generic; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal class FailedJobIndex : IAddRemoveIndex + { + private readonly Dictionary _index = new Dictionary(); + + public void Add(JobDb job) + { + if (job.Status != JobStatus.Failed) return; + + _index.Add(job.JobId, job); + } + + public void Remove(JobDb job) + { + _index.Remove(job.JobId); + } + + public int Count() + { + return _index.Count; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/IAddRemoveIndex.cs b/src/Horarium.InMemory/Indexes/IAddRemoveIndex.cs new file mode 100644 index 0000000..791b173 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/IAddRemoveIndex.cs @@ -0,0 +1,13 @@ +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal interface IAddRemoveIndex + { + void Add(JobDb job); + + void Remove(JobDb job); + + int Count(); + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/JobKeyIndex.cs b/src/Horarium.InMemory/Indexes/JobKeyIndex.cs new file mode 100644 index 0000000..4e3e8f7 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/JobKeyIndex.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using Horarium.InMemory.Indexes.Comparers; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal class JobKeyIndex : IAddRemoveIndex + { + private readonly Dictionary> _index = new Dictionary>(); + + public void Add(JobDb job) + { + if (string.IsNullOrEmpty(job.JobKey)) return; + + if (!_index.TryGetValue(job.JobKey, out var set)) + _index[job.JobKey] = new SortedSet(new JobIdComparer()) {job}; + else + set.Add(job); + } + + public void Remove(JobDb job) + { + if (string.IsNullOrEmpty(job.JobKey)) return; + + if (!_index.TryGetValue(job.JobKey, out var set)) return; + + set.Remove(job); + } + + public int Count() + { + throw new NotImplementedException(); + } + + public JobDb Get(string jobKey) + { + if (!_index.TryGetValue(jobKey, out var set)) return null; + + if (set.Count != 0) return set.Min; + + return null; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/ReadyJobIndex.cs b/src/Horarium.InMemory/Indexes/ReadyJobIndex.cs new file mode 100644 index 0000000..5ef3213 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/ReadyJobIndex.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using Horarium.InMemory.Indexes.Comparers; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal class ReadyJobIndex : IAddRemoveIndex + { + private readonly SortedSet _startAtIndex = new SortedSet(new StartAtComparer()); + + private readonly JobKeyIndex _jobKeyIndex = new JobKeyIndex(); + + public void Add(JobDb job) + { + if (job.Status != JobStatus.Ready) return; + + _startAtIndex.Add(job); + _jobKeyIndex.Add(job); + } + + public void Remove(JobDb job) + { + _startAtIndex.Remove(job); + _jobKeyIndex.Remove(job); + } + + public int Count() + { + return _startAtIndex.Count; + } + + public JobDb GetStartAtLessThan(DateTime startAt) + { + if (_startAtIndex.Count != 0 && _startAtIndex.Min.StartAt < startAt) + return _startAtIndex.Min; + + return null; + } + + public JobDb GetJobKeyEqual(string jobKey) + { + return _jobKeyIndex.Get(jobKey); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/Indexes/RepeatJobIndex.cs b/src/Horarium.InMemory/Indexes/RepeatJobIndex.cs new file mode 100644 index 0000000..509bfd0 --- /dev/null +++ b/src/Horarium.InMemory/Indexes/RepeatJobIndex.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using Horarium.InMemory.Indexes.Comparers; +using Horarium.Repository; + +namespace Horarium.InMemory.Indexes +{ + internal class RepeatJobIndex : IAddRemoveIndex + { + private readonly SortedSet _startAtIndex = new SortedSet(new StartAtComparer()); + + public void Add(JobDb job) + { + if (job.Status != JobStatus.RepeatJob) return; + + _startAtIndex.Add(job); + } + + public void Remove(JobDb job) + { + _startAtIndex.Remove(job); + } + + public int Count() + { + return _startAtIndex.Count; + } + + public JobDb GetStartAtLessThan(DateTime startAt) + { + if (_startAtIndex.Count != 0 && _startAtIndex.Min.StartAt < startAt) + return _startAtIndex.Min; + + return null; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/JobDbExtension.cs b/src/Horarium.InMemory/JobDbExtension.cs new file mode 100644 index 0000000..3b7d084 --- /dev/null +++ b/src/Horarium.InMemory/JobDbExtension.cs @@ -0,0 +1,31 @@ +using Horarium.Repository; + +namespace Horarium.InMemory +{ + internal static class JobDbExtension + { + public static JobDb Copy(this JobDb source) + { + return new JobDb + { + JobKey = source.JobKey, + JobId = source.JobId, + Status = source.Status, + JobType = source.JobType, + JobParamType = source.JobParamType, + JobParam = source.JobParam, + CountStarted = source.CountStarted, + StartedExecuting = source.StartedExecuting, + ExecutedMachine = source.ExecutedMachine, + StartAt = source.StartAt, + NextJob = source.NextJob?.Copy(), + Cron = source.Cron, + Delay = source.Delay, + ObsoleteInterval = source.ObsoleteInterval, + RepeatStrategy = source.RepeatStrategy, + MaxRepeatCount = source.MaxRepeatCount + + }; + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/JobsStorage.cs b/src/Horarium.InMemory/JobsStorage.cs new file mode 100644 index 0000000..b13143f --- /dev/null +++ b/src/Horarium.InMemory/JobsStorage.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using Horarium.InMemory.Indexes; +using Horarium.Repository; + +namespace Horarium.InMemory +{ + internal class JobsStorage + { + private readonly Dictionary _jobs = new Dictionary(); + + private readonly ReadyJobIndex _readyJobIndex = new ReadyJobIndex(); + private readonly ExecutingJobIndex _executingJobIndex = new ExecutingJobIndex(); + private readonly RepeatJobIndex _repeatJobIndex = new RepeatJobIndex(); + private readonly FailedJobIndex _failedJobIndex = new FailedJobIndex(); + + private readonly List _indexes; + + public JobsStorage() + { + _indexes = new List + { + _readyJobIndex, + _executingJobIndex, + _repeatJobIndex, + _failedJobIndex + }; + } + + public void Add(JobDb job) + { + _jobs.Add(job.JobId, job); + + _indexes.ForEach(x => x.Add(job)); + } + + public void Remove(string jobId) + { + if (!_jobs.TryGetValue(jobId, out var job)) return; + + Remove(job); + } + + public void Remove(JobDb job) + { + _jobs.Remove(job.JobId); + + _indexes.ForEach(x => x.Remove(job)); + } + + public Dictionary GetStatistics() + { + return new Dictionary + { + {JobStatus.Ready, _readyJobIndex.Count()}, + {JobStatus.Executing, _executingJobIndex.Count()}, + {JobStatus.RepeatJob, _repeatJobIndex.Count()}, + {JobStatus.Failed, _failedJobIndex.Count()} + }; + } + + public JobDb GetById(string jobId) + { + if (!_jobs.TryGetValue(jobId, out var job)) return null; + + return job; + } + + public JobDb FindRecurrentJobToUpdate(string jobKey) + { + return _readyJobIndex.GetJobKeyEqual(jobKey) ?? _executingJobIndex.GetJobKeyEqual(jobKey); + } + + public JobDb FindReadyJob(TimeSpan obsoleteTime) + { + var now = DateTime.UtcNow; + + return _readyJobIndex.GetStartAtLessThan(now) ?? + _repeatJobIndex.GetStartAtLessThan(now) ?? + _executingJobIndex.GetStartedExecutingLessThan(now - obsoleteTime); + } + } +} \ No newline at end of file diff --git a/src/Horarium.InMemory/OperationsProcessor.cs b/src/Horarium.InMemory/OperationsProcessor.cs new file mode 100644 index 0000000..795f7b1 --- /dev/null +++ b/src/Horarium.InMemory/OperationsProcessor.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Horarium.Repository; + +namespace Horarium.InMemory +{ + internal class OperationsProcessor + { + private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + + public OperationsProcessor() + { + ProcessQueue(); + } + + private void ProcessQueue() + { + while (_queue.TryDequeue(out var operation)) + { + operation.Execute(); + } + + Task.Delay(TimeSpan.FromMilliseconds(10)) + .ContinueWith(x => ProcessQueue()); + } + + public Task Execute(Action command) + { + var wrapped = new CommandWrapper(command); + _queue.Enqueue(wrapped); + + return wrapped.Task; + } + + public Task Execute(Func query) + { + var wrapped = new QueryWrapper(query); + _queue.Enqueue(wrapped); + + return wrapped.Task; + } + + private abstract class BaseWrapper + { + protected readonly TaskCompletionSource CompletionSource = new TaskCompletionSource(); + + public abstract void Execute(); + + public Task Task => CompletionSource.Task; + } + + private class QueryWrapper : BaseWrapper + { + private readonly Func _query; + + public QueryWrapper(Func query) + { + _query = query; + } + + public override void Execute() + { + CompletionSource.SetResult(_query()?.Copy()); + } + } + + private class CommandWrapper : BaseWrapper + { + private readonly Action _command; + + public CommandWrapper(Action command) + { + _command = command; + } + + public override void Execute() + { + _command(); + + CompletionSource.SetResult(null); + } + } + } +} \ No newline at end of file diff --git a/src/Horarium.IntegrationTest/Horarium.IntegrationTest.csproj b/src/Horarium.IntegrationTest/Horarium.IntegrationTest.csproj index 2dc00b4..fe4f6df 100644 --- a/src/Horarium.IntegrationTest/Horarium.IntegrationTest.csproj +++ b/src/Horarium.IntegrationTest/Horarium.IntegrationTest.csproj @@ -18,6 +18,7 @@ + diff --git a/src/Horarium.IntegrationTest/IntegrationTestBase.cs b/src/Horarium.IntegrationTest/IntegrationTestBase.cs index 72817da..68baa1b 100644 --- a/src/Horarium.IntegrationTest/IntegrationTestBase.cs +++ b/src/Horarium.IntegrationTest/IntegrationTestBase.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Horarium.InMemory; using Horarium.Mongo; using Horarium.Repository; @@ -23,6 +24,9 @@ protected HorariumServer CreateHorariumServer() case "MongoDB": jobRepository = MongoRepositoryFactory.Create(ConnectionMongo); break; + case "Memory": + jobRepository = new InMemoryRepository(); + break; default: throw new ArgumentOutOfRangeException(nameof(dataBase), dataBase, null); } diff --git a/src/Horarium.IntegrationTest/Jobs/SequenceJob.cs b/src/Horarium.IntegrationTest/Jobs/SequenceJob.cs new file mode 100644 index 0000000..d6f1c19 --- /dev/null +++ b/src/Horarium.IntegrationTest/Jobs/SequenceJob.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Horarium.Interfaces; + +namespace Horarium.IntegrationTest.Jobs +{ + public class SequenceJob : IJob + { + public static readonly ConcurrentQueue QueueJobs = new ConcurrentQueue(); + + public Task Execute(int param) + { + QueueJobs.Enqueue(param); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/Horarium.IntegrationTest/Jobs/TestJobParam.cs b/src/Horarium.IntegrationTest/Jobs/TestJobParam.cs new file mode 100644 index 0000000..97aadc8 --- /dev/null +++ b/src/Horarium.IntegrationTest/Jobs/TestJobParam.cs @@ -0,0 +1,7 @@ +namespace Horarium.IntegrationTest.Jobs +{ + public class TestJobParam + { + public int Counter { get; set; } + } +} \ No newline at end of file diff --git a/src/Horarium.IntegrationTest/OneTimeJobTest.cs b/src/Horarium.IntegrationTest/OneTimeJobTest.cs index 9de19d0..c70fae0 100644 --- a/src/Horarium.IntegrationTest/OneTimeJobTest.cs +++ b/src/Horarium.IntegrationTest/OneTimeJobTest.cs @@ -12,8 +12,6 @@ public async Task OneTimeJob_RunAfterAdded() { var horarium = CreateHorariumServer(); - horarium.Start(); - await horarium.Create(5).Schedule(); await Task.Delay(1000); diff --git a/src/Horarium.IntegrationTest/SequenceJobTest.cs b/src/Horarium.IntegrationTest/SequenceJobTest.cs new file mode 100644 index 0000000..baef85a --- /dev/null +++ b/src/Horarium.IntegrationTest/SequenceJobTest.cs @@ -0,0 +1,31 @@ +using System.Threading.Tasks; +using Horarium.IntegrationTest.Jobs; +using Xunit; + +namespace Horarium.IntegrationTest +{ + [Collection(IntegrationTestCollection)] + public class SequenceJobTest : IntegrationTestBase + { + [Fact] + public async Task SequenceJobsAdded_ExecutedSequence() + { + var horarium = CreateHorariumServer(); + + await horarium.Create(0) + .Next(1) + .Next(2) + .Schedule(); + + await Task.Delay(1000); + + horarium.Dispose(); + + var queueJobs = SequenceJob.QueueJobs.ToArray(); + + Assert.NotEmpty(queueJobs); + + Assert.Equal(new [] {0,1,2}, queueJobs); + } + } +} \ No newline at end of file diff --git a/src/Horarium.IntegrationTest/TestParallelsWorkTwoManagers.cs b/src/Horarium.IntegrationTest/TestParallelsWorkTwoManagers.cs index 20674c6..d55431c 100644 --- a/src/Horarium.IntegrationTest/TestParallelsWorkTwoManagers.cs +++ b/src/Horarium.IntegrationTest/TestParallelsWorkTwoManagers.cs @@ -2,6 +2,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Horarium.InMemory; using Horarium.IntegrationTest.Jobs; using Xunit; @@ -22,10 +23,8 @@ public async Task TestParallels() await Task.Delay(10); } - await Task.Delay(10000); await Task.Delay(10000); - firstScheduler.Dispose(); firstScheduler.Dispose(); secondScheduler.Dispose(); diff --git a/src/Horarium.sln b/src/Horarium.sln index e91dc68..2a56923 100644 --- a/src/Horarium.sln +++ b/src/Horarium.sln @@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Horarium.Mongo", "Horarium. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Horarium.AspNetCore", "Horarium.AspNetCore\Horarium.AspNetCore.csproj", "{BDA3804A-15B5-46A3-8F09-FD991FAAF1D0}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Horarium.InMemory", "Horarium.InMemory\Horarium.InMemory.csproj", "{567A7F77-22BB-43D0-AE4B-AF929E7FD826}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -45,6 +47,10 @@ Global {BDA3804A-15B5-46A3-8F09-FD991FAAF1D0}.Debug|Any CPU.Build.0 = Debug|Any CPU {BDA3804A-15B5-46A3-8F09-FD991FAAF1D0}.Release|Any CPU.ActiveCfg = Release|Any CPU {BDA3804A-15B5-46A3-8F09-FD991FAAF1D0}.Release|Any CPU.Build.0 = Release|Any CPU + {567A7F77-22BB-43D0-AE4B-AF929E7FD826}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {567A7F77-22BB-43D0-AE4B-AF929E7FD826}.Debug|Any CPU.Build.0 = Debug|Any CPU + {567A7F77-22BB-43D0-AE4B-AF929E7FD826}.Release|Any CPU.ActiveCfg = Release|Any CPU + {567A7F77-22BB-43D0-AE4B-AF929E7FD826}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Horarium/Handlers/ExecutorJob.cs b/src/Horarium/Handlers/ExecutorJob.cs index f4bba52..94b5dec 100644 --- a/src/Horarium/Handlers/ExecutorJob.cs +++ b/src/Horarium/Handlers/ExecutorJob.cs @@ -63,7 +63,7 @@ private async Task ExecuteJob(JobMetadata jobMetadata) if (jobMetadata.NextJob != null) { - jobMetadata.NextJob.StartAt = DateTime.Now + jobMetadata.NextJob.Delay.GetValueOrDefault(); + jobMetadata.NextJob.StartAt = DateTime.UtcNow + jobMetadata.NextJob.Delay.GetValueOrDefault(); await _jobRepository.AddJob(JobDb.CreatedJobDb(jobMetadata.NextJob, _settings.JsonSerializerSettings)); @@ -158,7 +158,7 @@ private DateTime GetNextStartFailedJobTime(JobMetadata jobMetadata) strategy = _settings.FailedRepeatStrategy; } - return DateTime.Now + strategy.GetNextStartInterval(jobMetadata.CountStarted); + return DateTime.UtcNow + strategy.GetNextStartInterval(jobMetadata.CountStarted); } private async Task ScheduleRecurrentNextTime(JobMetadata metadata)