Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/worker cluster #1003

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/CAServer.Application.Contracts/Commons/CronHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Text;

namespace CAServer.Commons;

public static class CronHelper
{
public static string GetCronExpression(int seconds)
{
if (seconds is <= 0 or >= 36000)
{
throw new ArgumentOutOfRangeException(nameof(seconds), "Invalid seconds");
}

var hours = (seconds / 3600) % 24;
var minutes = (seconds % 3600) / 60;
var secondNum = seconds % 60;
var builder = new StringBuilder();
builder.Append(secondNum > 0 ? $"*/{secondNum} " : "* ");
builder.Append(minutes > 0 ? $"*/{minutes} " : "* ");
builder.Append(hours > 0 ? $"*/{hours} " : "* ");
builder.Append("* * ?");

return builder.ToString();
}
}
1 change: 1 addition & 0 deletions src/CAServer.Application/CAServer.Application.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<PackageReference Include="MassTransit" Version="8.1.0" />
<PackageReference Include="MassTransit.Abstractions" Version="8.1.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.1.0" />
<PackageReference Include="Hangfire.Core" Version="1.8.3" />

</ItemGroup>

Expand Down
2 changes: 2 additions & 0 deletions src/CAServer.Application/CAServerApplicationModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using CAServer.Market;
using CAServer.Options;
using CAServer.RedPackage;
using CAServer.ScheduledTask;
using CAServer.Search;
using CAServer.Settings;
using CAServer.Signature;
Expand Down Expand Up @@ -184,6 +185,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
Configure<PortkeyV1Options>(configuration.GetSection("PortkeyV1"));
Configure<ETransferOptions>(configuration.GetSection("ETransfer"));
Configure<ActivityDateRangeOptions>(configuration.GetSection("ETransfer"));
Configure<ScheduledTaskOptions>(configuration.GetSection("ScheduledTask"));

AddMessagePushService(context, configuration);
}
Expand Down
43 changes: 43 additions & 0 deletions src/CAServer.Application/ScheduledTask/IInitWorkersService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Collections.Generic;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;

namespace CAServer.ScheduledTask;

public interface IInitWorkersService
{
void InitRecurringWorkers();
}

public class InitWorkersService : IInitWorkersService, ISingletonDependency
{
private readonly IRecurringJobManager _recurringJobs;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<InitWorkersService> _logger;

public InitWorkersService(IRecurringJobManager recurringJobs,
IServiceProvider serviceProvider, ILogger<InitWorkersService> logger)
{
_recurringJobs = recurringJobs;
_serviceProvider = serviceProvider;
_logger = logger;
}

public void InitRecurringWorkers()
{
var tasks = _serviceProvider.GetRequiredService<IScheduledTaskManager>();
if (tasks == null || tasks.ScheduledTasks.IsNullOrEmpty())
{
_logger.LogWarning("There is no worker.");
}

foreach (var task in tasks.ScheduledTasks)
{
_logger.LogInformation("Add or update worker, name:{0}, corn:{1}", task.GetType().FullName, task.Corn);
_recurringJobs.AddOrUpdate(task.GetType().FullName, () => task.ExecuteAsync(), task.Corn);
}
}
}
62 changes: 62 additions & 0 deletions src/CAServer.Application/ScheduledTask/IScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Threading.Tasks;
using CAServer.Commons;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;

namespace CAServer.ScheduledTask;

public abstract class ScheduledTaskBase : IScheduledTask
{
public IAbpLazyServiceProvider LazyServiceProvider { get; set; }
protected ILoggerFactory LoggerFactory => LazyServiceProvider.LazyGetRequiredService<ILoggerFactory>();

protected ILogger Logger => LazyServiceProvider.LazyGetService<ILogger>(provider =>
LoggerFactory?.CreateLogger(GetType().FullName) ?? NullLogger.Instance);

private IAbpDistributedLock DistributedLock => LazyServiceProvider.LazyGetRequiredService<IAbpDistributedLock>();

/// <summary>
/// seconds
/// </summary>
protected int Period { get; set; } = 15;

// The user can customize this attribute.
public string Corn
{
get => CronHelper.GetCronExpression(Period);
}

protected abstract Task DoWorkAsync();

public async Task ExecuteAsync()
{
await using var handle = await DistributedLock.TryAcquireAsync(this.GetType().FullName);

if (handle == null)
{
Logger.LogWarning("The worker is not finish. {0}", this.GetType().FullName);
return;
}

try
{
Logger.LogInformation("The worker will execute. {0}", this.GetType().FullName);
await DoWorkAsync();
Logger.LogInformation("The worker execute finish. {0}", this.GetType().FullName);
}
catch (Exception e)
{
Logger.LogError(e, "The worker execute error. {0}, errorMsg:{1}, stackTrace:{2}", this.GetType().FullName,
e.Message, e.StackTrace ?? "-");
}
}
}

