Skip to content

Commit

Permalink
update getJobDetail
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Dec 4, 2024
1 parent 3dde178 commit 928b022
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class JobRequest extends PageRequest {
private JobDO job = new JobDO(true);
private List<FollowerDO> taskParties;
private List<String> datasetList;
private Boolean onlyMeta = Boolean.FALSE;
private Boolean onlyMeta = Boolean.TRUE;

public JobRequest() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ public WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request
this.projectMapperWrapper
.getProjectMapper()
.queryFollowerJobByCondition(
false, user, WeDPRCommonConfig.getAgency(), request.getJob());
request.getOnlyMeta(),
user,
WeDPRCommonConfig.getAgency(),
request.getJob());
if (jobDOList == null) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
select
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `parties`, `create_time`, `last_update_time`
`id`, `owner`, `owner_agency`, `job_type`, `parties`, `create_time`, `last_update_time`, `status`
</when>
<otherwise>
*
Expand Down Expand Up @@ -308,11 +308,14 @@
where `user_name` = #{followerUser} and `agency` = #{followerAgency})as follower_table
left join
(select
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`
<otherwise>
*
</otherwise>
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `last_update_time`
</when>
<otherwise>
*
</otherwise>
</choose>
from `wedpr_job_table` where 1 = 1
<choose>
<when test="condition != null and condition != ''">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.webank.wedpr.components.scheduler;

import com.webank.wedpr.components.scheduler.impl.JobDetailRequest;

public interface SchedulerService {
// query the job detail
public abstract Object queryJobDetail(String user, String agency, String jobID)
throws Exception;
public abstract Object queryJobDetail(
String user, String agency, JobDetailRequest jobDetailRequest) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class GetTaskResultRequest implements BaseRequest {
String jobID;
String user;
String jobType;
Boolean onlyFetchLog;
Boolean fetchLog = Boolean.FALSE;
Boolean fetchJobResult = Boolean.FALSE;

public GetTaskResultRequest() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.impl;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
public class JobDetailRequest {
private String jobID;
private Boolean fetchJobDetail = Boolean.TRUE;
private Boolean fetchJobResult = Boolean.FALSE;
private Boolean fetchLog = Boolean.FALSE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,53 @@ public class SchedulerServiceImpl implements SchedulerService {
private LoadBalancer loadBalancer;

@Override
public Object queryJobDetail(String user, String agency, String jobID) throws Exception {
List<JobDO> jobDOList = this.projectMapperWrapper.queryJobDetail(jobID, user, agency);
if (jobDOList == null || jobDOList.isEmpty()) {
throw new WeDPRException("queryJobDetail failed for the job " + jobID + " not exist!");
}
public Object queryJobDetail(String user, String agency, JobDetailRequest jobDetailRequest)
throws Exception {
JobDO jobDO = null;
JobDetailResponse response = new JobDetailResponse(jobDO);
// query the jobDetail
JobDO jobDO = jobDOList.get(0);
if (jobDetailRequest.getFetchJobDetail()) {
List<JobDO> jobDOList =
this.projectMapperWrapper.queryJobDetail(
jobDetailRequest.getJobID(), user, agency);
if (jobDOList == null || jobDOList.isEmpty()) {
throw new WeDPRException(
"queryJobDetail failed for the job "
+ jobDetailRequest.getJobID()
+ " not exist!");
}
jobDO = jobDOList.get(0);
response.setJob(jobDO);
}
// run failed, no need to fetch the result, only fetch the log
if (jobDO.getType().mlJob() && !JobStatus.success(jobDO.getStatus())) {
if (jobDetailRequest.getFetchLog()
&& jobDO.getType().mlJob()
&& !JobStatus.success(jobDO.getStatus())) {
Object logDetail = null;
if (jobDO.getJobStatus().finished()) {
GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setOnlyFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchJobResult(Boolean.FALSE);
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
logDetail = modelJobData.getLogDetail();
}
return new JobDetailResponse(jobDO, null, null, logDetail);
response.setLog(logDetail);
return response;
}
// the ml job
if (jobDO.getType().mlJob()) {
// no need to fetch log and fetch job result
if (!jobDetailRequest.getFetchLog() && !jobDetailRequest.getFetchJobResult()) {
return response;
}

GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setFetchJobResult(jobDetailRequest.getFetchJobResult());
getTaskResultRequest.setFetchLog(jobDetailRequest.getFetchLog());
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
Expand All @@ -84,12 +106,12 @@ public Object queryJobDetail(String user, String agency, String jobID) throws Ex
modelJobData.getModelData(),
modelJobData.getLogDetail());
}
JobDetailResponse response = new JobDetailResponse(jobDO);
// the psi job, parse the output
if (JobType.isPSIJob(jobDO.getJobType())) {
PSIJobParam psiJobParam = PSIJobParam.deserialize(jobDO.getParam());
response.setResultFileInfo(
psiJobParam.getResultPath(datasetMapper, fileMetaBuilder, jobID));
psiJobParam.getResultPath(
datasetMapper, fileMetaBuilder, jobDetailRequest.getJobID()));
}
// the pir job, get result files
if (JobType.isPirJob(jobDO.getJobType())) {
Expand All @@ -107,7 +129,9 @@ public Object queryJobDetail(String user, String agency, String jobID) throws Ex
mpcJobParam.check(datasetMapper);
response.setResultFileInfo(
mpcJobParam.getMpcPath(
fileMetaBuilder, jobID, ExecutorConfig.getMpcResultFileName()));
fileMetaBuilder,
jobDetailRequest.getJobID(),
ExecutorConfig.getMpcResultFileName()));
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import com.webank.wedpr.common.utils.Constant;
import com.webank.wedpr.common.utils.WeDPRResponse;
import com.webank.wedpr.components.scheduler.SchedulerService;
import com.webank.wedpr.components.scheduler.impl.JobDetailRequest;
import com.webank.wedpr.components.token.auth.TokenUtils;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(
Expand All @@ -39,8 +37,9 @@ public class SchedulerController {
@Autowired private SchedulerService schedulerService;

// create the authorization request
@GetMapping("/queryJobDetail")
public WeDPRResponse queryJobDetail(@RequestParam String jobID, HttpServletRequest request) {
@PostMapping("/queryJobDetail")
public WeDPRResponse queryJobDetail(
@RequestBody JobDetailRequest jobDetailRequest, HttpServletRequest request) {
try {
WeDPRResponse response =
new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);
Expand All @@ -49,13 +48,17 @@ public WeDPRResponse queryJobDetail(@RequestParam String jobID, HttpServletReque
this.schedulerService.queryJobDetail(
TokenUtils.getLoginUser(request).getUsername(),
WeDPRCommonConfig.getAgency(),
jobID));
jobDetailRequest));
return response;
} catch (Exception e) {
logger.warn("queryJobDetail exception, job: {}, error: ", jobID, e);
logger.warn(
"queryJobDetail exception, job: {}, error: ", jobDetailRequest.getJobID(), e);
return new WeDPRResponse(
Constant.WEDPR_FAILED,
"queryJobDetail for job " + jobID + " failed for " + e.getMessage());
"queryJobDetail for job "
+ jobDetailRequest.getJobID()
+ " failed for "
+ e.getMessage());
}
}
}

0 comments on commit 928b022

Please sign in to comment.