diff --git a/src/main/java/cloudgene/mapred/database/JobDao.java b/src/main/java/cloudgene/mapred/database/JobDao.java index d2e8b407..244ba398 100644 --- a/src/main/java/cloudgene/mapred/database/JobDao.java +++ b/src/main/java/cloudgene/mapred/database/JobDao.java @@ -66,11 +66,12 @@ public boolean insert(AbstractJob job) { public boolean update(AbstractJob job) { StringBuilder sql = new StringBuilder(); - sql.append("update job "); - sql.append(" set name = ?, state = ?, "); + sql.append("update job set "); + sql.append(" name = ?, state = ?, "); sql.append(" start_time = ?, end_time = ?, "); - sql.append( - " user_id = ?, s3_url = ?, type = ?, deleted_on = ?, application = ?, application_id = ?, submitted_on = ?, finished_on = ?, setup_start_time = ?, setup_end_time = ? "); + sql.append(" user_id = ?, s3_url = ?, type = ?, deleted_on = ?, "); + sql.append(" application = ?, application_id = ?, submitted_on = ?, "); + sql.append(" finished_on = ?, setup_start_time = ?, setup_end_time = ? "); sql.append("where id = ? "); try { @@ -105,9 +106,8 @@ public boolean update(AbstractJob job) { public boolean updateUser(User oldUser, User newUser) { StringBuilder sql = new StringBuilder(); - sql.append("update job "); - sql.append("set user_id = ?, name = ? "); - sql.append("where user_id = ?"); + sql.append("update job set user_id = ?, name = ? "); + sql.append("where user_id = ?"); try { Object[] params = new Object[3]; @@ -132,7 +132,7 @@ public boolean updateUser(User oldUser, User newUser) { public boolean delete(AbstractJob job) { StringBuilder sql = new StringBuilder(); sql.append("delete from job "); - sql.append("where id = ? "); + sql.append("where id = ?"); try { Object[] params = new Object[1]; @@ -277,7 +277,7 @@ public List findAllNotRetiredJobs() { params[1] = AbstractJob.STATE_RUNNING; params[2] = AbstractJob.STATE_EXPORTING; params[3] = AbstractJob.STATE_RETIRED; - params[4] = AbstractJob.STATE_DELETED; + params[4] = AbstractJob.STATE_DELETED; params[5] = AbstractJob.STATE_RETIRED; params[6] = AbstractJob.STATE_DELETED; diff --git a/src/main/java/cloudgene/mapred/jobs/AbstractJob.java b/src/main/java/cloudgene/mapred/jobs/AbstractJob.java index aaed803e..c1e93dff 100644 --- a/src/main/java/cloudgene/mapred/jobs/AbstractJob.java +++ b/src/main/java/cloudgene/mapred/jobs/AbstractJob.java @@ -1,6 +1,7 @@ package cloudgene.mapred.jobs; import java.io.BufferedOutputStream; +import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -30,6 +31,10 @@ abstract public class AbstractJob extends PriorityRunnable { + public static final String JOB_LOG = "job.txt"; + + public static final String JOB_OUT = "std.out"; + private static final Logger log = LoggerFactory.getLogger(AbstractJob.class); private DateFormat formatter = new SimpleDateFormat("yy/MM/dd HH:mm:ss"); @@ -102,8 +107,6 @@ abstract public class AbstractJob extends PriorityRunnable { private Settings settings; - private String logs; - private String localWorkspace; private boolean canceld = false; @@ -300,7 +303,7 @@ public boolean runInstallationAndResolveAppLinks() { @Override public void run() { - if (isCanceld()) { + if (canceld) { return; } @@ -389,29 +392,19 @@ public void run() { } + writeLog("Cleaning up..."); if (getState() == AbstractJob.STATE_FAILED || getState() == AbstractJob.STATE_CANCELED) { - - writeLog("Cleaning up..."); onFailure(); - log.info("[Job {}]cleanup successful.", getId()); - writeLog("Cleanup successful."); - } else { - writeLog("Cleaning up..."); cleanUp(); - log.info("[Job {}] cleanup successful.", getId()); - writeLog("Cleanup successful."); - } + log.info("[Job {}]cleanup successful.", getId()); + writeLog("Cleanup successful."); if (canceld) { setState(AbstractJob.STATE_CANCELED); } - closeStdOutFiles(); - - setEndTime(System.currentTimeMillis()); - } catch (Exception | Error e) { setState(AbstractJob.STATE_FAILED); @@ -429,11 +422,10 @@ public void run() { log.info("[Job {}]: cleanup successful.", getId()); writeLog("Cleanup successful."); - closeStdOutFiles(); - - setEndTime(System.currentTimeMillis()); - } + + closeStdOutFiles(); + setEndTime(System.currentTimeMillis()); } public void cancel() { @@ -448,8 +440,8 @@ public void cancel() { } private void initStdOutFiles() throws FileNotFoundException { - stdOutStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, "std.out"))); - logStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, "job.txt"))); + stdOutStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, JOB_OUT))); + logStream = new BufferedOutputStream(new FileOutputStream(FileUtil.path(localWorkspace, JOB_LOG))); } private void closeStdOutFiles() { @@ -459,8 +451,15 @@ private void closeStdOutFiles() { stdOutStream.close(); logStream.close(); - } catch (IOException e) { + // stage files to workspace + workspace.uploadLog(new File(FileUtil.path(localWorkspace, JOB_OUT))); + workspace.uploadLog(new File(FileUtil.path(localWorkspace, JOB_LOG))); + + FileUtil.deleteFile(FileUtil.path(localWorkspace, JOB_OUT)); + FileUtil.deleteFile(FileUtil.path(localWorkspace, JOB_LOG)); + } catch (IOException e) { + log.error("[Job {}]: Staging log files failed.", getId(), e); } } @@ -474,25 +473,13 @@ public void writeOutput(String line) { } } catch (IOException e) { - + log.error("[Job {}]: Write output failed.", getId(), e); } } public void writeOutputln(String line) { - - try { - if (stdOutStream == null) { - initStdOutFiles(); - } - - stdOutStream.write(line.getBytes("UTF-8")); - stdOutStream.write("\n".getBytes("UTF-8")); - stdOutStream.flush(); - - } catch (IOException e) { - } - + writeOutput(line + "\n"); } public void writeLog(String line) { @@ -508,6 +495,7 @@ public void writeLog(String line) { logStream.flush(); } catch (IOException e) { + log.error("[Job {}]: Write output failed.", getId(), e); } } @@ -524,14 +512,6 @@ public CloudgeneContext getContext() { return context; } - public void setLogs(String logs) { - this.logs = logs; - } - - public String getLogs() { - return logs; - } - @Override public boolean equals(Object obj) { @@ -583,10 +563,6 @@ public String getApplicationId() { return applicationId; } - public boolean isCanceld() { - return canceld; - } - public boolean isRunning() { return state == STATE_EXPORTING || state == STATE_RUNNING || state == STATE_WAITING; } @@ -619,16 +595,13 @@ public void kill() { } - public long getCurrentTime() { - return System.currentTimeMillis(); - } - - public void setCurrentTime(long time) { - - } - public String getPublicJobId() { return publicJobId; } + public String getLog(String name) { + String logFilename = FileUtil.path(settings.getLocalWorkspace(), getId(), name); + return FileUtil.readFileAsString(logFilename); + } + } diff --git a/src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java b/src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java index 895bcbb7..96f14f4d 100644 --- a/src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java +++ b/src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java @@ -103,7 +103,7 @@ public boolean setup() { try { log.info("[Job {}] Setup workspace {}'", getId(), workspace.getName()); context.log("Setup External Workspace on " + workspace.getName()); - workspace.setup(this.getId()); + workspace.setup(); context.setWorkspace(workspace); } catch (Exception e) { writeLog(e.toString()); diff --git a/src/main/java/cloudgene/mapred/jobs/workspace/IWorkspace.java b/src/main/java/cloudgene/mapred/jobs/workspace/IWorkspace.java index 701a6fc1..edff9bb1 100644 --- a/src/main/java/cloudgene/mapred/jobs/workspace/IWorkspace.java +++ b/src/main/java/cloudgene/mapred/jobs/workspace/IWorkspace.java @@ -9,11 +9,15 @@ public interface IWorkspace { - public void setup(String job) throws IOException; + public void setJob(String job); + + public void setup() throws IOException; public String upload(String id, File file) throws IOException; public String uploadInput(String id, File file) throws IOException; + + public String uploadLog(File file) throws IOException; public InputStream download(String url) throws IOException; @@ -40,5 +44,7 @@ public interface IWorkspace { public void cleanup(String job) throws IOException; public boolean exists(String path) throws IOException; + + public String downloadLog(String string) throws IOException; } diff --git a/src/main/java/cloudgene/mapred/jobs/workspace/LocalWorkspace.java b/src/main/java/cloudgene/mapred/jobs/workspace/LocalWorkspace.java index c573a7df..6e35a4bf 100644 --- a/src/main/java/cloudgene/mapred/jobs/workspace/LocalWorkspace.java +++ b/src/main/java/cloudgene/mapred/jobs/workspace/LocalWorkspace.java @@ -43,8 +43,17 @@ public String getName() { } @Override - public void setup(String job) throws IOException { + public void setJob(String job) { workspace = FileUtil.path(location, job); + } + + @Override + public void setup() throws IOException { + + if (workspace == null) { + throw new IOException("No job id provided."); + } + log.info("Init workspace " + workspace); FileUtil.createDirectory(workspace); } @@ -59,6 +68,11 @@ public String upload(String id, File file) throws IOException { return target; } + @Override + public String uploadLog(File file) throws IOException { + return upload(LOGS_DIRECTORY, file); + } + @Override public String uploadInput(String id, File file) throws IOException { return upload(FileUtil.path(INPUT_DIRECTORY, id), file); @@ -74,8 +88,18 @@ public InputStream download(String path) throws IOException { if (file.exists()) { return new FileInputStream(file); } else { - throw new IOException("File '" + path + "' not found in workspace."); + throw new IOException("File '" + absolutePath + "' not found in workspace."); + } + } + + @Override + public String downloadLog(String name) throws IOException { + + if (workspace == null) { + throw new IOException("No job id provided."); } + + return FileUtil.readFileAsString(download(FileUtil.path(workspace, LOGS_DIRECTORY, name))); } @Override diff --git a/src/main/java/cloudgene/mapred/jobs/workspace/S3Workspace.java b/src/main/java/cloudgene/mapred/jobs/workspace/S3Workspace.java index ec8ff6e4..656779b9 100644 --- a/src/main/java/cloudgene/mapred/jobs/workspace/S3Workspace.java +++ b/src/main/java/cloudgene/mapred/jobs/workspace/S3Workspace.java @@ -52,9 +52,16 @@ public String getName() { } @Override - public void setup(String job) throws IOException { + public void setJob(String job) { + this.job = job; + } + + @Override + public void setup() throws IOException { - this.job = job; + if (job == null) { + throw new IOException("No job id provided."); + } if (location == null) { throw new IOException("No S3 Output Bucket specified."); @@ -86,6 +93,11 @@ public String uploadInput(String id, File file) throws IOException { return upload(FileUtil.path(INPUT_DIRECTORY, id), file); } + @Override + public String uploadLog(File file) throws IOException { + return upload(LOGS_DIRECTORY, file); + } + @Override public InputStream download(String url) throws IOException { @@ -99,6 +111,11 @@ public InputStream download(String url) throws IOException { return s3is; } + + @Override + public String downloadLog(String name) throws IOException { + return FileUtil.readFileAsString(download(FileUtil.path(LOGS_DIRECTORY, name))); + } public boolean exists(String url) { String bucket = S3Util.getBucket(url); @@ -245,7 +262,7 @@ public List getDownloads(String url) { return downloads; } - + @Override public List getLogs() { String url = location + "/" + job + "/" + LOGS_DIRECTORY; diff --git a/src/main/java/cloudgene/mapred/jobs/workspace/WorkspaceFactory.java b/src/main/java/cloudgene/mapred/jobs/workspace/WorkspaceFactory.java index 561d7a58..18f1c140 100644 --- a/src/main/java/cloudgene/mapred/jobs/workspace/WorkspaceFactory.java +++ b/src/main/java/cloudgene/mapred/jobs/workspace/WorkspaceFactory.java @@ -49,7 +49,9 @@ public IWorkspace getByUrl(String url) { } public IWorkspace getByJob(AbstractJob job) { - return getDefault(); + IWorkspace workspace = getDefault(); + workspace.setJob(job.getId()); + return workspace; } } diff --git a/src/main/java/cloudgene/mapred/server/controller/LogController.java b/src/main/java/cloudgene/mapred/server/controller/LogController.java index 6b5f20ea..700f3668 100644 --- a/src/main/java/cloudgene/mapred/server/controller/LogController.java +++ b/src/main/java/cloudgene/mapred/server/controller/LogController.java @@ -1,16 +1,17 @@ package cloudgene.mapred.server.controller; +import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cloudgene.mapred.core.User; import cloudgene.mapred.jobs.AbstractJob; +import cloudgene.mapred.jobs.workspace.WorkspaceFactory; import cloudgene.mapred.server.Application; import cloudgene.mapred.server.auth.AuthenticationService; import cloudgene.mapred.server.auth.AuthenticationType; import cloudgene.mapred.server.services.JobService; -import cloudgene.mapred.util.Settings; -import genepi.io.FileUtil; import io.micronaut.http.MediaType; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; @@ -24,7 +25,7 @@ public class LogController { private static Logger log = LoggerFactory.getLogger(LogController.class); - + @Inject protected Application application; @@ -34,25 +35,21 @@ public class LogController { @Inject protected JobService jobService; + @Inject + protected WorkspaceFactory workspaceFactory; + @Get("/logs/{id}") @Secured(SecurityRule.IS_AUTHENTICATED) @Produces(MediaType.TEXT_PLAIN) - public String getByJobs(Authentication authentication, String id) { + public String getByJob(Authentication authentication, String id) throws IOException { User user = authenticationService.getUserByAuthentication(authentication, AuthenticationType.ALL_TOKENS); AbstractJob job = jobService.getByIdAndUser(id, user); - Settings settings = application.getSettings(); - // log file - String logFilename = FileUtil.path(settings.getLocalWorkspace(), job.getId(), "job.txt"); - String logContent = FileUtil.readFileAsString(logFilename); - - // std out - String outputFilename = FileUtil.path(settings.getLocalWorkspace(), job.getId(), "std.out"); - String outputContent = FileUtil.readFileAsString(outputFilename); - + String logContent = jobService.getJobLog(job, AbstractJob.JOB_LOG); + String outputContent = jobService.getJobLog(job, AbstractJob.JOB_OUT); + StringBuffer buffer = new StringBuffer(); - if (!logContent.isEmpty()) { buffer.append("job.txt:\n\n"); buffer.append(logContent); @@ -62,14 +59,13 @@ public String getByJobs(Authentication authentication, String id) { buffer.append("\n\nstd.out:\n\n"); buffer.append(outputContent); } - - + String message = String.format("Job: viewing logs for job ID %s", job.getId()); if (user.isAdmin()) { message += String.format(" (by ADMIN user ID %s - email %s)", user.getId(), user.getMail()); } log.info(message); - + return buffer.toString(); } diff --git a/src/main/java/cloudgene/mapred/server/responses/JobResponse.java b/src/main/java/cloudgene/mapred/server/responses/JobResponse.java index 6f550f00..f48bbb1f 100644 --- a/src/main/java/cloudgene/mapred/server/responses/JobResponse.java +++ b/src/main/java/cloudgene/mapred/server/responses/JobResponse.java @@ -16,7 +16,6 @@ public class JobResponse { private String application; private String applicationId; - private boolean canceld; private long deletedOn; private long endTime; private String id; @@ -52,14 +51,6 @@ public void setApplicationId(String applicationId) { this.applicationId = applicationId; } - public boolean isCanceld() { - return canceld; - } - - public void setCanceld(boolean canceld) { - this.canceld = canceld; - } - public long getDeletedOn() { return deletedOn; } @@ -202,12 +193,10 @@ public static JobResponse build(AbstractJob job, User user) { JobResponse response = new JobResponse(); response.setApplication(job.getApplication()); response.setApplicationId(job.getApplicationId()); - response.setCanceld(job.isCanceld()); response.setName(job.getName()); response.setId(job.getId()); response.setState(job.getState()); - response.setLogs(job.getLogs()); response.setPositionInQueue(job.getPositionInQueue()); response.setUserAgent(job.getUserAgent()); diff --git a/src/main/java/cloudgene/mapred/server/services/JobService.java b/src/main/java/cloudgene/mapred/server/services/JobService.java index 5b9f0f50..6ecc2fea 100644 --- a/src/main/java/cloudgene/mapred/server/services/JobService.java +++ b/src/main/java/cloudgene/mapred/server/services/JobService.java @@ -1,6 +1,7 @@ package cloudgene.mapred.server.services; import java.io.File; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; @@ -125,7 +126,8 @@ public AbstractJob submitJob(String appId, List form, User user) { try { // setup workspace - workspace.setup(id); + workspace.setJob(id); + workspace.setup(); // parse input params inputParams = parseAndUpdateInputParams(form, app, workspace); @@ -140,7 +142,7 @@ public AbstractJob submitJob(String appId, List form, User user) { name = jobName; } - //TODO: remove and solve via workspace! + // TODO: remove and solve via workspace! String localWorkspace = FileUtil.path(settings.getLocalWorkspace(), id); FileUtil.createDirectory(localWorkspace); @@ -266,17 +268,17 @@ public AbstractJob restart(AbstractJob job) { } - IWorkspace workspace = workspaceFactory.getDefault(); try { // setup workspace - workspace.setup(job.getId()); + workspace.setJob(job.getId()); + workspace.setup(); } catch (Exception e) { throw new JsonHttpStatusException(HttpStatus.BAD_REQUEST, e.getMessage()); } job.setWorkspace(workspace); - + ((CloudgeneJob) job).loadApp(application.getWdlApp()); this.application.getWorkflowEngine().restart(job); @@ -522,7 +524,7 @@ public List getJobs(String state) { case "running-stq": - //TODO: remove! + // TODO: remove! jobs = new Vector(); break; @@ -548,6 +550,16 @@ public List getJobs(String state) { return jobs; } + public String getJobLog(AbstractJob job, String name) throws IOException { + if (job.isRunning()) { + // files are locally when job is running + return job.getLog(name); + } else { + IWorkspace workspace = workspaceFactory.getByJob(job); + return workspace.downloadLog(name); + } + } + public boolean needsImport(String url) { return url.startsWith("sftp://") || url.startsWith("http://") || url.startsWith("https://") || url.startsWith("ftp://") || url.startsWith("s3://");