Skip to content

Commit

Permalink
Handel abort on all job states.
Browse files Browse the repository at this point in the history
Signed-off-by: Maximilian Chrzan <[email protected]>
Signed-off-by: mchrza <[email protected]>
  • Loading branch information
mchrza committed Nov 20, 2023
1 parent 81eb0f0 commit 00077af
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,7 @@ private Future<Boolean> readEnableHashedSpaceId() {
}

public Future<Job> executeAbort() {
try {
isValidForAbort();
}
catch (HttpException e) {
return Future.failedFuture(e);
}
return Future.succeededFuture(this);
return updateJobStatus(this, aborted);
}

public Future<Job> prepareStart() {
Expand All @@ -238,12 +232,6 @@ public Future<Job> prepareStart() {

public abstract Future<Job> executeStart();

public Future<Void> abortIfPossible() {
//Target: we want to terminate all running sqlQueries
return executeAbort()
.compose(job -> Future.succeededFuture(), t -> Future.succeededFuture()); //Job is not in state "executing" ignore
}

public Future<Job> executeRetry() {
try {
isValidForRetry();
Expand Down Expand Up @@ -351,18 +339,6 @@ protected Future<Job> isValidForStart() {
return Future.succeededFuture(this);
}

@JsonIgnore
public void isValidForAbort() throws HttpException {
/*
It is only allowed to abort a job inside executing state, because we have multiple nodes running.
During the execution we have running SQL-Statements - due to the abortion of them, the client which
has executed the Query will handle the abortion.
*/
//TODO: Allow abortion also in other states (e.g. queued), because it could be the user started a job accidentally and wants to stop & recreate
if (getStatus() != executing && getStatus() != finalizing)
throw new HttpException(PRECONDITION_FAILED, "Invalid state: [" + getStatus() + "] for abort!");
}

@JsonIgnore
protected void isValidForRetry() throws HttpException {
if (!getStatus().equals(Status.failed) && !getStatus().equals(Status.aborted))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.here.xyz.httpconnector.util.jobs.Job.Status.prepared;

import com.here.xyz.httpconnector.CService;
import com.here.xyz.httpconnector.config.JDBCClients;
import com.here.xyz.httpconnector.util.jobs.CombinedJob;
import com.here.xyz.httpconnector.util.jobs.Export;
import com.here.xyz.httpconnector.util.jobs.Job;
Expand Down Expand Up @@ -59,6 +60,11 @@ protected void process() throws InterruptedException, CannotDecodeException {
all stages can end up in failed
*/
switch (currentJob.getStatus()) {
case aborted:
//Abort has happened on other node
removeJob(job);
JDBCClients.abortJobsByJobId(job);
break;
case waiting:
updateJobStatus(currentJob, Job.Status.queued);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.here.xyz.httpconnector.util.scheduler;

import com.here.xyz.httpconnector.CService;
import com.here.xyz.httpconnector.config.JDBCClients;
import com.here.xyz.httpconnector.config.JDBCImporter;
import com.here.xyz.httpconnector.util.jobs.Import;
import com.here.xyz.httpconnector.util.jobs.Job;
Expand Down Expand Up @@ -52,6 +53,11 @@ protected void process() throws InterruptedException, CannotDecodeException {
**/

switch (currentJob.getStatus()){
case aborted:
//Abort has happened on other node
removeJob(job);
JDBCClients.abortJobsByJobId(job);
break;
case waiting:
updateJobStatus(currentJob,Job.Status.validating)
.compose(j -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static Future<Job> updateJobStatus(Job j, Job.Status status) {
}

protected static Future<Job> updateJobStatus(Job job, Job.Status status, String errorDescription, String errorType) {
if(hasJob(job) == null) {
if(hasJob(job) == null && status != aborted) {
logger.warn("[{}] Job is already removed from queue, dont update status {}!", job.getId(), job.getStatus());
return Future.failedFuture("Job " + job.getId() + " is not in queue anymore!");
}
Expand Down

0 comments on commit 00077af

Please sign in to comment.