Skip to content

Commit

Permalink
Upload job infos to workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
lukfor committed Sep 18, 2023
1 parent 44afc36 commit 3b26d30
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 108 deletions.
18 changes: 9 additions & 9 deletions src/main/java/cloudgene/mapred/database/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ public boolean insert(AbstractJob job) {

public boolean update(AbstractJob job) {
StringBuilder sql = new StringBuilder();
sql.append("update job ");
sql.append(" set name = ?, state = ?, ");
sql.append("update job set ");
sql.append(" name = ?, state = ?, ");
sql.append(" start_time = ?, end_time = ?, ");
sql.append(
" user_id = ?, s3_url = ?, type = ?, deleted_on = ?, application = ?, application_id = ?, submitted_on = ?, finished_on = ?, setup_start_time = ?, setup_end_time = ? ");
sql.append(" user_id = ?, s3_url = ?, type = ?, deleted_on = ?, ");
sql.append(" application = ?, application_id = ?, submitted_on = ?, ");
sql.append(" finished_on = ?, setup_start_time = ?, setup_end_time = ? ");
sql.append("where id = ? ");
try {

Expand Down Expand Up @@ -105,9 +106,8 @@ public boolean update(AbstractJob job) {

public boolean updateUser(User oldUser, User newUser) {
StringBuilder sql = new StringBuilder();
sql.append("update job ");
sql.append("set user_id = ?, name = ? ");
sql.append("where user_id = ?");
sql.append("update job set user_id = ?, name = ? ");
sql.append("where user_id = ?");
try {

Object[] params = new Object[3];
Expand All @@ -132,7 +132,7 @@ public boolean updateUser(User oldUser, User newUser) {
public boolean delete(AbstractJob job) {
StringBuilder sql = new StringBuilder();
sql.append("delete from job ");
sql.append("where id = ? ");
sql.append("where id = ?");
try {

Object[] params = new Object[1];
Expand Down Expand Up @@ -277,7 +277,7 @@ public List<AbstractJob> findAllNotRetiredJobs() {
params[1] = AbstractJob.STATE_RUNNING;
params[2] = AbstractJob.STATE_EXPORTING;
params[3] = AbstractJob.STATE_RETIRED;
params[4] = AbstractJob.STATE_DELETED;
params[4] = AbstractJob.STATE_DELETED;
params[5] = AbstractJob.STATE_RETIRED;
params[6] = AbstractJob.STATE_DELETED;

Expand Down
87 changes: 30 additions & 57 deletions src/main/java/cloudgene/mapred/jobs/AbstractJob.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudgene.mapred.jobs;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -30,6 +31,10 @@

abstract public class AbstractJob extends PriorityRunnable {

public static final String JOB_LOG = "job.txt";

public static final String JOB_OUT = "std.out";

private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);

private DateFormat formatter = new SimpleDateFormat("yy/MM/dd HH:mm:ss");
Expand Down Expand Up @@ -102,8 +107,6 @@ abstract public class AbstractJob extends PriorityRunnable {

private Settings settings;

private String logs;

private String localWorkspace;

private boolean canceld = false;
Expand Down Expand Up @@ -300,7 +303,7 @@ public boolean runInstallationAndResolveAppLinks() {
@Override
public void run() {

if (isCanceld()) {
if (canceld) {
return;
}

Expand Down Expand Up @@ -389,29 +392,19 @@ public void run() {

}

writeLog("Cleaning up...");
if (getState() == AbstractJob.STATE_FAILED || getState() == AbstractJob.STATE_CANCELED) {

writeLog("Cleaning up...");
onFailure();
log.info("[Job {}]cleanup successful.", getId());
writeLog("Cleanup successful.");

} else {
writeLog("Cleaning up...");
cleanUp();
log.info("[Job {}] cleanup successful.", getId());
writeLog("Cleanup successful.");

}
log.info("[Job {}]cleanup successful.", getId());
writeLog("Cleanup successful.");

if (canceld) {
setState(AbstractJob.STATE_CANCELED);
}

closeStdOutFiles();

setEndTime(System.currentTimeMillis());

} catch (Exception | Error e) {

setState(AbstractJob.STATE_FAILED);
Expand All @@ -429,11 +422,10 @@ public void run() {
log.info("[Job {}]: cleanup successful.", getId());
writeLog("Cleanup successful.");

closeStdOutFiles();

setEndTime(System.currentTimeMillis());

}

closeStdOutFiles();
setEndTime(System.currentTimeMillis());
}

public void cancel() {
Expand All @@ -448,8 +440,8 @@ public void cancel() {
}

private void initStdOutFiles() throws FileNotFoundException {
stdOutStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, "std.out")));
logStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, "job.txt")));
stdOutStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, JOB_OUT)));
logStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, JOB_LOG)));
}

private void closeStdOutFiles() {
Expand All @@ -459,8 +451,15 @@ private void closeStdOutFiles() {
stdOutStream.close();
logStream.close();

} catch (IOException e) {
// stage files to workspace
workspace.uploadLog(new File(FileUtil.path(localWorkspace, JOB_OUT)));
workspace.uploadLog(new File(FileUtil.path(localWorkspace, JOB_LOG)));

FileUtil.deleteFile(FileUtil.path(localWorkspace, JOB_OUT));
FileUtil.deleteFile(FileUtil.path(localWorkspace, JOB_LOG));

} catch (IOException e) {
log.error("[Job {}]: Staging log files failed.", getId(), e);
}

}
Expand All @@ -474,25 +473,13 @@ public void writeOutput(String line) {

}
} catch (IOException e) {

log.error("[Job {}]: Write output failed.", getId(), e);
}

}

