Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add onlyMeta to accelerate job fetch perf #171

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ public void updateSingleJobStatus(String user, String agency, JobDO jobDO, JobSt
batchUpdateJobStatus(user, agency, new ArrayList<>(Arrays.asList(jobDO)), status);
}

public List<JobDO> queryJobDetail(String jobID, String user, String agency) {
public List<JobDO> queryJobDetail(String jobID, Boolean onlyMeta, String user, String agency) {
JobDO condition = new JobDO(jobID);
List<JobDO> jobDOList = this.queryJobByCondition(false, user, agency, condition);
List<JobDO> jobDOList = this.queryJobByCondition(onlyMeta, user, agency, condition);
// try to query the follower information
if (jobDOList == null || jobDOList.isEmpty()) {
// reset the condition
condition.setOwner(null);
condition.setOwnerAgency(null);
jobDOList =
this.projectMapper.queryFollowerJobByCondition(false, user, agency, condition);
this.projectMapper.queryFollowerJobByCondition(
onlyMeta, user, agency, condition);
}
return jobDOList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.project.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.webank.wedpr.common.utils.Common;
import com.webank.wedpr.common.utils.PageRequest;
import com.webank.wedpr.common.utils.WeDPRException;
Expand All @@ -23,12 +24,16 @@
import com.webank.wedpr.components.project.dao.ProjectDO;
import com.webank.wedpr.components.project.dao.ProjectMapper;
import java.util.List;
import lombok.Data;
import lombok.SneakyThrows;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobRequest extends PageRequest {
private JobDO job = new JobDO(true);
private List<FollowerDO> taskParties;
private List<String> datasetList;
private Boolean onlyMeta = Boolean.TRUE;

public JobRequest() {}

Expand Down Expand Up @@ -60,10 +65,6 @@ public void setJob(JobDO job) {
this.job = job;
}

public List<FollowerDO> getTaskParties() {
return taskParties;
}

public void setTaskParties(List<FollowerDO> taskParties) {
this.taskParties = taskParties;
checkAndConfigTaskParities(taskParties);
Expand Down Expand Up @@ -92,6 +93,13 @@ public void setDatasetList(List<String> datasetList) {
this.datasetList = datasetList;
}

public void setOnlyMeta(Boolean onlyMeta) {
if (onlyMeta == null) {
return;
}
this.onlyMeta = onlyMeta;
}

@Override
public String toString() {
return "JobRequest{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ public WeDPRResponse queryJobByCondition(String user, JobRequest request) {
// query with using owner identity
List<JobDO> jobDOList =
this.projectMapperWrapper.queryJobByCondition(
false, user, WeDPRCommonConfig.getAgency(), request.getJob());
request.getOnlyMeta(),
user,
WeDPRCommonConfig.getAgency(),
request.getJob());
response.setData(
new BatchJobList(new PageInfo<JobDO>(jobDOList).getTotal(), jobDOList));
return response;
Expand Down Expand Up @@ -431,7 +434,10 @@ public WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request
this.projectMapperWrapper
.getProjectMapper()
.queryFollowerJobByCondition(
false, user, WeDPRCommonConfig.getAgency(), request.getJob());
request.getOnlyMeta(),
user,
WeDPRCommonConfig.getAgency(),
request.getJob());
if (jobDOList == null) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
select
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `parties`
`id`, `owner`, `owner_agency`, `job_type`, `parties`, `create_time`, `last_update_time`, `status`
</when>
<otherwise>
*
Expand Down Expand Up @@ -297,7 +297,8 @@
select
<choose>
<when test="onlyMeta == true">
job_table.id, job_table.owner, job_table.owner_agency, job_table.job_type
job_table.id, job_table.owner, job_table.owner_agency,
job_table.job_type, job_table.create_time, job_table.last_update_time, job_table.status
</when>
<otherwise>
job_table.*
Expand All @@ -307,7 +308,16 @@
(select `user_name`, `agency`, `resource_id` from `wedpr_follower_table`
where `user_name` = #{followerUser} and `agency` = #{followerAgency})as follower_table
left join
(select * from `wedpr_job_table` where 1 = 1
(select
<choose>
<when test="onlyMeta == true">
`id`, `owner`, `owner_agency`, `job_type`, `last_update_time`, `create_time`, `status`
</when>
<otherwise>
*
</otherwise>
</choose>
from `wedpr_job_table` where 1 = 1
<choose>
<when test="condition != null and condition != ''">
<if test="condition.name != null and condition.name !=''">
Expand Down Expand Up @@ -351,7 +361,7 @@
(select `user_name`, `agency`, `resource_id` from `wedpr_follower_table`
where `user_name` = #{followerUser} and `agency` = #{followerAgency})
as follower_table left join
(select * from `wedpr_job_table` where 1 = 1
(select `id`, `owner` from `wedpr_job_table` where 1 = 1
<choose>
<when test="condition != null and condition != ''">
<if test="condition.name != null and condition.name !=''">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.webank.wedpr.components.scheduler;

import com.webank.wedpr.components.scheduler.impl.JobDetailRequest;

public interface SchedulerService {
// query the job detail
public abstract Object queryJobDetail(String user, String agency, String jobID)
throws Exception;
public abstract Object queryJobDetail(
String user, String agency, JobDetailRequest jobDetailRequest) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobResult;
import com.webank.wedpr.components.scheduler.executor.impl.ml.request.GetTaskResultRequest;
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLExecutorClient {
private static final Logger logger = LoggerFactory.getLogger(MLExecutorClient.class);

public static Object getJobResult(LoadBalancer loadBalancer, GetTaskResultRequest request)
throws Exception {

Expand Down Expand Up @@ -57,9 +61,6 @@ public static Object getJobResult(LoadBalancer loadBalancer, GetTaskResultReques
if (modelJobResult.getData() == null) {
return null;
}
if (modelJobResult.getData().getJobPlanetResult() == null) {
return null;
}
return modelJobResult.getData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@

package com.webank.wedpr.components.scheduler.executor.impl.ml.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ModelJobResult {
@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class ModelJobData {
private Object jobPlanetResult;
private String modelData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class GetTaskResultRequest implements BaseRequest {
String jobID;
String user;
String jobType;
Boolean onlyFetchLog;
Boolean fetchLog = Boolean.FALSE;
Boolean fetchJobResult = Boolean.FALSE;

public GetTaskResultRequest() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.impl;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
public class JobDetailRequest {
private String jobID;
private Boolean fetchJobDetail = Boolean.TRUE;
private Boolean fetchJobResult = Boolean.FALSE;
private Boolean fetchLog = Boolean.FALSE;

public void setFetchJobDetail(Boolean fetchJobDetail) {
if (fetchJobDetail == null) {
return;
}
this.fetchJobDetail = fetchJobDetail;
}

public void setFetchJobResult(Boolean fetchJobResult) {
if (fetchJobResult == null) {
return;
}
this.fetchJobResult = fetchJobResult;
}

public void setFetchLog(Boolean fetchLog) {
if (fetchLog == null) {
return;
}
this.fetchLog = fetchLog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,46 +50,71 @@ public class SchedulerServiceImpl implements SchedulerService {
private LoadBalancer loadBalancer;

@Override
public Object queryJobDetail(String user, String agency, String jobID) throws Exception {
List<JobDO> jobDOList = this.projectMapperWrapper.queryJobDetail(jobID, user, agency);
public Object queryJobDetail(String user, String agency, JobDetailRequest jobDetailRequest)
throws Exception {
Boolean onlyMeta = Boolean.TRUE;
// query the jobDetail
if (jobDetailRequest.getFetchJobDetail()) {
onlyMeta = Boolean.FALSE;
}
List<JobDO> jobDOList =
this.projectMapperWrapper.queryJobDetail(
jobDetailRequest.getJobID(), onlyMeta, user, agency);
if (jobDOList == null || jobDOList.isEmpty()) {
throw new WeDPRException("queryJobDetail failed for the job " + jobID + " not exist!");
throw new WeDPRException(
"queryJobDetail failed for the job "
+ jobDetailRequest.getJobID()
+ " not exist!");
}
// query the jobDetail
JobDO jobDO = jobDOList.get(0);
JobDetailResponse response = new JobDetailResponse(jobDO);
// run failed, no need to fetch the result, only fetch the log
if (jobDO.getType().mlJob() && !JobStatus.success(jobDO.getStatus())) {
if (jobDetailRequest.getFetchLog()
&& jobDO.getType().mlJob()
&& !JobStatus.success(jobDO.getStatus())) {
Object logDetail = null;
if (jobDO.getJobStatus().finished()) {
GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setOnlyFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchLog(Boolean.TRUE);
getTaskResultRequest.setFetchJobResult(Boolean.FALSE);
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
logDetail = modelJobData.getLogDetail();
}
return new JobDetailResponse(jobDO, null, null, logDetail);
response.setLog(logDetail);
return response;
}
// the ml job
if (jobDO.getType().mlJob()) {
// no need to fetch log and fetch job result
if (!jobDetailRequest.getFetchLog() && !jobDetailRequest.getFetchJobResult()) {
return response;
}

GetTaskResultRequest getTaskResultRequest =
new GetTaskResultRequest(user, jobDO.getId(), jobDO.getJobType());
getTaskResultRequest.setFetchJobResult(jobDetailRequest.getFetchJobResult());
getTaskResultRequest.setFetchLog(jobDetailRequest.getFetchLog());
ModelJobResult.ModelJobData modelJobData =
(ModelJobResult.ModelJobData)
MLExecutorClient.getJobResult(loadBalancer, getTaskResultRequest);
if (modelJobData == null) {
return new JobDetailResponse(jobDO, null, null, null);
}
return new JobDetailResponse(
jobDO,
modelJobData.getJobPlanetResult(),
modelJobData.getModelData(),
modelJobData.getLogDetail());
}
JobDetailResponse response = new JobDetailResponse(jobDO);
// the psi job, parse the output
if (JobType.isPSIJob(jobDO.getJobType())) {
PSIJobParam psiJobParam = PSIJobParam.deserialize(jobDO.getParam());
response.setResultFileInfo(
psiJobParam.getResultPath(datasetMapper, fileMetaBuilder, jobID));
psiJobParam.getResultPath(
datasetMapper, fileMetaBuilder, jobDetailRequest.getJobID()));
}
// the pir job, get result files
if (JobType.isPirJob(jobDO.getJobType())) {
Expand All @@ -107,7 +132,9 @@ public Object queryJobDetail(String user, String agency, String jobID) throws Ex
mpcJobParam.check(datasetMapper);
response.setResultFileInfo(
mpcJobParam.getMpcPath(
fileMetaBuilder, jobID, ExecutorConfig.getMpcResultFileName()));
fileMetaBuilder,
jobDetailRequest.getJobID(),
ExecutorConfig.getMpcResultFileName()));
}

return response;
Expand Down
Loading
Loading