Skip to content

Commit

Permalink
后台添加任务时,不需要反馈客户端的,不用选择提交节点组
Browse files Browse the repository at this point in the history
  • Loading branch information
qq254963746 committed Mar 4, 2016
1 parent e771950 commit 44661c7
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 17 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ lts-{version}-bin的文件结构
```java
JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();

Expand Down Expand Up @@ -222,6 +223,7 @@ TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.start();
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ public RestfulResponse jobAdd(JobQueueRequest request) {
try {
Assert.hasLength(request.getTaskId(), "taskId不能为空!");
Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!");
Assert.hasLength(request.getSubmitNodeGroup(), "submitNodeGroup不能为空!");
if(request.getNeedFeedback()){
Assert.hasLength(request.getSubmitNodeGroup(), "submitNodeGroup不能为空!");
}

if (StringUtils.isNotEmpty(request.getCronExpression())) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@
sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error");
return;
}
if (!params['submitNodeGroup']) {
sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') {
sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
return;
}
if (!params['taskTrackerNodeGroup']) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@
sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error");
return;
}
if (!params['submitNodeGroup']) {
sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') {
sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
return;
}
var extParams = params['extParams'];
Expand Down
4 changes: 2 additions & 2 deletions lts-admin/src/main/webapp/WEB-INF/views/templates/jobAdd.vm
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@
sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error");
return;
}
if (!params['submitNodeGroup']) {
sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') {
sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error");
return;
}
if (!params['taskTrackerNodeGroup']) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public Response submitJob(List<Job> jobs) {
response.setFailedJobs(jobs);
response.setCode(ResponseCode.SUBMIT_TOO_BUSY_AND_SAVE_FOR_LATER);
response.setMsg(response.getMsg() + ", submit too busy , save local fail store and send later !");
LOGGER.warn(JSON.toJSONString(response));
return response;
}
if (!response.isSuccess()) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.lts.jobtracker.complete.chain;

import com.lts.core.commons.utils.CollectionUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.constant.Constants;
import com.lts.core.domain.Action;
import com.lts.core.domain.Job;
Expand All @@ -21,6 +22,7 @@

/**
* 任务完成 Chian
*
* @author Robert HG ([email protected]) on 11/11/15.
*/
public class JobProcessChain implements JobCompletedChain {
Expand Down Expand Up @@ -82,7 +84,7 @@ private void singleResultsProcess(List<TaskTrackerJobResult> results) {

if (!needRetry(result)) {
// 这种情况下,如果要反馈客户端的,直接反馈客户端,不进行重试
if (result.getJobWrapper().getJob().isNeedFeedback()) {
if (isNeedFeedback(result.getJobWrapper().getJob())) {
clientNotifier.send(results);
} else {
jobFinishHandler.onComplete(results);
Expand Down Expand Up @@ -131,7 +133,7 @@ private void multiResultsProcess(List<TaskTrackerJobResult> results) {
retryResults = new ArrayList<TaskTrackerJobResult>();
}
retryResults.add(result);
} else if (result.getJobWrapper().getJob().isNeedFeedback()) {
} else if (isNeedFeedback(result.getJobWrapper().getJob())) {
// 需要反馈给客户端
if (feedbackResults == null) {
feedbackResults = new ArrayList<TaskTrackerJobResult>();
Expand All @@ -156,4 +158,12 @@ private void multiResultsProcess(List<TaskTrackerJobResult> results) {
retryHandler.onComplete(retryResults);
}

private boolean isNeedFeedback(Job job) {
if (job == null) {
return false;
}
// 容错,如果没有提交节点组,那么不反馈
return !StringUtils.isEmpty(job.getSubmitNodeGroup()) && job.isNeedFeedback();
}

}
2 changes: 2 additions & 0 deletions lts-startup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

lts-startup 模块主要是提供一些脚本启动的扩展
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.lts.tasktracker.TaskTracker;

import java.util.Map;

/**
* @author Robert HG ([email protected]) on 9/11/15.
*/
Expand All @@ -26,10 +24,6 @@ public static void start(String cfgPath) {
taskTracker = DefaultStartup.start(cfg);
}

for (Map.Entry<String, String> config : cfg.getConfigs().entrySet()) {
taskTracker.addConfig(config.getKey(), config.getValue());
}

taskTracker.start();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand Down

0 comments on commit 44661c7

Please sign in to comment.