public void writeOutputln(String line) {

try {
if (stdOutStream == null) {
initStdOutFiles();
}

stdOutStream.write(line.getBytes("UTF-8"));
stdOutStream.write("\n".getBytes("UTF-8"));
stdOutStream.flush();

} catch (IOException e) {
}

writeOutput(line + "\n");
}

public void writeLog(String line) {
Expand All @@ -508,6 +495,7 @@ public void writeLog(String line) {
logStream.flush();

} catch (IOException e) {
log.error("[Job {}]: Write output failed.", getId(), e);
}

}
Expand All @@ -524,14 +512,6 @@ public CloudgeneContext getContext() {
return context;
}

public void setLogs(String logs) {
this.logs = logs;
}

public String getLogs() {
return logs;
}

@Override
public boolean equals(Object obj) {

Expand Down Expand Up @@ -583,10 +563,6 @@ public String getApplicationId() {
return applicationId;
}

public boolean isCanceld() {
return canceld;
}

public boolean isRunning() {
return state == STATE_EXPORTING || state == STATE_RUNNING || state == STATE_WAITING;
}
Expand Down Expand Up @@ -619,16 +595,13 @@ public void kill() {

}

public long getCurrentTime() {
return System.currentTimeMillis();
}

public void setCurrentTime(long time) {

}

public String getPublicJobId() {
return publicJobId;
}

public String getLog(String name) {
String logFilename = FileUtil.path(settings.getLocalWorkspace(), getId(), name);
return FileUtil.readFileAsString(logFilename);
}

}
2 changes: 1 addition & 1 deletion src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public boolean setup() {
try {
log.info("[Job {}] Setup workspace {}'", getId(), workspace.getName());
context.log("Setup External Workspace on " + workspace.getName());
workspace.setup(this.getId());
workspace.setup();
context.setWorkspace(workspace);
} catch (Exception e) {
writeLog(e.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@

public interface IWorkspace {

public void setup(String job) throws IOException;
public void setJob(String job);

public void setup() throws IOException;

public String upload(String id, File file) throws IOException;

public String uploadInput(String id, File file) throws IOException;

public String uploadLog(File file) throws IOException;

public InputStream download(String url) throws IOException;

Expand All @@ -40,5 +44,7 @@ public interface IWorkspace {
public void cleanup(String job) throws IOException;

public boolean exists(String path) throws IOException;

public String downloadLog(String string) throws IOException;

}
28 changes: 26 additions & 2 deletions src/main/java/cloudgene/mapred/jobs/workspace/LocalWorkspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,17 @@ public String getName() {
}

@Override
public void setup(String job) throws IOException {
public void setJob(String job) {
workspace = FileUtil.path(location, job);
}

@Override
public void setup() throws IOException {

if (workspace == null) {
throw new IOException("No job id provided.");
}

log.info("Init workspace " + workspace);
FileUtil.createDirectory(workspace);
}
Expand All @@ -59,6 +68,11 @@ public String upload(String id, File file) throws IOException {
return target;
}

@Override
public String uploadLog(File file) throws IOException {
return upload(LOGS_DIRECTORY, file);
}

@Override
public String uploadInput(String id, File file) throws IOException {
return upload(FileUtil.path(INPUT_DIRECTORY, id), file);
Expand All @@ -74,8 +88,18 @@ public InputStream download(String path) throws IOException {
if (file.exists()) {
return new FileInputStream(file);
} else {
throw new IOException("File '" + path + "' not found in workspace.");
throw new IOException("File '" + absolutePath + "' not found in workspace.");
}
}

@Override
public String downloadLog(String name) throws IOException {

if (workspace == null) {
throw new IOException("No job id provided.");
}

return FileUtil.readFileAsString(download(FileUtil.path(workspace, LOGS_DIRECTORY, name)));
}

@Override
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/cloudgene/mapred/jobs/workspace/S3Workspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ public String getName() {
}

@Override
public void setup(String job) throws IOException {
public void setJob(String job) {
this.job = job;
}

@Override
public void setup() throws IOException {

this.job = job;
if (job == null) {
throw new IOException("No job id provided.");
}

if (location == null) {
throw new IOException("No S3 Output Bucket specified.");
Expand Down Expand Up @@ -86,6 +93,11 @@ public String uploadInput(String id, File file) throws IOException {
return upload(FileUtil.path(INPUT_DIRECTORY, id), file);
}

@Override
public String uploadLog(File file) throws IOException {
return upload(LOGS_DIRECTORY, file);
}

@Override
public InputStream download(String url) throws IOException {

Expand All @@ -99,6 +111,11 @@ public InputStream download(String url) throws IOException {

return s3is;
}

@Override
public String downloadLog(String name) throws IOException {
return FileUtil.readFileAsString(download(FileUtil.path(LOGS_DIRECTORY, name)));
}

public boolean exists(String url) {
String bucket = S3Util.getBucket(url);
Expand Down Expand Up @@ -245,7 +262,7 @@ public List<Download> getDownloads(String url) {

return downloads;
}

@Override
public List<Download> getLogs() {
String url = location + "/" + job + "/" + LOGS_DIRECTORY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public IWorkspace getByUrl(String url) {
}

public IWorkspace getByJob(AbstractJob job) {
return getDefault();
IWorkspace workspace = getDefault();
workspace.setJob(job.getId());
return workspace;
}

}
Loading

0 comments on commit 3b26d30

Please sign in to comment.