Skip to content

Commit

Permalink
优化注释
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Dec 30, 2021
1 parent 60c5fdb commit 1e96c93
Showing 1 changed file with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
@Slf4j
public class ClusterRuleEngine implements RuleEngine {

//调度器注册中心
private final SchedulerRegistry schedulerRegistry;

//任务快照仓库
private final TaskSnapshotRepository repository;

//调度器选择器
private final SchedulerSelector schedulerSelector;

public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository repository) {
Expand All @@ -39,6 +42,7 @@ public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotReposi

@Override
public Mono<Void> shutdown(String instanceId) {
//从注册中心中获取调度器来停止指定的规则实例
return schedulerRegistry
.getSchedulers()
.flatMap(scheduler -> scheduler.shutdown(instanceId))
Expand All @@ -58,7 +62,7 @@ public Flux<Task> startRule(String instanceId, RuleModel model) {
.findByInstanceId(instanceId)
.flatMap(snapshot -> {
ScheduleJob job = jobs.get(snapshot.getJob().getNodeId());
//新的任务减少了task
//新的规则减少了任务,则尝试移除旧的任务
if (job == null) {
return this
.getTaskBySnapshot(snapshot)
Expand All @@ -81,6 +85,7 @@ public Flux<Task> startRule(String instanceId, RuleModel model) {
.thenReturn(task))));

})
//没有任务调度信息说明可能是新启动的规则
.switchIfEmpty(doStart(jobs.values()))
.doOnNext(startedTask::add)
.onErrorResume(err -> {
Expand All @@ -97,11 +102,14 @@ protected Flux<Task> doStart(Collection<ScheduleJob> jobs) {
.defer(() -> Flux
.fromIterable(jobs)
.flatMap(this::scheduleTask)
//将所有Task创建好之后再统一启动
.collectList()
.flatMapIterable(Function.identity())
//统一启动
.flatMap(task -> task.start().thenReturn(task)))
.collectList()
.map(Flux::fromIterable)
//保存快照信息
.flatMapMany(tasks -> repository
.saveTaskSnapshots(tasks.flatMap(Task::dump))
.thenMany(tasks));
Expand Down

0 comments on commit 1e96c93

Please sign in to comment.