Skip to content

Commit

Permalink
chore: Optimize blocking strategy query (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
wzh425 authored Jul 22, 2024
1 parent bcf6653 commit 0b9fbb9
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,60 +98,66 @@ public async Task StartTaskAsync(StartTaskDomainEvent @event)

var filterStatus = new List<TaskRunStatus>() { TaskRunStatus.Running, TaskRunStatus.WaitToRetry, TaskRunStatus.Idle, TaskRunStatus.WaitToRun };

var otherRunningTaskList = await _schedulerTaskRepository.GetPaginatedListAsync(t => filterStatus.Contains(t.TaskStatus) && t.JobId == task.JobId && t.Id != task.Id, 0, 10);

var allowEnqueue = true;

if (task.SchedulerTime == DateTimeOffset.MinValue)
{
task.UpdateTaskSchedulerTime(@event.Request.ExcuteTime);
}

if (otherRunningTaskList.Any())
switch (task.Job.ScheduleBlockStrategy)
{
switch (task.Job.ScheduleBlockStrategy)
{
case ScheduleBlockStrategyTypes.Serial:
if (otherRunningTaskList.Any(p => p.TaskStatus == TaskRunStatus.Running || p.TaskStatus == TaskRunStatus.WaitToRetry))
{
task.Wait();
allowEnqueue = false;
_logger.LogWarning("Other task is running, trigger serial block strategy, waiting now", WriterTypes.Server, task.Id, task.JobId);
}
break;
case ScheduleBlockStrategyTypes.Discard:
case ScheduleBlockStrategyTypes.Serial:
if (await _schedulerTaskRepository.AnyAsync(t => t.JobId == task.JobId && t.Id != task.Id && (t.TaskStatus == TaskRunStatus.Running || t.TaskStatus == TaskRunStatus.WaitToRetry)))
{
task.Wait();
allowEnqueue = false;
_logger.LogWarning("Other task is running, trigger serial block strategy, waiting now", WriterTypes.Server, task.Id, task.JobId);
}
else
{
task.TaskSchedule(@event.Request.OperatorId);
}
break;
case ScheduleBlockStrategyTypes.Parallel:
task.TaskSchedule(@event.Request.OperatorId);
break;
case ScheduleBlockStrategyTypes.Discard:
if (await _schedulerTaskRepository.AnyAsync(t => filterStatus.Contains(t.TaskStatus) && t.JobId == task.JobId && t.Id != task.Id))
{
task.Discard();
allowEnqueue = false;
_logger.LogWarning("Trigger discard block strategy, task failed", WriterTypes.Server, task.Id, task.JobId);
break;
case ScheduleBlockStrategyTypes.Cover:
foreach (var otherRunningTask in otherRunningTaskList)
}
else
{
task.TaskSchedule(@event.Request.OperatorId);
}
break;
case ScheduleBlockStrategyTypes.Cover:
var otherRunningTask = await _schedulerTaskRepository.FindAsync(t => filterStatus.Contains(t.TaskStatus) && t.JobId == task.JobId && t.Id != task.Id);
if (otherRunningTask != null)
{
if (otherRunningTask.TaskStatus == TaskRunStatus.Running)
{
if (otherRunningTask.TaskStatus == TaskRunStatus.Running)
{
await _serverManager.StopTask(otherRunningTask.Id, otherRunningTask.WorkerHost);
}
else
{
await _quartzUtils.RemoveDelayTask(otherRunningTask.Id, task.Job.Id);
}

otherRunningTask.TaskEnd(TaskRunStatus.Failure, "Stop by SchedulerBlockStrategy");
await _schedulerTaskRepository.UpdateAsync(otherRunningTask);

await _signalRUtils.SendNoticationByGroup(ConstStrings.GLOBAL_GROUP, SignalRMethodConsts.GET_NOTIFICATION, _mapper.Map<SchedulerTaskDto>(otherRunningTask));
_logger.LogWarning($"Trigger cover block strategy by TaskId: {task.Id}, task failed", WriterTypes.Server, otherRunningTask.Id, otherRunningTask.JobId);
await _serverManager.StopTask(otherRunningTask.Id, otherRunningTask.WorkerHost);
}
task.TaskSchedule(@event.Request.OperatorId);
break;
default:
task.TaskSchedule(@event.Request.OperatorId);
break;
}
}
else
{
task.TaskSchedule(@event.Request.OperatorId);
else
{
await _quartzUtils.RemoveDelayTask(otherRunningTask.Id, task.Job.Id);
}

otherRunningTask.TaskEnd(TaskRunStatus.Failure, "Stop by SchedulerBlockStrategy");
await _schedulerTaskRepository.UpdateAsync(otherRunningTask);

await _signalRUtils.SendNoticationByGroup(ConstStrings.GLOBAL_GROUP, SignalRMethodConsts.GET_NOTIFICATION, _mapper.Map<SchedulerTaskDto>(otherRunningTask));
_logger.LogWarning($"Trigger cover block strategy by TaskId: {task.Id}, task failed", WriterTypes.Server, otherRunningTask.Id, otherRunningTask.JobId);
}
task.TaskSchedule(@event.Request.OperatorId);
break;
default:
task.TaskSchedule(@event.Request.OperatorId);
break;
}

if (task.TaskStatus != TaskRunStatus.WaitToRun)
Expand Down
Loading

0 comments on commit 0b9fbb9

Please sign in to comment.