From 44661c789fb81eb4c0ef86a082af72ab493d8d2f Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Fri, 4 Mar 2016 20:18:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8E=E5=8F=B0=E6=B7=BB=E5=8A=A0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=97=B6,=E4=B8=8D=E9=9C=80=E8=A6=81=E5=8F=8D?= =?UTF-8?q?=E9=A6=88=E5=AE=A2=E6=88=B7=E7=AB=AF=E7=9A=84,=E4=B8=8D?= =?UTF-8?q?=E7=94=A8=E9=80=89=E6=8B=A9=E6=8F=90=E4=BA=A4=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 ++ .../web/controller/api/JobQueueApiController.java | 4 +++- .../webapp/WEB-INF/views/templates/cronJobQueue.vm | 4 ++-- .../WEB-INF/views/templates/executableJobQueue.vm | 4 ++-- .../main/webapp/WEB-INF/views/templates/jobAdd.vm | 4 ++-- .../java/com/lts/jobclient/RetryJobClient.java | 2 -- .../jobtracker/complete/chain/JobProcessChain.java | 14 ++++++++++++-- lts-startup/README.md | 2 ++ .../java/com/lts/startup/TaskTrackerStartup.java | 6 ------ 9 files changed, 25 insertions(+), 17 deletions(-) create mode 100644 lts-startup/README.md diff --git a/README.md b/README.md index 6370db163..f04b6235c 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ lts-{version}-bin的文件结构 ```java JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); +jobClient.setClusterName("test_cluster"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); @@ -222,6 +223,7 @@ TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); +taskTracker.setClusterName("test_cluster"); taskTracker.setWorkThreads(20); taskTracker.start(); ``` diff --git a/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java b/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java index 79ce7002e..db40b65e6 100644 --- a/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java +++ b/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java @@ -276,7 +276,9 @@ public RestfulResponse jobAdd(JobQueueRequest request) { try { Assert.hasLength(request.getTaskId(), "taskId不能为空!"); Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!"); - Assert.hasLength(request.getSubmitNodeGroup(), "submitNodeGroup不能为空!"); + if(request.getNeedFeedback()){ + Assert.hasLength(request.getSubmitNodeGroup(), "submitNodeGroup不能为空!"); + } if (StringUtils.isNotEmpty(request.getCronExpression())) { try { diff --git a/lts-admin/src/main/webapp/WEB-INF/views/templates/cronJobQueue.vm b/lts-admin/src/main/webapp/WEB-INF/views/templates/cronJobQueue.vm index 52c10b24b..4c4b8fab8 100644 --- a/lts-admin/src/main/webapp/WEB-INF/views/templates/cronJobQueue.vm +++ b/lts-admin/src/main/webapp/WEB-INF/views/templates/cronJobQueue.vm @@ -314,8 +314,8 @@ sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error"); return; } - if (!params['submitNodeGroup']) { - sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); + if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') { + sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); return; } if (!params['taskTrackerNodeGroup']) { diff --git a/lts-admin/src/main/webapp/WEB-INF/views/templates/executableJobQueue.vm b/lts-admin/src/main/webapp/WEB-INF/views/templates/executableJobQueue.vm index cd2cb51b7..0612f0dd6 100644 --- a/lts-admin/src/main/webapp/WEB-INF/views/templates/executableJobQueue.vm +++ b/lts-admin/src/main/webapp/WEB-INF/views/templates/executableJobQueue.vm @@ -321,8 +321,8 @@ sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error"); return; } - if (!params['submitNodeGroup']) { - sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); + if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') { + sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); return; } var extParams = params['extParams']; diff --git a/lts-admin/src/main/webapp/WEB-INF/views/templates/jobAdd.vm b/lts-admin/src/main/webapp/WEB-INF/views/templates/jobAdd.vm index baecf3667..2ce8ade1d 100644 --- a/lts-admin/src/main/webapp/WEB-INF/views/templates/jobAdd.vm +++ b/lts-admin/src/main/webapp/WEB-INF/views/templates/jobAdd.vm @@ -186,8 +186,8 @@ sweetAlert("优先级格式错误", "必须为数字,数值越小,优先级越大【必填】", "error"); return; } - if (!params['submitNodeGroup']) { - sweetAlert("请选择提交节点组", "如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); + if (!params['submitNodeGroup'] && params['needFeedback'] == 'true') { + sweetAlert("请选择提交节点组", "需要反馈客户端必须选择提交节点组,如果列表中没有,请在节点组管理中添加,并启动改节点。", "error"); return; } if (!params['taskTrackerNodeGroup']) { diff --git a/lts-jobclient/src/main/java/com/lts/jobclient/RetryJobClient.java b/lts-jobclient/src/main/java/com/lts/jobclient/RetryJobClient.java index 0b06e9589..e3c20b236 100644 --- a/lts-jobclient/src/main/java/com/lts/jobclient/RetryJobClient.java +++ b/lts-jobclient/src/main/java/com/lts/jobclient/RetryJobClient.java @@ -65,8 +65,6 @@ public Response submitJob(List jobs) { response.setFailedJobs(jobs); response.setCode(ResponseCode.SUBMIT_TOO_BUSY_AND_SAVE_FOR_LATER); response.setMsg(response.getMsg() + ", submit too busy , save local fail store and send later !"); - LOGGER.warn(JSON.toJSONString(response)); - return response; } if (!response.isSuccess()) { try { diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/complete/chain/JobProcessChain.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/complete/chain/JobProcessChain.java index 334166806..34dc9ff8f 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/complete/chain/JobProcessChain.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/complete/chain/JobProcessChain.java @@ -1,6 +1,7 @@ package com.lts.jobtracker.complete.chain; import com.lts.core.commons.utils.CollectionUtils; +import com.lts.core.commons.utils.StringUtils; import com.lts.core.constant.Constants; import com.lts.core.domain.Action; import com.lts.core.domain.Job; @@ -21,6 +22,7 @@ /** * 任务完成 Chian + * * @author Robert HG (254963746@qq.com) on 11/11/15. */ public class JobProcessChain implements JobCompletedChain { @@ -82,7 +84,7 @@ private void singleResultsProcess(List results) { if (!needRetry(result)) { // 这种情况下,如果要反馈客户端的,直接反馈客户端,不进行重试 - if (result.getJobWrapper().getJob().isNeedFeedback()) { + if (isNeedFeedback(result.getJobWrapper().getJob())) { clientNotifier.send(results); } else { jobFinishHandler.onComplete(results); @@ -131,7 +133,7 @@ private void multiResultsProcess(List results) { retryResults = new ArrayList(); } retryResults.add(result); - } else if (result.getJobWrapper().getJob().isNeedFeedback()) { + } else if (isNeedFeedback(result.getJobWrapper().getJob())) { // 需要反馈给客户端 if (feedbackResults == null) { feedbackResults = new ArrayList(); @@ -156,4 +158,12 @@ private void multiResultsProcess(List results) { retryHandler.onComplete(retryResults); } + private boolean isNeedFeedback(Job job) { + if (job == null) { + return false; + } + // 容错,如果没有提交节点组,那么不反馈 + return !StringUtils.isEmpty(job.getSubmitNodeGroup()) && job.isNeedFeedback(); + } + } diff --git a/lts-startup/README.md b/lts-startup/README.md new file mode 100644 index 000000000..cdcd2b583 --- /dev/null +++ b/lts-startup/README.md @@ -0,0 +1,2 @@ + +lts-startup 模块主要是提供一些脚本启动的扩展 \ No newline at end of file diff --git a/lts-startup/lts-startup-tasktracker/src/main/java/com/lts/startup/TaskTrackerStartup.java b/lts-startup/lts-startup-tasktracker/src/main/java/com/lts/startup/TaskTrackerStartup.java index 5760cef94..2bef39a04 100644 --- a/lts-startup/lts-startup-tasktracker/src/main/java/com/lts/startup/TaskTrackerStartup.java +++ b/lts-startup/lts-startup-tasktracker/src/main/java/com/lts/startup/TaskTrackerStartup.java @@ -2,8 +2,6 @@ import com.lts.tasktracker.TaskTracker; -import java.util.Map; - /** * @author Robert HG (254963746@qq.com) on 9/11/15. */ @@ -26,10 +24,6 @@ public static void start(String cfgPath) { taskTracker = DefaultStartup.start(cfg); } - for (Map.Entry config : cfg.getConfigs().entrySet()) { - taskTracker.addConfig(config.getKey(), config.getValue()); - } - taskTracker.start(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {