Skip to content

Commit

Permalink
Allow actions to know their olive liveness
Browse files Browse the repository at this point in the history
Provide an indication of olive liveness to actions when performed and add a
Víðarr submission policy to check for liveness.
  • Loading branch information
apmasell authored and avarsava committed Jan 29, 2024
1 parent 7806aaa commit 0fe022d
Show file tree
Hide file tree
Showing 21 changed files with 82 additions and 46 deletions.
2 changes: 2 additions & 0 deletions changes/add_action_liveness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Allow actions to know if the olive is live
* Add `IS_LIVE` submission policy for Vidarr submit action
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public ObjectNode parameters() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
final var overloaded = services.isOverloaded("all", "guanyin");
if (!overloaded.isEmpty()) {
errors = Collections.singletonList("Overloaded services: " + String.join(", ", overloaded));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ protected abstract boolean isInTargetState(
Stream<String> closedStates, Predicate<String> matchesIssue);

@Override
public final ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public final ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
if (connection == null) {
return ActionState.FAILED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public int hashCode() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
// The logic for removing happens in the other class so that it can make
// serialised requests to the remote end
return connection.get().rm(target.toString(), automatic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void link(Path link) {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
// The logic for creating the symlink happens in the other class so that it can make serialised
// requests to the remote end
final var result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public final Stream<ActionCommand<?>> commands() {
protected abstract UnloadFilter createFilter();

@Override
public final ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public final ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
if (!allowedToRun) {
errors = List.of("Waiting for human approval before removing data from Víðarr.");
return ActionState.HALP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public abstract PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive)
Duration lastGeneratedByOlive,
boolean isOliveLive)
throws IOException, InterruptedException;

public abstract Optional<RunState> reattempt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ public PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive)
Duration lastGeneratedByOlive,
boolean isOliveLive)
throws IOException, InterruptedException {
request.setAttempt(attempt);
request.setMode(submissionPolicy.mode(lastGeneratedByOlive));
request.setMode(submissionPolicy.mode(lastGeneratedByOlive, isOliveLive));
final var response =
VidarrPlugin.CLIENT.send(
HttpRequest.newBuilder(vidarrUrl.resolve("/api/submit"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive) {
Duration lastGeneratedByOlive,
boolean isOliveLive) {
return new PerformResult(errors, ActionState.HALP, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive) {
Duration lastGeneratedByOlive,
boolean isOliveLive) {
return new PerformResult(List.of(), ActionState.ZOMBIE, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive) {
Duration lastGeneratedByOlive,
boolean isOliveLive) {
return new PerformResult(errors, ActionState.FAILED, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public PerformResult perform(
URI vidarrUrl,
SubmitWorkflowRequest request,
SubmissionPolicy submissionPolicy,
Duration lastGeneratedByOlive)
Duration lastGeneratedByOlive,
boolean isOliveLive)
throws IOException, InterruptedException {
return create(vidarrUrl, status.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

/** Determines when a workflow run should be actually submitted */
public interface SubmissionPolicy {
static SubmissionPolicy ALWAYS = lastGeneratedByAnOlive -> SubmitMode.RUN;
static SubmissionPolicy DRY_RUN = lastGeneratedByAnOlive -> SubmitMode.DRY_RUN;
static SubmissionPolicy ALWAYS = (lastGeneratedByAnOlive, isOliveLive) -> SubmitMode.RUN;
static SubmissionPolicy DRY_RUN = (lastGeneratedByAnOlive, isOliveLive) -> SubmitMode.DRY_RUN;
static SubmissionPolicy IS_LIVE =
(lastGeneratedByAnOlive, isOliveLive) -> isOliveLive ? SubmitMode.RUN : SubmitMode.DRY_RUN;

static SubmissionPolicy maxDelay(long maximum) {
return lastGeneratedByAnOlive ->
return (lastGeneratedByAnOlive, isOliveLive) ->
lastGeneratedByAnOlive.getSeconds() > maximum ? SubmitMode.DRY_RUN : SubmitMode.RUN;
}

SubmitMode mode(Duration lastGeneratedByAnOlive);
SubmitMode mode(Duration lastGeneratedByAnOlive, boolean isOliveLive);
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public int hashCode() {
}

@Override
public synchronized ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public synchronized ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
if (stale) {
return ActionState.ZOMBIE;
}
Expand All @@ -218,7 +219,8 @@ public synchronized ActionState perform(ActionServices services, Duration lastGe
.map(
url -> {
try {
return state.perform(url, request, submissionPolicy, lastGeneratedByOlive);
return state.perform(
url, request, submissionPolicy, lastGeneratedByOlive, isOliveLive);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
return new RunState.PerformResult(
Expand Down Expand Up @@ -279,12 +281,13 @@ public void services(Set<String> services) {
@ActionParameter(
name = "submission_policy",
required = false,
type = "u3ALWAYS$t0DRY_RUN$t0MAX_DELAY$t1i")
type = "u3ALWAYS$t0DRY_RUN$t0IS_LIVE$t0MAX_DELAY$t1i")
public void submissionPolicy(AlgebraicValue policy) {
submissionPolicy =
switch (policy.name()) {
case "ALWAYS" -> SubmissionPolicy.ALWAYS;
case "DRY_RUN" -> SubmissionPolicy.DRY_RUN;
case "IS_LIVE" -> SubmissionPolicy.IS_LIVE;
case "MAX_DELAY" -> SubmissionPolicy.maxDelay((Long) policy.get(0));
default -> throw new IllegalStateException("Unexpected value: " + policy.name());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* times, it should be de-duplicated using {@link #equals(Object)} by the set. It will then attempt
* to complete each action and track the success of performing an action.
*
* <p>Creating this action should not perform it until {{@link #perform(ActionServices, Duration)}
* is called. If the action is needs to be in contact with a remote system, that information must be
* baked into its constructor.
* <p>Creating this action should not perform it until {{@link #perform(ActionServices, Duration,
* boolean)} is called. If the action is needs to be in contact with a remote system, that
* information must be baked into its constructor.
*/
public abstract class Action {
private final String type;
Expand All @@ -41,7 +41,7 @@ public Action(String type) {
* <p>Since the scheduler will deduplicate the same action many times, most actions are fated to
* die quickly. This method will be called to indicate that this instance of the object will
* actually be used. It is called after {@link #prepare()} and before {@link
* #perform(ActionServices, Duration)}.
* #perform(ActionServices, Duration, boolean)}.
*
* @param actionId the action ID recorded for this action
*/
Expand Down Expand Up @@ -87,16 +87,26 @@ public Optional<Instant> externalTimestamp() {
/**
* Attempt to complete this action.
*
* <p>This will be called multiple times until an action returns {@link ActionState#SUCCEEDED}.
* Because Shesmu is stateless, the action must determine if an equivalent action has already been
* performed. This object may be recreated since the launch, so the action cannot expect to hold
* permanent state (e.g., job id) as a field.
* <p>This will be called multiple times until an action returns {@link ActionState#SUCCEEDED} or
* {@link ActionState#ZOMBIE}. Because Shesmu is stateless, the action must determine if an
* equivalent action has already been performed. This object may be recreated since the launch, so
* the action cannot expect to hold permanent state (e.g., job id) as a field.
*
* @param services an abstraction to allow action to query information about the server state
* @param lastGeneratedByOlive the delay between now and the last time the action was generated by
* an olive
* @param isOliveLive true if the action is connected to an olive or <code>actnow</code> file
* @return the state of the action; if {@link ActionState#SUCCEEDED} or {@link
* ActionState#ZOMBIE}, the action will not be polled again
*/
public abstract ActionState perform(ActionServices services, Duration lastGeneratedByOlive);
public abstract ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive);

/**
* The amount of time the {@link #perform(ActionServices, Duration)} method should be allowed to
* run for before being interrupted
* The amount of time the {@link #perform(ActionServices, Duration, boolean)} method should be
* allowed to run for before being interrupted
*
* @return the maximum time for the watchdog timer
*/
public Duration performTimeout() {
return Duration.of(1, ChronoUnit.HOURS);
Expand All @@ -122,9 +132,11 @@ public void purgeCleanup() {}
* The number of minutes to wait before attempting to retry this action.
*
* <p>If an action has to be re-attempted, the action processor can wait until a certain window
* expires before it will call {@link #perform(ActionServices, Duration)} again. This only sets a
* lower limit on how frequently an action can be retried; there is no upper limit. This method
* should return a constant.
* expires before it will call {@link #perform(ActionServices, Duration, boolean)} again. This
* only sets a lower limit on how frequently an action can be retried; there is no upper limit.
* This method should return a constant.
*
* @return the preferred polling interval
*/
public abstract long retryMinutes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3026,7 +3026,7 @@ public void start() {
compiler.start(fileWatcher);
staticActions.start(fileWatcher);
System.out.println("Starting action processor...");
processor.start(executor);
processor.start(executor, compiler);
System.out.println("Starting scheduler...");
master.start(executor);
pluginManager.log("Shesmu started.", Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public int hashCode() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
return ActionState.ZOMBIE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public ObjectNode parameters() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
return ActionState.ZOMBIE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
/**
* Background process for launching actions and reporting the results
*
* <p>This class collects actions and tries to {@link Action#perform(ActionServices, Duration)}
* until successful.
* <p>This class collects actions and tries to {@link Action#perform(ActionServices, Duration,
* boolean)} until successful.
*/
public final class ActionProcessor
implements OliveServices, InputSource, MetroDiagram.OliveFlowReader {
Expand Down Expand Up @@ -1579,8 +1579,8 @@ public Stream<SourceLocation> sources() {
}

/** Begin the action processor */
public void start(ScheduledExecutorService executor) {
executor.scheduleWithFixedDelay(this::update, 5, 1, TimeUnit.MINUTES);
public void start(ScheduledExecutorService executor, Predicate<SourceLocation> isOliveLive) {
executor.scheduleWithFixedDelay(() -> this.update(isOliveLive), 5, 1, TimeUnit.MINUTES);
executor.scheduleWithFixedDelay(this::updateAlerts, 5, 5, TimeUnit.MINUTES);
}

Expand Down Expand Up @@ -1678,7 +1678,8 @@ public Stream<String> tags(Filter... filters) {
.flatMap(entry -> Stream.concat(entry.getValue().tags.stream(), entry.getKey().tags()));
}

private void update() {
private void update(Predicate<SourceLocation> isOliveLive) {

final var now = Instant.now();
final var candidates =
actions.entrySet().stream()
Expand All @@ -1689,7 +1690,7 @@ private void update() {
&& !entry.getValue().updateInProgress
&& Duration.between(entry.getValue().lastChecked, now).toMinutes()
>= Math.max(10, entry.getKey().retryMinutes()))
/**
/*
* Sort by time since last checked, and then priority, to avoid starving actions of
* attention because of priority, then sort by their ActionState's processPriority so
* that certain ActionStates get checked first.
Expand Down Expand Up @@ -1754,7 +1755,8 @@ private void update() {
.getKey()
.perform(
actionServices,
Duration.between(now, entry.getValue().lastAdded));
Duration.between(now, entry.getValue().lastAdded),
entry.getValue().locations.stream().anyMatch(isOliveLive));
entry.getValue().thrown = null;
} catch (final Throwable e) {
entry.getValue().lastState = ActionState.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public int hashCode() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
return ActionState.SUCCEEDED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static class ActionChecker implements OliveServices {
@Override
public boolean accept(
Action action, String filename, int line, int column, String hash, String[] tags) {
if (action.perform(null, Duration.ZERO) == ActionState.SUCCEEDED) {
if (action.perform(null, Duration.ZERO, true) == ActionState.SUCCEEDED) {
good++;
} else {
bad++;
Expand Down Expand Up @@ -184,7 +184,8 @@ public int hashCode() {
}

@Override
public ActionState perform(ActionServices services, Duration lastGeneratedByOlive) {
public ActionState perform(
ActionServices services, Duration lastGeneratedByOlive, boolean isOliveLive) {
return ok ? ActionState.SUCCEEDED : ActionState.FAILED;
}

Expand Down

0 comments on commit 0fe022d

Please sign in to comment.