Skip to content

Commit

Permalink
update getJobDetail
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Dec 4, 2024
1 parent 3dde178 commit 47abaee
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ public void updateSingleJobStatus(String user, String agency, JobDO jobDO, JobSt
batchUpdateJobStatus(user, agency, new ArrayList<>(Arrays.asList(jobDO)), status);
}

public List<JobDO> queryJobDetail(String jobID, String user, String agency) {
public List<JobDO> queryJobDetail(String jobID, Boolean onlyMeta, String user, String agency) {
JobDO condition = new JobDO(jobID);
List<JobDO> jobDOList = this.queryJobByCondition(false, user, agency, condition);
List<JobDO> jobDOList = this.queryJobByCondition(onlyMeta, user, agency, condition);
// try to query the follower information
if (jobDOList == null || jobDOList.isEmpty()) {
// reset the condition
condition.setOwner(null);
condition.setOwnerAgency(null);
jobDOList =
this.projectMapper.queryFollowerJobByCondition(false, user, agency, condition);
this.projectMapper.queryFollowerJobByCondition(
onlyMeta, user, agency, condition);
}
return jobDOList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class JobRequest extends PageRequest {
private JobDO job = new JobDO(true);
private List<FollowerDO> taskParties;
private List<String> datasetList;
private Boolean onlyMeta = Boolean.FALSE;
private Boolean onlyMeta = Boolean.TRUE;

public JobRequest() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ public WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request
this.projectMapperWrapper
.getProjectMapper()
.queryFollowerJobByCondition(
false, user, WeDPRCommonConfig.getAgency(), request.getJob());
request.getOnlyMeta(),
user,
WeDPRCommonConfig.getAgency(),
request.getJob());
if (jobDOList == null) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
select
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `parties`, `create_time`, `last_update_time`
`id`, `owner`, `owner_agency`, `job_type`, `parties`, `create_time`, `last_update_time`, `status`
</when>
<otherwise>
*
Expand Down Expand Up @@ -297,7 +297,8 @@
select
<choose>
<when test="onlyMeta == true">
job_table.id, job_table.owner, job_table.owner_agency, job_table.job_type
job_table.id, job_table.owner, job_table.owner_agency,
job_table.job_type, job_table.create_time, job_table.last_update_time, job_table.status
</when>
<otherwise>
job_table.*
Expand All @@ -308,11 +309,14 @@
where `user_name` = #{followerUser} and `agency` = #{followerAgency})as follower_table
left join
(select
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`
<otherwise>
*
</otherwise>
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `last_update_time`, `create_time`, `status`
</when>
<otherwise>
*
</otherwise>
</choose>
from `wedpr_job_table` where 1 = 1
<choose>
<when test="condition != null and condition != ''">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.webank.wedpr.components.scheduler;

import com.webank.wedpr.components.scheduler.impl.JobDetailRequest;

public interface SchedulerService {
// query the job detail
public abstract Object queryJobDetail(String user, String agency, String jobID)
throws Exception;
public abstract Object queryJobDetail(
String user, String agency, JobDetailRequest jobDetailRequest) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@

package com.webank.wedpr.components.scheduler.executor.impl.ml.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ModelJobResult {
@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class ModelJobData {
private Object jobPlanetResult;
private String modelData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class GetTaskResultRequest implements BaseRequest {
String jobID;
String user;
String jobType;
Boolean onlyFetchLog;
Boolean fetchLog = Boolean.FALSE;
Boolean fetchJobResult = Boolean.FALSE;

public GetTaskResultRequest() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.impl;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
public class JobDetailRequest {
private String jobID;
private Boolean fetchJobDetail = Boolean.TRUE;
private Boolean fetchJobResult = Boolean.FALSE;
private Boolean fetchLog = Boolean.FALSE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,46 +50,71 @@ public class SchedulerServiceImpl implements SchedulerService {
private LoadBalancer loadBalancer;

@Override
public Object queryJobDetail(String user, String agency, String jobID) throws Exception {
List<JobDO> jobDOList = this.projectMapperWrapper.queryJobDetail(jobID, user, agency);
public Object queryJobDetail(String user, String agency, JobDetailRequest jobDetailRequest)
throws Exception {
Boolean onlyMeta = Boolean.TRUE;
// query the jobDetail
if (jobDetailRequest.getFetchJobDetail()) {
onlyMeta = Boolean.FALSE;
}
List<JobDO> jobDOList =
this.projectMapperWrapper.queryJobDetail(
jobDetailRequest.getJobID(), onlyMeta, user, agency);
if (jobDOList == null || jobDOList.isEmpty()) {
throw new WeDPRException("queryJobDetail failed for the job " + jobID + " not exist!");
throw new WeDPRException(
"queryJobDetail failed for the job "
+ jobDetailRequest.getJobID()
+ " not exist!");
}
// query the jobDetail
JobDO jobDO = jobDOList.get(0);
JobDetailResponse response = new JobDetailResponse(jobDO);
// run failed, no need to fetch the result, only fetch the log
if (jobDO.getType().mlJob() && !JobStatus.success(jobDO.getStatus())) {
if (jobDetailRequest.getFetchLog()
&& jobDO.getType().mlJob()
&& !JobStatus.success(jobDO.getStatus())) {
Object logDetail = null;
if (jobDO.getJobStatus().finished()) {
GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setOnlyFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchJobResult(Boolean.FALSE);
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
logDetail = modelJobData.getLogDetail();
}
return new JobDetailResponse(jobDO, null, null, logDetail);
response.setLog(logDetail);
return response;
}
// the ml job
if (jobDO.getType().mlJob()) {
// no need to fetch log and fetch job result
if (!jobDetailRequest.getFetchLog() && !jobDetailRequest.getFetchJobResult()) {
return response;
}

GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setFetchJobResult(jobDetailRequest.getFetchJobResult());
getTaskResultRequest.setFetchLog(jobDetailRequest.getFetchLog());
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
if (modelJobData == null) {
return new JobDetailResponse(jobDO, null, null, null);
}
return new JobDetailResponse(
jobDO,
modelJobData.getJobPlanetResult(),
modelJobData.getModelData(),
modelJobData.getLogDetail());
}
JobDetailResponse response = new JobDetailResponse(jobDO);
// the psi job, parse the output
if (JobType.isPSIJob(jobDO.getJobType())) {
PSIJobParam psiJobParam = PSIJobParam.deserialize(jobDO.getParam());
response.setResultFileInfo(
psiJobParam.getResultPath(datasetMapper, fileMetaBuilder, jobID));
psiJobParam.getResultPath(
datasetMapper, fileMetaBuilder, jobDetailRequest.getJobID()));
}
// the pir job, get result files
if (JobType.isPirJob(jobDO.getJobType())) {
Expand All @@ -107,7 +132,9 @@ public Object queryJobDetail(String user, String agency, String jobID) throws Ex
mpcJobParam.check(datasetMapper);
response.setResultFileInfo(
mpcJobParam.getMpcPath(
fileMetaBuilder, jobID, ExecutorConfig.getMpcResultFileName()));
fileMetaBuilder,
jobDetailRequest.getJobID(),
ExecutorConfig.getMpcResultFileName()));
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import com.webank.wedpr.common.utils.Constant;
import com.webank.wedpr.common.utils.WeDPRResponse;
import com.webank.wedpr.components.scheduler.SchedulerService;
import com.webank.wedpr.components.scheduler.impl.JobDetailRequest;
import com.webank.wedpr.components.token.auth.TokenUtils;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(
Expand All @@ -39,8 +37,9 @@ public class SchedulerController {
@Autowired private SchedulerService schedulerService;

// create the authorization request
@GetMapping("/queryJobDetail")
public WeDPRResponse queryJobDetail(@RequestParam String jobID, HttpServletRequest request) {
@PostMapping("/queryJobDetail")
public WeDPRResponse queryJobDetail(
@RequestBody JobDetailRequest jobDetailRequest, HttpServletRequest request) {
try {
WeDPRResponse response =
new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);
Expand All @@ -49,13 +48,17 @@ public WeDPRResponse queryJobDetail(@RequestParam String jobID, HttpServletReque
this.schedulerService.queryJobDetail(
TokenUtils.getLoginUser(request).getUsername(),
WeDPRCommonConfig.getAgency(),
jobID));
jobDetailRequest));
return response;
} catch (Exception e) {
logger.warn("queryJobDetail exception, job: {}, error: ", jobID, e);
logger.warn(
"queryJobDetail exception, job: {}, error: ", jobDetailRequest.getJobID(), e);
return new WeDPRResponse(
Constant.WEDPR_FAILED,
"queryJobDetail for job " + jobID + " failed for " + e.getMessage());
"queryJobDetail for job "
+ jobDetailRequest.getJobID()
+ " failed for "
+ e.getMessage());
}
}
}

0 comments on commit 47abaee

Please sign in to comment.