Skip to content

Commit

Permalink
Add support for re-using inputs from another job
Browse files Browse the repository at this point in the history
- Add new delegation input type "InputsFromJob" that can be POSTed to a job to indicate that the job re-uses the inputs of another job in the system (only possible if the user also owns the referenced job)
- If the inputs owned by a job are referenced by some other job, they will not be deleted if the owning job gets deleted. Instead, they will be deleted when no other job references them anymore.
- When POSTing an InputsFromJob input to a job, it will be "dereferenced" immediately, so that subsequent GETs on /inputs directly return the inputs of the referenced job.

Signed-off-by: Benjamin Rögner <[email protected]>
  • Loading branch information
roegi committed Jul 29, 2024
1 parent 2905926 commit 360f355
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import com.here.xyz.jobs.steps.outputs.Output;
import com.here.xyz.jobs.steps.resources.ExecutionResource;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.jobs.util.AsyncS3Client;
import com.here.xyz.util.Async;
import com.here.xyz.util.service.Core;
import io.vertx.core.Future;
Expand Down Expand Up @@ -439,7 +438,9 @@ public Future<Void> consumeInput(ModelBasedInput input) {
}

/**
 * Deletes all inputs owned by this job.
 * Delegates to {@link Input#deleteInputs(String)}, which honors references from other jobs
 * (inputs still referenced elsewhere are kept).
 *
 * @return A succeeded future once the (currently synchronous) deletion returned
 */
private Future<Void> deleteInputs() {
  //TODO: Asyncify!
  Input.deleteInputs(getId());
  return Future.succeededFuture();
}

public Future<List<Input>> loadInputs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

Expand All @@ -38,6 +39,7 @@
import com.here.xyz.jobs.RuntimeStatus;
import com.here.xyz.jobs.datasets.DatasetDescription;
import com.here.xyz.jobs.steps.inputs.Input;
import com.here.xyz.jobs.steps.inputs.InputsFromJob;
import com.here.xyz.jobs.steps.inputs.ModelBasedInput;
import com.here.xyz.jobs.steps.inputs.UploadUrl;
import com.here.xyz.jobs.steps.outputs.Output;
Expand All @@ -46,8 +48,10 @@
import io.vertx.core.Future;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.openapi.router.RouterBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.NotImplementedException;

