diff --git a/changes/add_action_liveness.md b/changes/add_action_liveness.md new file mode 100644 index 000000000..e803c335d --- /dev/null +++ b/changes/add_action_liveness.md @@ -0,0 +1,2 @@ +* Allow actions to know if the olive is live +* Add `IS_LIVE` submission policy for Vidarr submit action diff --git a/plugin-guanyin/src/main/java/ca/on/oicr/gsi/shesmu/guanyin/RunReport.java b/plugin-guanyin/src/main/java/ca/on/oicr/gsi/shesmu/guanyin/RunReport.java index 2718cfe35..f39a6f18f 100644 --- a/plugin-guanyin/src/main/java/ca/on/oicr/gsi/shesmu/guanyin/RunReport.java +++ b/plugin-guanyin/src/main/java/ca/on/oicr/gsi/shesmu/guanyin/RunReport.java @@ -205,7 +205,8 @@ public ObjectNode parameters() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { final var overloaded = services.isOverloaded("all", "guanyin"); if (!overloaded.isEmpty()) { errors = Collections.singletonList("Overloaded services: " + String.join(", ", overloaded)); diff --git a/plugin-jira/src/main/java/ca/on/oicr/gsi/shesmu/jira/BaseTicketAction.java b/plugin-jira/src/main/java/ca/on/oicr/gsi/shesmu/jira/BaseTicketAction.java index 1b913e9e2..9b1fa9d4d 100644 --- a/plugin-jira/src/main/java/ca/on/oicr/gsi/shesmu/jira/BaseTicketAction.java +++ b/plugin-jira/src/main/java/ca/on/oicr/gsi/shesmu/jira/BaseTicketAction.java @@ -175,7 +175,8 @@ protected abstract boolean isInTargetState( Stream closedStates, Predicate matchesIssue); @Override - public final ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public final ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { if (connection == null) { return ActionState.FAILED; } diff --git a/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/DeleteAction.java 
b/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/DeleteAction.java index 6d3fd14ad..0fe01e636 100644 --- a/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/DeleteAction.java +++ b/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/DeleteAction.java @@ -80,7 +80,8 @@ public int hashCode() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { // The logic for removing happens in the other class so that it can make // serialised requests to the remote end return connection.get().rm(target.toString(), automatic); diff --git a/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/SymlinkAction.java b/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/SymlinkAction.java index b04d79696..19547b027 100644 --- a/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/SymlinkAction.java +++ b/plugin-sftp/src/main/java/ca/on/oicr/gsi/shesmu/sftp/SymlinkAction.java @@ -102,7 +102,8 @@ public void link(Path link) { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { // The logic for creating the symlink happens in the other class so that it can make serialised // requests to the remote end final var result = diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/BaseUnloadAction.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/BaseUnloadAction.java index d97f4339b..3365f3259 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/BaseUnloadAction.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/BaseUnloadAction.java @@ -60,7 +60,8 @@ public final Stream> commands() { protected abstract UnloadFilter createFilter(); @Override - public final ActionState perform(ActionServices services, Duration 
lastGeneratedByOlive) { + public final ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { if (!allowedToRun) { errors = List.of("Waiting for human approval before removing data from Víðarr."); return ActionState.HALP; diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunState.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunState.java index 5f0214a9e..d2a7c5ba6 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunState.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunState.java @@ -53,7 +53,8 @@ public abstract PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) + Duration lastGeneratedByOlive, + boolean isOliveLive) throws IOException, InterruptedException; public abstract Optional reattempt(); diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateAttemptSubmit.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateAttemptSubmit.java index 2870e7e84..53691ddd6 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateAttemptSubmit.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateAttemptSubmit.java @@ -55,10 +55,11 @@ public PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) + Duration lastGeneratedByOlive, + boolean isOliveLive) throws IOException, InterruptedException { request.setAttempt(attempt); - request.setMode(submissionPolicy.mode(lastGeneratedByOlive)); + request.setMode(submissionPolicy.mode(lastGeneratedByOlive, isOliveLive)); final var response = VidarrPlugin.CLIENT.send( HttpRequest.newBuilder(vidarrUrl.resolve("/api/submit")) diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateConflicted.java 
b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateConflicted.java index 29bdf90f0..1100b1877 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateConflicted.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateConflicted.java @@ -49,7 +49,8 @@ public PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) { + Duration lastGeneratedByOlive, + boolean isOliveLive) { return new PerformResult(errors, ActionState.HALP, this); } diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateDead.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateDead.java index 1a6a8f872..4a48c009d 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateDead.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateDead.java @@ -33,7 +33,8 @@ public PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) { + Duration lastGeneratedByOlive, + boolean isOliveLive) { return new PerformResult(List.of(), ActionState.ZOMBIE, this); } diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMissing.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMissing.java index 647205afd..6c250e80c 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMissing.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMissing.java @@ -57,7 +57,8 @@ public PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) { + Duration lastGeneratedByOlive, + boolean isOliveLive) { return new PerformResult(errors, ActionState.FAILED, this); } diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMonitor.java 
b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMonitor.java index 973fd869e..93c43f3f0 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMonitor.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/RunStateMonitor.java @@ -129,7 +129,8 @@ public PerformResult perform( URI vidarrUrl, SubmitWorkflowRequest request, SubmissionPolicy submissionPolicy, - Duration lastGeneratedByOlive) + Duration lastGeneratedByOlive, + boolean isOliveLive) throws IOException, InterruptedException { return create(vidarrUrl, status.getId()); } diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmissionPolicy.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmissionPolicy.java index 3dda7e30a..36a4ac374 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmissionPolicy.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmissionPolicy.java @@ -5,13 +5,15 @@ /** Determines when a workflow run should be actually submitted */ public interface SubmissionPolicy { - static SubmissionPolicy ALWAYS = lastGeneratedByAnOlive -> SubmitMode.RUN; - static SubmissionPolicy DRY_RUN = lastGeneratedByAnOlive -> SubmitMode.DRY_RUN; + static SubmissionPolicy ALWAYS = (lastGeneratedByAnOlive, isOliveLive) -> SubmitMode.RUN; + static SubmissionPolicy DRY_RUN = (lastGeneratedByAnOlive, isOliveLive) -> SubmitMode.DRY_RUN; + static SubmissionPolicy IS_LIVE = + (lastGeneratedByAnOlive, isOliveLive) -> isOliveLive ? SubmitMode.RUN : SubmitMode.DRY_RUN; static SubmissionPolicy maxDelay(long maximum) { - return lastGeneratedByAnOlive -> + return (lastGeneratedByAnOlive, isOliveLive) -> lastGeneratedByAnOlive.getSeconds() > maximum ? 
SubmitMode.DRY_RUN : SubmitMode.RUN; } - SubmitMode mode(Duration lastGeneratedByAnOlive); + SubmitMode mode(Duration lastGeneratedByAnOlive, boolean isOliveLive); } diff --git a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmitAction.java b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmitAction.java index eec3286a1..52ca07de4 100644 --- a/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmitAction.java +++ b/plugin-vidarr/src/main/java/ca/on/oicr/gsi/shesmu/vidarr/SubmitAction.java @@ -202,7 +202,8 @@ public int hashCode() { } @Override - public synchronized ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public synchronized ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { if (stale) { return ActionState.ZOMBIE; } @@ -218,7 +219,8 @@ public synchronized ActionState perform(ActionServices services, Duration lastGe .map( url -> { try { - return state.perform(url, request, submissionPolicy, lastGeneratedByOlive); + return state.perform( + url, request, submissionPolicy, lastGeneratedByOlive, isOliveLive); } catch (IOException | InterruptedException e) { e.printStackTrace(); return new RunState.PerformResult( @@ -279,12 +281,13 @@ public void services(Set services) { @ActionParameter( name = "submission_policy", required = false, - type = "u3ALWAYS$t0DRY_RUN$t0MAX_DELAY$t1i") + type = "u3ALWAYS$t0DRY_RUN$t0IS_LIVE$t0MAX_DELAY$t1i") public void submissionPolicy(AlgebraicValue policy) { submissionPolicy = switch (policy.name()) { case "ALWAYS" -> SubmissionPolicy.ALWAYS; case "DRY_RUN" -> SubmissionPolicy.DRY_RUN; + case "IS_LIVE" -> SubmissionPolicy.IS_LIVE; case "MAX_DELAY" -> SubmissionPolicy.maxDelay((Long) policy.get(0)); default -> throw new IllegalStateException("Unexpected value: " + policy.name()); }; diff --git a/shesmu-pluginapi/src/main/java/ca/on/oicr/gsi/shesmu/plugin/action/Action.java 
b/shesmu-pluginapi/src/main/java/ca/on/oicr/gsi/shesmu/plugin/action/Action.java index 057f35ceb..e1439ef35 100644 --- a/shesmu-pluginapi/src/main/java/ca/on/oicr/gsi/shesmu/plugin/action/Action.java +++ b/shesmu-pluginapi/src/main/java/ca/on/oicr/gsi/shesmu/plugin/action/Action.java @@ -18,9 +18,9 @@ * times, it should be de-duplicated using {@link #equals(Object)} by the set. It will then attempt * to complete each action and track the success of performing an action. * - *

Creating this action should not perform it until {{@link #perform(ActionServices, Duration)} - * is called. If the action is needs to be in contact with a remote system, that information must be - * baked into its constructor. + *

 Creating this action should not perform it until {@link #perform(ActionServices, Duration, + * boolean)} is called. If the action needs to be in contact with a remote system, that + * information must be baked into its constructor. */ public abstract class Action { private final String type; @@ -41,7 +41,7 @@ public Action(String type) { *

Since the scheduler will deduplicate the same action many times, most actions are fated to * die quickly. This method will be called to indicate that this instance of the object will * actually be used. It is called after {@link #prepare()} and before {@link - * #perform(ActionServices, Duration)}. + * #perform(ActionServices, Duration, boolean)}. * * @param actionId the action ID recorded for this action */ @@ -87,16 +87,26 @@ public Optional externalTimestamp() { /** * Attempt to complete this action. * - *

This will be called multiple times until an action returns {@link ActionState#SUCCEEDED}. - * Because Shesmu is stateless, the action must determine if an equivalent action has already been - * performed. This object may be recreated since the launch, so the action cannot expect to hold - * permanent state (e.g., job id) as a field. + *

This will be called multiple times until an action returns {@link ActionState#SUCCEEDED} or + * {@link ActionState#ZOMBIE}. Because Shesmu is stateless, the action must determine if an + * equivalent action has already been performed. This object may be recreated since the launch, so + * the action cannot expect to hold permanent state (e.g., job id) as a field. + * + * @param services an abstraction to allow action to query information about the server state + * @param lastGeneratedByOlive the delay between now and the last time the action was generated by + * an olive + * @param isOliveLive true if the action is connected to an olive or actnow file + * @return the state of the action; if {@link ActionState#SUCCEEDED} or {@link + * ActionState#ZOMBIE}, the action will not be polled again */ - public abstract ActionState perform(ActionServices services, Duration lastGeneratedByOlive); + public abstract ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive); /** - * The amount of time the {@link #perform(ActionServices, Duration)} method should be allowed to - * run for before being interrupted + * The amount of time the {@link #perform(ActionServices, Duration, boolean)} method should be + * allowed to run for before being interrupted + * + * @return the maximum time for the watchdog timer */ public Duration performTimeout() { return Duration.of(1, ChronoUnit.HOURS); @@ -122,9 +132,11 @@ public void purgeCleanup() {} * The number of minutes to wait before attempting to retry this action. * *

If an action has to be re-attempted, the action processor can wait until a certain window - * expires before it will call {@link #perform(ActionServices, Duration)} again. This only sets a - * lower limit on how frequently an action can be retried; there is no upper limit. This method - * should return a constant. + * expires before it will call {@link #perform(ActionServices, Duration, boolean)} again. This + * only sets a lower limit on how frequently an action can be retried; there is no upper limit. + * This method should return a constant. + * + * @return the preferred polling interval */ public abstract long retryMinutes(); diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/Server.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/Server.java index 472c1bba6..71f934a3f 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/Server.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/Server.java @@ -3026,7 +3026,7 @@ public void start() { compiler.start(fileWatcher); staticActions.start(fileWatcher); System.out.println("Starting action processor..."); - processor.start(executor); + processor.start(executor, compiler); System.out.println("Starting scheduler..."); master.start(executor); pluginManager.log("Shesmu started.", Map.of()); diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/NothingAction.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/NothingAction.java index 58a063edb..ba24f2c88 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/NothingAction.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/NothingAction.java @@ -91,7 +91,8 @@ public int hashCode() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { return ActionState.ZOMBIE; } diff --git 
a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/actions/fake/FakeAction.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/actions/fake/FakeAction.java index 16ea2d9e5..05b4951ac 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/actions/fake/FakeAction.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/core/actions/fake/FakeAction.java @@ -76,7 +76,8 @@ public ObjectNode parameters() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { return ActionState.ZOMBIE; } diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/server/ActionProcessor.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/server/ActionProcessor.java index 5a5430c56..cd841664c 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/server/ActionProcessor.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/server/ActionProcessor.java @@ -55,8 +55,8 @@ /** * Background process for launching actions and reporting the results * - *

This class collects actions and tries to {@link Action#perform(ActionServices, Duration)} - * until successful. + *

This class collects actions and tries to {@link Action#perform(ActionServices, Duration, + * boolean)} until successful. */ public final class ActionProcessor implements OliveServices, InputSource, MetroDiagram.OliveFlowReader { @@ -1579,8 +1579,8 @@ public Stream sources() { } /** Begin the action processor */ - public void start(ScheduledExecutorService executor) { - executor.scheduleWithFixedDelay(this::update, 5, 1, TimeUnit.MINUTES); + public void start(ScheduledExecutorService executor, Predicate isOliveLive) { + executor.scheduleWithFixedDelay(() -> this.update(isOliveLive), 5, 1, TimeUnit.MINUTES); executor.scheduleWithFixedDelay(this::updateAlerts, 5, 5, TimeUnit.MINUTES); } @@ -1678,7 +1678,8 @@ public Stream tags(Filter... filters) { .flatMap(entry -> Stream.concat(entry.getValue().tags.stream(), entry.getKey().tags())); } - private void update() { + private void update(Predicate isOliveLive) { + final var now = Instant.now(); final var candidates = actions.entrySet().stream() @@ -1689,7 +1690,7 @@ private void update() { && !entry.getValue().updateInProgress && Duration.between(entry.getValue().lastChecked, now).toMinutes() >= Math.max(10, entry.getKey().retryMinutes())) - /** + /* * Sort by time since last checked, and then priority, to avoid starving actions of * attention because of priority, then sort by their ActionState's processPriority so * that certain ActionStates get checked first. 
@@ -1754,7 +1755,8 @@ private void update() { .getKey() .perform( actionServices, - Duration.between(now, entry.getValue().lastAdded)); + Duration.between(now, entry.getValue().lastAdded), + entry.getValue().locations.stream().anyMatch(isOliveLive)); entry.getValue().thrown = null; } catch (final Throwable e) { entry.getValue().lastState = ActionState.UNKNOWN; diff --git a/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/CompilerTest.java b/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/CompilerTest.java index e506ff7d5..212802287 100644 --- a/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/CompilerTest.java +++ b/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/CompilerTest.java @@ -145,7 +145,8 @@ public int hashCode() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { return ActionState.SUCCEEDED; } diff --git a/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/RunTest.java b/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/RunTest.java index 5f6c4a65f..d937d2f06 100644 --- a/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/RunTest.java +++ b/shesmu-server/src/test/java/ca/on/oicr/gsi/shesmu/RunTest.java @@ -59,7 +59,7 @@ private static class ActionChecker implements OliveServices { @Override public boolean accept( Action action, String filename, int line, int column, String hash, String[] tags) { - if (action.perform(null, Duration.ZERO) == ActionState.SUCCEEDED) { + if (action.perform(null, Duration.ZERO, true) == ActionState.SUCCEEDED) { good++; } else { bad++; @@ -184,7 +184,8 @@ public int hashCode() { } @Override - public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) { + public ActionState perform( + ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) { return ok ? ActionState.SUCCEEDED : ActionState.FAILED; }