public interface IScheduledTask : ISingletonDependency
{
string Corn { get; }
Task ExecuteAsync();
}
20 changes: 20 additions & 0 deletions src/CAServer.Application/ScheduledTask/IScheduledTaskManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;
using Volo.Abp.DependencyInjection;

namespace CAServer.ScheduledTask;

public interface IScheduledTaskManager
{
public List<IScheduledTask> ScheduledTasks { get; }
public void Add(IScheduledTask task);
}

public class ScheduledTaskManager : IScheduledTaskManager, ISingletonDependency
{
public List<IScheduledTask> ScheduledTasks { get; set; } = new();

public void Add(IScheduledTask task)
{
ScheduledTasks.Add(task);
}
}
26 changes: 26 additions & 0 deletions src/CAServer.Application/ScheduledTask/ScheduledTaskExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp;

namespace CAServer.ScheduledTask;

public static class ScheduledTaskExtension
{
public static ApplicationInitializationContext AddWorker<TWorker>(
this ApplicationInitializationContext context, CancellationToken cancellationToken = default)
where TWorker : IScheduledTask
{
Check.NotNull(context, nameof(context));
if (!typeof(TWorker).IsAssignableTo<IScheduledTask>())
{
throw new AbpException(
$"Given type ({typeof(TWorker).AssemblyQualifiedName}) must implement the {typeof(IScheduledTask).AssemblyQualifiedName} interface, but it doesn't!");
}

var manager = context.ServiceProvider.GetRequiredService<IScheduledTaskManager>();
manager.Add((IScheduledTask)context.ServiceProvider.GetRequiredService(typeof(TWorker)));

return context;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace CAServer.ScheduledTask;

public class ScheduledTaskOptions
{
/// <summary>
/// seconds
/// </summary>
public int DefaultPeriod { get; set; } = 15;
}
15 changes: 5 additions & 10 deletions src/CAServer.ContractEventHandler.Core/Worker/ChainHeightWorker.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
using System.Threading.Tasks;
using CAServer.ContractEventHandler.Core.Application;
using Microsoft.Extensions.DependencyInjection;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.ContractEventHandler.Core.Worker;

public class ChainHeightWorker : AsyncPeriodicBackgroundWorkerBase
public class ChainHeightWorker : ScheduledTaskBase
{
private readonly IChainHeightService _chainHeightService;

public ChainHeightWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
IChainHeightService chainHeightService, IOptionsSnapshot<SyncChainHeightOptions> options) : base(timer,
serviceScopeFactory)
public ChainHeightWorker(IChainHeightService chainHeightService, IOptionsSnapshot<SyncChainHeightOptions> options)
{
Period = options.Value.Period;
_chainHeightService = chainHeightService;
Timer.Period = 1000 * options.Value.Period;
Timer.RunOnStart = true;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
await _chainHeightService.SetChainHeightAsync();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,57 +1,33 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using CAServer.Common;
using CAServer.ContractEventHandler.Core.Application;
using CAServer.Nightingale;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.ContractEventHandler.Core.Worker;

public class ContractSyncWorker : AsyncPeriodicBackgroundWorkerBase
public class ContractSyncWorker : ScheduledTaskBase
{
private readonly IContractAppService _contractAppService;
private readonly ContractSyncOptions _contractSyncOptions;
private readonly IBackgroundWorkerRegistrarProvider _registrarProvider;
private readonly N9EClientFactory _n9EClientFactory;

private readonly ILogger<ContractSyncWorker> _logger;
private const string WorkerName = "ContractSyncWorker";


public ContractSyncWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,

public ContractSyncWorker(
IContractAppService contractAppService, IOptions<ContractSyncOptions> workerOptions,
IBackgroundWorkerRegistrarProvider registrarProvider, IHostApplicationLifetime hostApplicationLifetime,
N9EClientFactory n9EClientFactory, ILogger<ContractSyncWorker> logger) : base(
timer,
serviceScopeFactory)
N9EClientFactory n9EClientFactory, ILogger<ContractSyncWorker> logger)
{
_contractSyncOptions = workerOptions.Value;
_contractAppService = contractAppService;
_registrarProvider = registrarProvider;
_n9EClientFactory = n9EClientFactory;
Timer.Period = 1000 * _contractSyncOptions.Sync;
Period = workerOptions.Value.Sync;
_logger = logger;

hostApplicationLifetime.ApplicationStopped.Register(() =>
{
_registrarProvider.TryRemoveWorkerNodeAsync(WorkerName);
});
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
if (!await _registrarProvider.RegisterUniqueWorkerNodeAsync(WorkerName, _contractSyncOptions.Sync,
_contractSyncOptions.WorkerNodeExpirationTime))
{
return;
}

var stopwatch = Stopwatch.StartNew();
try
{
Expand All @@ -66,7 +42,6 @@ protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext worker
finally
{
stopwatch.Stop();
await _registrarProvider.TryRemoveWorkerNodeAsync(WorkerName);
await _n9EClientFactory.TrackTransactionSync(N9EClientConstant.Biz, WorkerName,
stopwatch.ElapsedMilliseconds);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,37 @@
using System.Diagnostics;
using System.Threading.Tasks;
using CAServer.Common;
using CAServer.ContractEventHandler.Core.Application;
using CAServer.Nightingale;
using CAServer.ScheduledTask;
using CAServer.UserAssets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.ContractEventHandler.Core.Worker;

public class NftTraitsProportionCalculateWorker : AsyncPeriodicBackgroundWorkerBase
public class NftTraitsProportionCalculateWorker : ScheduledTaskBase
{
private readonly IUserAssetsAppService _userAssetsAppService;
private readonly ContractSyncOptions _contractSyncOptions;
private readonly IBackgroundWorkerRegistrarProvider _registrarProvider;
private readonly N9EClientFactory _n9EClientFactory;
private readonly NFTTraitsSyncOptions _nftTraitsSyncOptions;
private readonly ILogger<NftTraitsProportionCalculateWorker> _logger;

private const string WorkerName = "NftTraitsProportionCalculateWorker";

public NftTraitsProportionCalculateWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
public NftTraitsProportionCalculateWorker(
IUserAssetsAppService userAssetsAppService,
IBackgroundWorkerRegistrarProvider registrarProvider, N9EClientFactory n9EClientFactory,
N9EClientFactory n9EClientFactory,
IOptionsSnapshot<NFTTraitsSyncOptions> nftTraitsSyncOptions,
IOptionsSnapshot<ContractSyncOptions> contractSyncOptions, ILogger<NftTraitsProportionCalculateWorker> logger) : base(timer,
serviceScopeFactory)
ILogger<NftTraitsProportionCalculateWorker> logger)
{
_userAssetsAppService = userAssetsAppService;
_registrarProvider = registrarProvider;
_n9EClientFactory = n9EClientFactory;
_logger = logger;
_contractSyncOptions = contractSyncOptions.Value;
_nftTraitsSyncOptions = nftTraitsSyncOptions.Value;
Timer.Period = 1000 * _nftTraitsSyncOptions.Sync;
Period = nftTraitsSyncOptions.Value.Sync;
}


protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
_logger.LogInformation("Start Sync NFT traits begin");
if (!await _registrarProvider.RegisterUniqueWorkerNodeAsync(WorkerName, _nftTraitsSyncOptions.Sync,
_contractSyncOptions.WorkerNodeExpirationTime))
{
return;
}

var stopwatch = Stopwatch.StartNew();
try
{
Expand All @@ -57,7 +40,6 @@ protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext worker
finally
{
stopwatch.Stop();
await _registrarProvider.TryRemoveWorkerNodeAsync(WorkerName);
await _n9EClientFactory.TrackTransactionSync(N9EClientConstant.Biz, WorkerName,
stopwatch.ElapsedMilliseconds);
}
Expand Down
Loading
Loading