public class JobApi extends Api {
Expand Down Expand Up @@ -124,6 +128,25 @@ else if (context.request().bytesRead() > 256 * 1024)
.onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
.onFailure(err -> sendErrorResponse(context, err));
}
else if (input instanceof InputsFromJob inputsReference) {
//NOTE: Both jobs have to be loaded to authorize the user for both
loadJob(context, jobId)
.compose(job -> loadJob(context, inputsReference.getJobId()).compose(referencedJob -> {
try {
if (!Objects.equals(referencedJob.getOwner(), job.getOwner()))
return Future.failedFuture(new HttpException(FORBIDDEN, "Inputs of job " + inputsReference.getJobId()
+ " can not be referenced by job " + job.getId() + " as it has a different owner."));

inputsReference.dereference(job.getId());
return Future.succeededFuture();
}
catch (IOException e) {
return Future.failedFuture(e);
}
}))
.onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
.onFailure(err -> sendErrorResponse(context, err));
}
else
throw new NotImplementedException("Input type " + input.getClass().getSimpleName() + " is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import org.apache.logging.log4j.Logger;

@JsonSubTypes({
@JsonSubTypes.Type(value = UploadUrl.class, name = "UploadUrl")
@JsonSubTypes.Type(value = UploadUrl.class, name = "UploadUrl"),
@JsonSubTypes.Type(value = InputsFromJob.class, name = "InputsFromJob")
})
public abstract class Input <T extends Input> implements Typed {
private static final Logger logger = LogManager.getLogger();
Expand Down Expand Up @@ -88,29 +89,48 @@ public static List<Input> loadInputs(String jobId) {

/**
 * Loads the inputs of the specified job, preferring the persisted inputs metadata as a fast path.
 * If no metadata exists (or it is unreadable), the inputs are scanned from S3 in parallel and — for jobs
 * that were already submitted — the metadata is written so subsequent calls can take the fast path.
 *
 * @param jobId The job whose inputs should be loaded
 * @return The job's inputs, or whatever {@code loadInputsInParallel(jobId)} returns on the slow path
 */
private static List<Input> loadInputsAndWriteMetadata(String jobId) {
  try {
    //Fast path: metadata was written before; reconstruct the inputs from it without scanning S3
    InputsMetadata metadata = loadMetadata(jobId);
    return metadata.inputs.entrySet().stream()
        .map(metaEntry -> createInput(metaEntry.getKey(), metaEntry.getValue().byteSize, metaEntry.getValue().compressed))
        .toList();
  }
  catch (IOException | AmazonS3Exception ignore) {
    //No (readable) metadata yet; fall through to scanning the inputs from S3
  }

  final List<Input> inputs = loadInputsInParallel(jobId);
  //Only write metadata of jobs which are submitted already
  if (inputs != null && submittedJobs.contains(jobId))
    storeMetadata(jobId, inputs);

  return inputs;
}

/**
 * Loads and deserializes the inputs metadata object of the specified job from S3.
 *
 * @param jobId The job owning the metadata
 * @return The deserialized {@link InputsMetadata}
 * @throws IOException If the metadata object can not be read or deserialized
 */
static InputsMetadata loadMetadata(String jobId) throws IOException {
  return XyzSerializable.deserialize(S3Client.getInstance().loadObjectContent(inputMetaS3Key(jobId)), InputsMetadata.class);
}

/**
 * Serializes and writes the given inputs metadata object for the specified job to S3.
 * A write failure is logged but deliberately not propagated — the next call will retry.
 *
 * @param jobId The job owning the metadata
 * @param metadata The metadata to persist
 */
static void storeMetadata(String jobId, InputsMetadata metadata) {
  final String metadataKey = inputMetaS3Key(jobId);
  try {
    S3Client.getInstance().putObject(metadataKey, "application/json", metadata.serialize());
  }
  catch (IOException e) {
    //NOTE: Next call to this method will try it again
    logger.error("Error writing inputs metadata file for job {}.", jobId, e);
  }
}

/**
 * Stores the inputs metadata for a job that owns its own inputs (i.e., does not reference another job's inputs).
 *
 * @param jobId The job owning the inputs
 * @param inputs The inputs to describe in the metadata
 */
private static void storeMetadata(String jobId, List<Input> inputs) {
  storeMetadata(jobId, inputs, null);
}

/**
 * Builds and persists the inputs metadata for the specified job.
 *
 * @param jobId The job owning the metadata
 * @param inputs The inputs to describe in the metadata
 * @param referencedJobId The job whose inputs are being re-used, or null if this job owns its inputs itself
 */
static void storeMetadata(String jobId, List<Input> inputs, String referencedJobId) {
  logger.info("Storing inputs metadata for job {} ...", jobId);
  Map<String, InputMetadata> inputsByKey = inputs.stream()
      .collect(Collectors.toMap(input -> input.s3Key, input -> new InputMetadata(input.byteSize, input.compressed)));
  //NOTE(review): Set.of(...) is unmodifiable; in-memory mutation of referencingJobs seems to happen only on
  //metadata re-loaded (deserialized) from S3 — TODO confirm the deserializer yields a mutable set
  storeMetadata(jobId, new InputsMetadata(inputsByKey, Set.of(jobId), referencedJobId));
}

private static List<Input> loadInputsInParallel(String jobId) {
logger.info("Scanning inputs for job {} ...", jobId);
ForkJoinPool tmpPool = new ForkJoinPool(10);
Expand Down Expand Up @@ -152,12 +172,39 @@ public static ModelBasedInput resolveRawInput(Map<String, Object> rawInput) {
return XyzSerializable.fromMap(rawInput, ModelBasedInput.class);
}

private static Input createInput(String s3Key, long byteSize, boolean isCompressed) {
/**
 * Deletes the inputs of the specified job, unless they are still referenced by other jobs.
 * The job is passed as both the owning and the referencing job, since deleting a job removes
 * its own reference to its inputs.
 *
 * @param jobId The job whose inputs should be deleted
 */
public static void deleteInputs(String jobId) {
  deleteInputs(jobId, jobId);
}

/**
 * Removes the specified referencing job from the owning job's inputs metadata and deletes the
 * owning job's inputs folder once no job references them anymore.
 *
 * If the owning job itself referenced another job's inputs, the reference chain is followed
 * recursively so the referenced inputs may be deleted too, once unreferenced.
 *
 * @param owningJobId The job that owns the inputs
 * @param referencingJob The job whose reference to the inputs should be removed
 */
private static void deleteInputs(String owningJobId, String referencingJob) {
  InputsMetadata metadata = null;
  try {
    metadata = loadMetadata(owningJobId);
    //Guard: metadata written before reference-tracking existed has no referencingJobs set
    //NOTE(review): assumes the deserialized set is mutable — TODO confirm
    if (metadata.referencingJobs() != null)
      metadata.referencingJobs().remove(referencingJob);
  }
  catch (IOException ignore) {
    //No readable metadata; treat the inputs as unreferenced and delete them below
  }

  //Only delete the inputs if no other job is referencing them anymore
  if (metadata == null || metadata.referencingJobs() == null || metadata.referencingJobs().isEmpty()) {
    if (metadata != null && metadata.referencedJob() != null)
      /*
      The owning job referenced the inputs of another job, remove the owning job from the list of referencing jobs
      and check whether the referenced inputs may be deleted now.
       */
      deleteInputs(metadata.referencedJob(), owningJobId);

    S3Client.getInstance().deleteFolder(inputS3Prefix(owningJobId));
  }
  else
    storeMetadata(owningJobId, metadata);
}

private static Input createInput(String s3Key, long byteSize, boolean compressed) {
//TODO: Support ModelBasedInputs
return new UploadUrl()
.withS3Key(s3Key)
.withByteSize(byteSize)
.withCompressed(isCompressed);
.withCompressed(compressed);
}

private static boolean inputIsCompressed(String s3Key) {
Expand Down Expand Up @@ -199,5 +246,6 @@ public T withCompressed(boolean compressed) {
}

//Metadata of a single input object: its size and whether it is compressed
public record InputMetadata(@JsonProperty long byteSize, @JsonProperty boolean compressed) {}

//Metadata of a job's inputs: the inputs by S3 key, the jobs referencing them, and (optionally) the job whose inputs are re-used
public record InputsMetadata(@JsonProperty Map<String, InputMetadata> inputs, @JsonProperty Set<String> referencingJobs,
    @JsonProperty String referencedJob) implements XyzSerializable {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

package com.here.xyz.jobs.steps.inputs;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Delegation input type that indicates a job re-uses the inputs of another job in the system.
 */
public class InputsFromJob extends Input<InputsFromJob> {
  //The ID of the job whose inputs are being re-used
  private String jobId;

  public String getJobId() {
    return jobId;
  }

  public void setJobId(String jobId) {
    this.jobId = jobId;
  }

  public InputsFromJob withJobId(String jobId) {
    setJobId(jobId);
    return this;
  }

  /**
   * De-references this delegator object by loading the inputs of the referenced job and writes the metadata
   * for the referencing job.
   *
   * Additionally, this method adds bidirectional references between the metadata objects of the two involved jobs.
   * That is necessary to prevent the deletion of the referenced inputs if they're still in use.
   *
   * @param referencingJobId The job that owns this delegator object
   * @throws IOException when the metadata for the referenced job could not be updated
   */
  public void dereference(String referencingJobId) throws IOException {
    //First load the inputs of the other job to ensure the other job's metadata actually have been written
    List<Input> inputs = Input.loadInputs(getJobId());
    updateInputMetaReferences(referencingJobId);
    //Store the metadata of the job that references the other job's metadata
    storeMetadata(referencingJobId, inputs, getJobId());
  }

  /**
   * Adds the referencing job to the referenced job's inputs metadata and persists it.
   *
   * @param referencingJobId The job to register as a referencer of this object's target job
   * @throws IOException when the referenced job's metadata could not be loaded
   */
  private void updateInputMetaReferences(String referencingJobId) throws IOException {
    InputsMetadata referencedMetadata = loadMetadata(getJobId());
    /*
    Copy the set defensively: metadata written before reference-tracking existed deserializes with a null
    referencingJobs (would NPE on #add()), and an unmodifiable set would throw UnsupportedOperationException.
     */
    Set<String> referencingJobs = referencedMetadata.referencingJobs() == null
        ? new HashSet<>(Set.of(getJobId()))
        : new HashSet<>(referencedMetadata.referencingJobs());
    //Add the referencing job to the list of jobs referencing the metadata
    referencingJobs.add(referencingJobId);
    storeMetadata(getJobId(), new InputsMetadata(referencedMetadata.inputs(), referencingJobs, referencedMetadata.referencedJob()));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;

@JsonTypeInfo(use = Id.NAME, property = "type")
public class ModelBasedInput extends Input<ModelBasedInput> {
public abstract class ModelBasedInput extends Input<ModelBasedInput> {

}

0 comments on commit 360f355

Please sign in to comment.