diff --git a/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/expression/javascript/CWLExpressionJavascriptResolver.java b/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/expression/javascript/CWLExpressionJavascriptResolver.java index 0bb45b972..f5ed66af5 100644 --- a/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/expression/javascript/CWLExpressionJavascriptResolver.java +++ b/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/expression/javascript/CWLExpressionJavascriptResolver.java @@ -45,6 +45,7 @@ public static Object evaluate(Object context, Object self, String expr, CWLRunti } Context cx = Context.enter(); + cx.setLanguageVersion(Context.VERSION_ES6); cx.setOptimizationLevel(OPTIMIZATION_LEVEL); cx.setMaximumInterpreterStackDepth(MAX_STACK_DEPTH); cx.setClassShutter(new CWLExpressionDenyAllClassShutter()); diff --git a/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/processor/callback/CWLStageInputProcessorCallback.java b/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/processor/callback/CWLStageInputProcessorCallback.java index 6dc022f98..b4cb6fe75 100644 --- a/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/processor/callback/CWLStageInputProcessorCallback.java +++ b/rabix-bindings-cwl/src/main/java/org/rabix/bindings/cwl/processor/callback/CWLStageInputProcessorCallback.java @@ -105,7 +105,7 @@ private String stagePath(String path, StageInput stageInput) throws BindingExcep return destinationFile.toString(); case LINK: try { - Files.createLink(destinationFile, file); + Files.createSymbolicLink(destinationFile, file); } catch (IOException e) { throw new BindingException(e); } diff --git a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java index 6c35fd3e8..6357b7318 100644 --- a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java +++ b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java @@ -380,6 +380,10 @@ public static List> getSecondaryFiles(SBJob job, HashAlgorit Path pathToSec = Paths.get(secondaryFilePath); if (Files.exists(pathToSec) || !onlyExisting) { Map file = SBFileValueHelper.pathToRawFile(pathToSec, hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue))); + boolean loadContents = SBBindingHelper.loadContents(binding); + if (loadContents) { + SBFileValueHelper.setContents(file); + } secondaryFileMaps.add(file); } } catch (IOException | URISyntaxException e) { diff --git a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/processor/callback/SBStageInputProcessorCallback.java b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/processor/callback/SBStageInputProcessorCallback.java index 8378d0a07..d2dd85eb9 100644 --- a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/processor/callback/SBStageInputProcessorCallback.java +++ b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/processor/callback/SBStageInputProcessorCallback.java @@ -109,7 +109,7 @@ private String stagePath(String path, StageInput stageInput) throws BindingExcep return destinationFile.getAbsolutePath(); case LINK: try { - Files.createLink(destinationFile.toPath(), file.toPath()); + Files.createSymbolicLink(destinationFile.toPath(), file.toPath()); } catch (IOException e) { throw new BindingException(e); } diff --git a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/service/impl/SBGlobServiceImpl.java b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/service/impl/SBGlobServiceImpl.java index 69d2d6d30..ee818ebcb 100644 --- a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/service/impl/SBGlobServiceImpl.java +++ b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/service/impl/SBGlobServiceImpl.java @@ -81,7 +81,7 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOExce @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { if(dir.getFileName() != null) { - if (matcher.matches(dir.getFileName()) && isDir) { + if (matcher.matches(dir.getFileName()) && isDir && !dir.equals(globDir.toPath())) { files.add(dir.toFile()); } } diff --git a/rabix-engine-store-postgres/src/main/java/org/rabix/engine/store/postgres/jdbi/impl/JDBIIntermediaryFilesRepository.java b/rabix-engine-store-postgres/src/main/java/org/rabix/engine/store/postgres/jdbi/impl/JDBIIntermediaryFilesRepository.java index 28f0262ed..ceebe983f 100644 --- a/rabix-engine-store-postgres/src/main/java/org/rabix/engine/store/postgres/jdbi/impl/JDBIIntermediaryFilesRepository.java +++ b/rabix-engine-store-postgres/src/main/java/org/rabix/engine/store/postgres/jdbi/impl/JDBIIntermediaryFilesRepository.java @@ -1,39 +1,33 @@ package org.rabix.engine.store.postgres.jdbi.impl; -import java.lang.annotation.Annotation; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; -import java.util.Set; -import java.util.UUID; - import org.rabix.engine.store.postgres.jdbi.impl.JDBIIntermediaryFilesRepository.IntermediaryFileEntityMapper; import org.rabix.engine.store.repository.IntermediaryFilesRepository; import org.skife.jdbi.v2.SQLStatement; import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.sqlobject.Bind; -import org.skife.jdbi.v2.sqlobject.Binder; -import org.skife.jdbi.v2.sqlobject.BinderFactory; -import org.skife.jdbi.v2.sqlobject.BindingAnnotation; -import org.skife.jdbi.v2.sqlobject.SqlQuery; -import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.*; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper; import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.unstable.BindIn; +import java.lang.annotation.*; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Set; +import java.util.UUID; + @RegisterMapper(IntermediaryFileEntityMapper.class) @UseStringTemplate3StatementLocator public interface JDBIIntermediaryFilesRepository extends IntermediaryFilesRepository { - + @SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,:count)") void insert(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count); + @SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,:count) on conflict(root_id, filename) do nothing") + void insertIfNotExists(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count); + @SqlUpdate("update intermediary_files set count=:count where root_id=:root_id and filename=:filename") void update(@Bind("root_id") UUID root_id, @Bind("filename") String filename, @Bind("count") Integer count); @@ -42,21 +36,21 @@ public interface JDBIIntermediaryFilesRepository extends IntermediaryFilesReposi @Override @SqlUpdate("insert into intermediary_files (root_id,filename,count) values (:root_id,:filename,1) on conflict (root_id, filename) do update set count=intermediary_files.count+1") void increment(@Bind("root_id") UUID rootId, @Bind("filename") String filename); - + @SqlUpdate("delete from intermediary_files where root_id=:root_id and filename=:filename") void delete(@Bind("root_id") UUID rootId, @Bind("filename") String filename); - + @SqlUpdate("delete from intermediary_files where root_id=:root_id") void delete(@Bind("root_id") UUID root_id); - + @Override @SqlUpdate("delete from intermediary_files where root_id in ()") void deleteByRootIds(@BindIn("ids") Set rootIds); - + @Override @SqlQuery("select * from intermediary_files where root_id=:root_id") List get(@Bind("root_id") UUID root_id); - + @BindingAnnotation(BindIntermediaryFileEntity.IntermediaryFileEntityBinderFactory.class) @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.PARAMETER }) @@ -72,7 +66,7 @@ public void bind(SQLStatement q, BindIntermediaryFileEntity bind, Intermediar } } } - + public static class IntermediaryFileEntityMapper implements ResultSetMapper { public IntermediaryFileEntity map(int index, ResultSet resultSet, StatementContext ctx) throws SQLException { UUID rootId = resultSet.getObject("root_id", UUID.class); @@ -81,5 +75,5 @@ public IntermediaryFileEntity map(int index, ResultSet resultSet, StatementConte return new IntermediaryFileEntity(rootId, filename, count); } } - + } diff --git a/rabix-engine-store/src/main/java/org/rabix/engine/store/memory/impl/InMemoryIntermediaryFilesRepository.java b/rabix-engine-store/src/main/java/org/rabix/engine/store/memory/impl/InMemoryIntermediaryFilesRepository.java index 60449ce19..bdafaafb9 100644 --- a/rabix-engine-store/src/main/java/org/rabix/engine/store/memory/impl/InMemoryIntermediaryFilesRepository.java +++ b/rabix-engine-store/src/main/java/org/rabix/engine/store/memory/impl/InMemoryIntermediaryFilesRepository.java @@ -7,7 +7,7 @@ public class InMemoryIntermediaryFilesRepository implements IntermediaryFilesRepository { - private final Map> intermediaryFilesRepository; + private final Map> intermediaryFilesRepository; public InMemoryIntermediaryFilesRepository() { intermediaryFilesRepository = new ConcurrentHashMap<>(); @@ -15,58 +15,53 @@ public InMemoryIntermediaryFilesRepository() { @Override public void insert(UUID rootId, String filename, Integer count) { + Map intermediaryPerRoot = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()); + intermediaryPerRoot.put(filename, new IntermediaryFileEntity(rootId, filename, count)); + intermediaryFilesRepository.put(rootId, intermediaryPerRoot); + } - if(intermediaryFilesRepository.containsKey(rootId)) { - List intermediaryPerRoot = intermediaryFilesRepository.get(rootId); - intermediaryPerRoot.add(new IntermediaryFileEntity(rootId, filename, count)); - } - else { - List intermediaryPerRoot = new ArrayList<>(); - intermediaryPerRoot.add(new IntermediaryFileEntity(rootId, filename, count)); + @Override + public void insertIfNotExists(UUID rootId, String filename, Integer count) { + Map intermediaryPerRoot = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()); + IntermediaryFileEntity intermediaryFileEntity = intermediaryPerRoot.get(filename); + + if (intermediaryFileEntity == null) { + intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, count); + + intermediaryPerRoot.put(filename, intermediaryFileEntity); intermediaryFilesRepository.put(rootId, intermediaryPerRoot); } } @Override public void update(UUID rootId, String filename, Integer count) { - if(intermediaryFilesRepository.containsKey(rootId)) { - List intermediaryPerRoot = intermediaryFilesRepository.get(rootId); - for(IntermediaryFileEntity file: intermediaryPerRoot) { - if(file.getFilename().equals(filename)) { - file.setCount(count); - break; - } + Map intermediaryPerRoot = intermediaryFilesRepository.get(rootId); + if (intermediaryPerRoot == null) { + insert(rootId, filename, count); + } else { + IntermediaryFileEntity intermediaryFileEntity = intermediaryPerRoot.get(filename); + if (intermediaryFileEntity == null) { + intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, count); + } else { + intermediaryFileEntity.setCount(count); } + intermediaryPerRoot.put(filename, intermediaryFileEntity); } } @Override public void delete(UUID rootId, String filename) { - if(intermediaryFilesRepository.containsKey(rootId)) { - List intermediaryPerRoot = intermediaryFilesRepository.get(rootId); - for (Iterator iterator = intermediaryPerRoot.iterator(); iterator.hasNext();) { - IntermediaryFileEntity file = iterator.next(); - if (file.getFilename().equals(filename)) { - iterator.remove(); - break; - } - } - } + intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()).remove(filename); } @Override public void delete(UUID rootId) { - if(intermediaryFilesRepository.containsKey(rootId)) { - intermediaryFilesRepository.remove(rootId); - } + intermediaryFilesRepository.remove(rootId); } @Override public List get(UUID rootId) { - if(intermediaryFilesRepository.containsKey(rootId)) { - return intermediaryFilesRepository.get(rootId); - } - return Collections.emptyList(); + return new ArrayList<>(intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()).values()); } @Override @@ -78,10 +73,31 @@ public void deleteByRootIds(Set rootIds) { @Override public void decrement(UUID rootId, String filename) { + Map intermediaryFiles = intermediaryFilesRepository.get(rootId); + if (intermediaryFiles == null) { + return; + } + + IntermediaryFileEntity intermediaryFileEntity = intermediaryFiles.get(filename); + if (intermediaryFileEntity == null) { + return; + } + intermediaryFileEntity.decrement(); } + @Override public void increment(UUID rootId, String filename) { + Map intermediaryFiles = intermediaryFilesRepository.getOrDefault(rootId, new ConcurrentHashMap<>()); + + IntermediaryFileEntity intermediaryFileEntity = intermediaryFiles.get(filename); + if (intermediaryFileEntity == null) { + intermediaryFileEntity = new IntermediaryFileEntity(rootId, filename, 0); + } + + intermediaryFileEntity.increment(); + intermediaryFiles.put(filename, intermediaryFileEntity); + intermediaryFilesRepository.put(rootId, intermediaryFiles); } } diff --git a/rabix-engine-store/src/main/java/org/rabix/engine/store/repository/IntermediaryFilesRepository.java b/rabix-engine-store/src/main/java/org/rabix/engine/store/repository/IntermediaryFilesRepository.java index 9367b4c6d..6bf3fe7a1 100644 --- a/rabix-engine-store/src/main/java/org/rabix/engine/store/repository/IntermediaryFilesRepository.java +++ b/rabix-engine-store/src/main/java/org/rabix/engine/store/repository/IntermediaryFilesRepository.java @@ -7,25 +7,27 @@ public interface IntermediaryFilesRepository { void insert(UUID rootId, String filename, Integer count); - + + void insertIfNotExists(UUID rootId, String filename, Integer count); + void update(UUID rootId, String filename, Integer count); - + void delete(UUID rootId, String filename); - + void delete(UUID rootId); - + void deleteByRootIds(Set rootIds); - + List get(UUID rootId); - + void decrement(UUID rootId, String filename); - - public class IntermediaryFileEntity { - + + class IntermediaryFileEntity { + UUID rootId; String filename; Integer count; - + public IntermediaryFileEntity(UUID rootId, String filename, Integer count) { super(); this.rootId = rootId; @@ -35,28 +37,41 @@ public IntermediaryFileEntity(UUID rootId, String filename, Integer count) { public UUID getRootId() { return rootId; } + public void setRootId(UUID rootId) { this.rootId = rootId; } + public String getFilename() { return filename; } + public void setFilename(String filename) { this.filename = filename; } + public Integer getCount() { return count; } + public void setCount(Integer count) { this.count = count; } - + + public void decrement() { + count = count == null ? -1 : count - 1; + } + + public void increment() { + count = count == null ? 1 : count + 1; + } + @Override public String toString() { - return "IntermediaryFileEntity [filename=" + filename + ", count=" + count + "]"; + return "IntermediaryFileEntity [rootId=" + rootId + ", filename=" + filename + ", count=" + count + "]"; } } void increment(UUID rootId, String filename); - + } diff --git a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/InputEventHandler.java b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/InputEventHandler.java index 08f4e6084..5370bede4 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/InputEventHandler.java +++ b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/InputEventHandler.java @@ -39,12 +39,12 @@ public class InputEventHandler implements EventHandler { @Inject private EventProcessor eventProcessor; @Inject - private IntermediaryFilesService filesService; - + private IntermediaryFilesService intermediaryFilesService; private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void handle(InputUpdateEvent event, EventHandlingMode mode) throws EventHandlerException { + logger.debug(event.toString()); JobRecord job = jobService.find(event.getJobId(), event.getContextId()); if (job == null) { @@ -52,9 +52,11 @@ public void handle(InputUpdateEvent event, EventHandlingMode mode) throws EventH return; } - filesService.handleInputSent(event.getContextId(), event.getValue()); - VariableRecord variable = variableService.find(event.getJobId(), event.getPortId(), LinkPortType.INPUT, event.getContextId()); + if (!job.isContainer() && !job.isScatterWrapper()) { + intermediaryFilesService.incrementInputFilesReferences(event.getContextId(), event.getValue()); + } + VariableRecord variable = variableService.find(event.getJobId(), event.getPortId(), LinkPortType.INPUT, event.getContextId()); DAGNode node = dagNodeService.get(InternalSchemaHelper.normalizeId(job.getId()), event.getContextId(), job.getDagHash()); if (event.isLookAhead()) { diff --git a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java index 7e8201212..8d939ea61 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java +++ b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java @@ -18,7 +18,6 @@ import org.rabix.common.helper.CloneHelper; import org.rabix.common.helper.InternalSchemaHelper; import org.rabix.common.helper.JSONHelper; -import org.rabix.common.logging.VerboseLogger; import org.rabix.engine.JobHelper; import org.rabix.engine.event.Event; import org.rabix.engine.event.impl.ContextStatusEvent; @@ -55,6 +54,7 @@ public class JobStatusEventHandler implements EventHandler { private final VariableRecordService variableRecordService; private final ContextRecordService contextRecordService; private final JobStatsRecordService jobStatsRecordService; + private final IntermediaryFilesService intermediaryFilesService; private final JobRepository jobRepository; private final JobService jobService; @@ -68,7 +68,7 @@ public JobStatusEventHandler(final DAGNodeService dagNodeService, final AppServi final VariableRecordService variableRecordService, final ContextRecordService contextRecordService, final EventProcessor eventProcessor, final ScatterHandler scatterHelper, final JobRepository jobRepository, final JobService jobService, final JobStatsRecordService jobStatsRecordService, - final Configuration configuration, final JobHelper jobHelper) { + final Configuration configuration, final JobHelper jobHelper, final IntermediaryFilesService intermediaryFilesService) { this.dagNodeService = dagNodeService; this.scatterHelper = scatterHelper; this.eventProcessor = eventProcessor; @@ -80,9 +80,9 @@ public JobStatusEventHandler(final DAGNodeService dagNodeService, final AppServi this.appService = appService; this.jobService = jobService; this.jobHelper = jobHelper; - this.jobRepository = jobRepository; this.setResources = configuration.getBoolean("engine.set_resources", false); + this.intermediaryFilesService = intermediaryFilesService; } @Override @@ -149,18 +149,6 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan } break; case COMPLETED: - if (jobRecord.getState() == JobRecord.JobState.COMPLETED) { - logger.info("Job {} of {} is already completed.", jobRecord.getId(), jobRecord.getRootId()); - break; - } - - if (!jobRecord.isRoot()) { - jobService.delete(jobRecord.getRootId(), jobRecord.getExternalId()); - if (jobRecord.isContainer() || jobRecord.isScatterWrapper()) { - VerboseLogger.log(String.format("Job %s has completed", jobRecord.getId())); - } - } - updateJobStats(jobRecord, jobStatsRecord); if ((!jobRecord.isScatterWrapper() || jobRecord.isRoot()) && !jobRecord.isContainer()) { @@ -173,6 +161,11 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan jobRecord.setState(JobRecord.JobState.COMPLETED); jobRecordService.update(jobRecord); + if (!jobRecord.isContainer() && !jobRecord.isScatterWrapper()) { + Job job = jobRepository.get(event.getEventGroupId()); + intermediaryFilesService.decrementInputFilesReferences(event.getContextId(), job.getInputs()); + } + if (jobRecord.isRoot()) { eventProcessor.send(new ContextStatusEvent(event.getContextId(), ContextStatus.COMPLETED)); try { @@ -185,12 +178,17 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan e.printStackTrace(); } } else { + try { + Job job = jobHelper.createJob(jobRecord, JobStatus.COMPLETED, event.getResult()); + jobRepository.update(job); + + jobService.handleJobCompleted(job); + } catch (BindingException e) { + logger.warn("Could not create completed job for {}", event.getJobId()); + } + if (!jobRecord.isScattered()) { checkJobRootPartiallyCompleted(jobRecord, mode); - try { - jobService.handleJobCompleted(jobHelper.createJob(jobRecord, JobStatus.COMPLETED, event.getResult())); - } catch (BindingException e) { - } } } break; diff --git a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/OutputEventHandler.java b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/OutputEventHandler.java index da5f21532..13a564133 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/OutputEventHandler.java +++ b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/OutputEventHandler.java @@ -52,6 +52,7 @@ public class OutputEventHandler implements EventHandler { public void handle(final OutputUpdateEvent event, EventHandlingMode mode) throws EventHandlerException { + logger.debug(event.toString()); JobRecord sourceJob = jobRecordService.find(event.getJobId(), event.getContextId()); if (sourceJob == null) { @@ -63,6 +64,8 @@ public void handle(final OutputUpdateEvent event, EventHandlingMode mode) throws jobRecordService.resetOutputPortCounter(sourceJob, event.getNumberOfScattered(), event.getPortId()); } + filesService.registerOutputFiles(event.getContextId(), event.getValue()); + Boolean isScatterWrapper = sourceJob.isScatterWrapper(); VariableRecord sourceVariable = variableService.find(event.getJobId(), event.getPortId(), LinkPortType.OUTPUT, event.getContextId()); @@ -116,10 +119,6 @@ public void handle(final OutputUpdateEvent event, EventHandlingMode mode) throws eventProcessor.send(new JobStatusEvent(sourceJob.getId(), event.getContextId(), JobState.COMPLETED, createJob(sourceJob, JobStatus.COMPLETED).getOutputs(), event.getEventGroupId(), sourceJob.getId())); } - - if (sourceJob.isCompleted() || links.isEmpty()) { - filesService.handleDanglingOutput(event.getContextId(), value); - } } diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/IntermediaryFilesService.java b/rabix-engine/src/main/java/org/rabix/engine/service/IntermediaryFilesService.java index 09ac78213..331564519 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/IntermediaryFilesService.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/IntermediaryFilesService.java @@ -1,31 +1,19 @@ package org.rabix.engine.service; -import java.util.Set; -import java.util.UUID; - -import org.rabix.bindings.model.FileValue; import org.rabix.bindings.model.Job; -public interface IntermediaryFilesService { - - void addOrIncrement(UUID rootId, FileValue file, Integer usage); - - void decrementFiles(UUID rootId, Set checkFiles); +import java.util.UUID; - void jobFailed(UUID rootId, Set rootInputs); +public interface IntermediaryFilesService { - void handleUnusedFiles(Job job); + void registerOutputFiles(UUID rootId, Object value); - void handleJobCompleted(Job job); + void handleUnusedFilesIfAny(Job job); void handleJobFailed(Job job, Job rootJob); - void extractPathsFromFileValue(Set paths, FileValue file); - - void handleInputSent(UUID rootId, Object input); - - void handleDanglingOutput(UUID rootId, Object input); + void decrementInputFilesReferences(UUID rootId, Object value); - void handleInputSent(UUID rootId, Object input, int count); + void incrementInputFilesReferences(UUID rootId, Object inputs); } diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/impl/GarbageCollectionServiceImpl.java b/rabix-engine/src/main/java/org/rabix/engine/service/impl/GarbageCollectionServiceImpl.java index 6340529df..48f7a4d23 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/impl/GarbageCollectionServiceImpl.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/impl/GarbageCollectionServiceImpl.java @@ -3,8 +3,10 @@ import com.google.common.collect.Sets; import com.google.inject.Inject; import org.apache.commons.configuration.Configuration; +import org.rabix.bindings.model.Job; import org.rabix.engine.metrics.MetricsHelper; import org.rabix.engine.service.GarbageCollectionService; +import org.rabix.engine.service.IntermediaryFilesService; import org.rabix.engine.store.model.ContextRecord; import org.rabix.engine.store.model.ContextRecord.ContextStatus; import org.rabix.engine.store.model.JobRecord; @@ -33,6 +35,7 @@ public class GarbageCollectionServiceImpl implements GarbageCollectionService { private final DAGRepository dagRepository; private final ContextRecordRepository contextRecordRepository; private final IntermediaryFilesRepository intermediaryFilesRepository; + private final IntermediaryFilesService intermediaryFilesService; private final TransactionHelper transactionHelper; private final MetricsHelper metricsHelper; @@ -57,6 +60,7 @@ public GarbageCollectionServiceImpl(JobRepository jobRepository, DAGRepository dagRepository, ContextRecordRepository contextRecordRepository, IntermediaryFilesRepository intermediaryFilesRepository, + IntermediaryFilesService intermediaryFilesService, TransactionHelper transactionHelper, MetricsHelper metricsHelper, Configuration configuration) { @@ -69,6 +73,7 @@ public GarbageCollectionServiceImpl(JobRepository jobRepository, this.dagRepository = dagRepository; this.contextRecordRepository = contextRecordRepository; this.intermediaryFilesRepository = intermediaryFilesRepository; + this.intermediaryFilesService = intermediaryFilesService; this.transactionHelper = transactionHelper; this.metricsHelper = metricsHelper; @@ -157,6 +162,15 @@ private void collect(JobRecord jobRecord) { private void flush(UUID rootId, List garbage) { garbage.forEach(record -> { + logger.debug("flush(rootId={}, job={})", rootId, record.getId()); + + Job job = jobRepository.get(record.getExternalId()); + if (job != null) { + intermediaryFilesService.handleUnusedFilesIfAny(job); + } else { + logger.debug("Cannot check intermediary files. Unknown job {} of {}", record.getId(), record.getRootId()); + } + jobRecordRepository.delete(record.getExternalId(), record.getRootId()); jobRepository.delete(record.getRootId(), new HashSet<>(Collections.singletonList(record.getExternalId()))); linkRecordRepository.delete(record.getId(), rootId); diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/impl/IntermediaryFilesServiceImpl.java b/rabix-engine/src/main/java/org/rabix/engine/service/impl/IntermediaryFilesServiceImpl.java index f546d3de5..abc079b4c 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/impl/IntermediaryFilesServiceImpl.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/impl/IntermediaryFilesServiceImpl.java @@ -1,92 +1,79 @@ package org.rabix.engine.service.impl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; - +import com.google.inject.Inject; import org.rabix.bindings.helper.FileValueHelper; import org.rabix.bindings.model.FileValue; import org.rabix.bindings.model.Job; -import org.rabix.bindings.model.dag.DAGLinkPort.LinkPortType; -import org.rabix.common.helper.InternalSchemaHelper; -import org.rabix.engine.store.model.LinkRecord; -import org.rabix.engine.store.repository.IntermediaryFilesRepository; -import org.rabix.engine.store.repository.IntermediaryFilesRepository.IntermediaryFileEntity; import org.rabix.engine.service.IntermediaryFilesHandler; import org.rabix.engine.service.IntermediaryFilesService; -import org.rabix.engine.service.LinkRecordService; +import org.rabix.engine.store.repository.IntermediaryFilesRepository; +import org.rabix.engine.store.repository.IntermediaryFilesRepository.IntermediaryFileEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.inject.Inject; +import java.util.*; +import java.util.Map.Entry; +import java.util.function.Consumer; public class IntermediaryFilesServiceImpl implements IntermediaryFilesService { private final static Logger logger = LoggerFactory.getLogger(IntermediaryFilesServiceImpl.class); private IntermediaryFilesRepository intermediaryFilesRepository; - private LinkRecordService linkRecordService; private IntermediaryFilesHandler fileHandler; - + @Inject - protected IntermediaryFilesServiceImpl(LinkRecordService linkRecordService, IntermediaryFilesHandler handler, IntermediaryFilesRepository intermediaryFilesRepository) { - this.linkRecordService = linkRecordService; + protected IntermediaryFilesServiceImpl(IntermediaryFilesHandler handler, IntermediaryFilesRepository intermediaryFilesRepository) { this.fileHandler = handler; this.intermediaryFilesRepository = intermediaryFilesRepository; } @Override - public void decrementFiles(UUID rootId, Set checkFiles) { - for (String path : checkFiles) { - intermediaryFilesRepository.decrement(rootId, path); - } + @SuppressWarnings("unchecked") + public void registerOutputFiles(UUID rootId, Object value) { + FileValueHelper + .getFilesFromValue(value) + .forEach(fileValue -> + extractPathsFromFileValue(fileValue) + .forEach(path -> intermediaryFilesRepository.insertIfNotExists(rootId, path, 0))); } - + @Override - public void handleUnusedFiles(Job job){ - fileHandler.handleUnusedFiles(job, getUnusedFiles(job.getRootId())); - } + @SuppressWarnings("unchecked") + public void handleUnusedFilesIfAny(Job job) { + Set unusedFiles = getUnusedFiles(job.getRootId()); + logger.debug("handleUnusedFiles of {}: {}", job.getRootId(), unusedFiles); + fileHandler.handleUnusedFiles(job, unusedFiles); + } @Override - public void handleJobCompleted(Job job) { - if (!job.isRoot()) { - Set inputs = new HashSet(); - for (Map.Entry entry : job.getInputs().entrySet()) { - Set files = new HashSet(FileValueHelper.getFilesFromValue(entry.getValue())); - for (FileValue file : files) { - extractPathsFromFileValue(inputs, file); - } - } - decrementFiles(job.getRootId(), inputs); - handleUnusedFiles(job); - } + public void handleJobFailed(Job job, Job rootJob) { + handleUnusedFilesIfAny(job); } @Override - public void handleJobFailed(Job job, Job rootJob) { - handleUnusedFiles(job); + public void incrementInputFilesReferences(UUID rootId, Object value) { + logger.debug("incrementInputFilesReferences(rootId={}, value={})", rootId, value); + FileValueHelper.getFilesFromValue(value).forEach(fileValue -> addOrIncrement(rootId, fileValue)); } - + @Override - public void jobFailed(UUID rootId, Set rootInputs) { - List filesForRootIdList = intermediaryFilesRepository.get(rootId); - Map filesForRootId = convertToMap(filesForRootIdList); - for(Iterator> it = filesForRootId.entrySet().iterator(); it.hasNext();) { - Entry fileEntry = it.next(); - if(!rootInputs.contains(fileEntry.getKey())) { - logger.debug("Removing onJobFailed: " + fileEntry.getKey()); - filesForRootId.put(fileEntry.getKey(), 0); - } + @SuppressWarnings("unchecked") + public void decrementInputFilesReferences(UUID rootId, Object value) { + logger.debug("decrementInputFilesReferences(rootId={}, value={})", rootId, value); + decrement(rootId, (Map) value); + } + + private void decrement(UUID rootId, Map inputOutputMap) { + if (inputOutputMap == null) { + return; } + + forEachPath(inputOutputMap, path -> intermediaryFilesRepository.decrement(rootId, path)); } - + + private Map convertToMap(List filesForRootId) { Map result = new HashMap<>(); for(IntermediaryFileEntity f: filesForRootId) { @@ -94,26 +81,35 @@ private Map convertToMap(List filesForR } return result; } - - @Override - public void extractPathsFromFileValue(Set paths, FileValue file) { - paths.add(file.getPath()); - if(file.getSecondaryFiles()!=null) - for(FileValue f: file.getSecondaryFiles()) { - extractPathsFromFileValue(paths, f); + + private Set extractPathsFromFileValue(FileValue file) { + Set paths = new HashSet<>(); + + String path = file.getPath(); + if (path != null) { + paths.add(path); + } + + if (file.getSecondaryFiles() != null) { + for (FileValue f : file.getSecondaryFiles()) { + paths.addAll(extractPathsFromFileValue(f)); } + } + return paths; } - - @Override - public void addOrIncrement(UUID rootId, FileValue file, Integer usage) { - Set paths = new HashSet(); - extractPathsFromFileValue(paths, file); + + private void addOrIncrement(UUID rootId, FileValue file) { + if (file == null || file.getPath() == null) { + return; + } + + Set paths = extractPathsFromFileValue(file); for(String path: paths) { intermediaryFilesRepository.increment(rootId, path); } } - - protected Set getUnusedFiles(UUID rootId) { + + private Set getUnusedFiles(UUID rootId) { List filesForRootIdList = intermediaryFilesRepository.get(rootId); Map filesForRootId = convertToMap(filesForRootIdList); Set unusedFiles = new HashSet(); @@ -128,26 +124,12 @@ protected Set getUnusedFiles(UUID rootId) { return unusedFiles; } - @Override - public void handleInputSent(UUID rootId, Object input) { - handleInputSent(rootId, input, 1); - } - - @Override - public void handleInputSent(UUID rootId, Object input, int count) { - Set files = new HashSet(FileValueHelper.getFilesFromValue(input)); - for(FileValue file: files){ - addOrIncrement(rootId, file, count); - } - } - - @Override - public void handleDanglingOutput(UUID rootId, Object input) { - Set inputs = new HashSet(); - Set files = new HashSet(FileValueHelper.getFilesFromValue(input)); - for (FileValue file : files) { - extractPathsFromFileValue(inputs, file); - } - decrementFiles(rootId, inputs); + private void forEachPath(Map map, Consumer consumer) { + map.values().stream() + .map(FileValueHelper::getFilesFromValue) + .flatMap(List::stream) + .forEach(fileValue -> + extractPathsFromFileValue(fileValue) + .forEach(consumer)); } } \ No newline at end of file diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java b/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java index da2cecd0f..c8d77291b 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java @@ -242,10 +242,6 @@ public void handleJobsReady(Set jobs, UUID rootId, UUID producedByNode) { engineStatusCallback.onJobsReady(jobs, rootId, producedByNode); } catch (EngineStatusCallbackException e) { logger.error("Engine status callback failed", e); - } finally { - if (!jobs.isEmpty()) { - jobRepository.delete(rootId, jobs.stream().filter(job -> !job.isRoot()).map(Job::getId).collect(Collectors.toSet())); - } } } @@ -367,7 +363,7 @@ public void handleJobRootAborted(Job rootJob) { @Override public void handleJobCompleted(Job job){ logger.info("Job id: {}, name:{}, rootId: {} is completed.", job.getId(), job.getName(), job.getRootId()); - try{ + try { engineStatusCallback.onJobCompleted(job.getId(), job.getRootId()); } catch (EngineStatusCallbackException e) { logger.error("Engine status callback failed",e); diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/impl/NoOpIntermediaryFilesServiceHandler.java b/rabix-engine/src/main/java/org/rabix/engine/service/impl/NoOpIntermediaryFilesServiceHandler.java index e44045d1b..1ba3ac91e 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/impl/NoOpIntermediaryFilesServiceHandler.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/impl/NoOpIntermediaryFilesServiceHandler.java @@ -17,7 +17,7 @@ public NoOpIntermediaryFilesServiceHandler() {} @Override public void handleUnusedFiles(Job job, Set unusedFiles) { - logger.debug(String.format("handleUnusedFiles(%s)", job.getRootId())); + logger.debug(String.format("handleUnusedFilesIfAny(%s)", job.getRootId())); } } diff --git a/rabix-engine/src/main/resources/org/rabix/engine/jdbi/dbinit.sql b/rabix-engine/src/main/resources/org/rabix/engine/jdbi/dbinit.sql index c1f99f0f1..4a97d2128 100644 --- a/rabix-engine/src/main/resources/org/rabix/engine/jdbi/dbinit.sql +++ b/rabix-engine/src/main/resources/org/rabix/engine/jdbi/dbinit.sql @@ -465,4 +465,8 @@ DROP INDEX IF EXISTS event_status_index; DROP INDEX IF EXISTS event_id_type_index; DROP INDEX IF EXISTS backend_id_index; DROP INDEX IF EXISTS context_record_id_index; -DROP INDEX IF EXISTS context_record_status_index; \ No newline at end of file +DROP INDEX IF EXISTS context_record_status_index; + +--changeset bunny:1487849040814-77 dbms:postgresql +CREATE UNIQUE INDEX intermediary_files_index ON intermediary_files USING btree (root_id, filename); +--rollback DROP INDEX intermediary_files_index; \ No newline at end of file