Commit f62db6d

Merge branch 'master' into checkPersistentExport.03

Signed-off-by: qGYdXbY2 <[email protected]>
qGYdXbY2 committed Aug 5, 2024
2 parents 0e16852 + 103b165 commit f62db6d
Showing 18 changed files with 176 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -41,7 +41,7 @@

<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
2 changes: 1 addition & 1 deletion xyz-connectors/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<licenses>
2 changes: 1 addition & 1 deletion xyz-hub-service/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<licenses>
2 changes: 1 addition & 1 deletion xyz-hub-test/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<licenses>
2 changes: 1 addition & 1 deletion xyz-jobs/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<name>XYZ Job Framework</name>
2 changes: 1 addition & 1 deletion xyz-jobs/xyz-job-service/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.here.xyz</groupId>
<artifactId>xyz-jobs</artifactId>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<name>XYZ Job Service</name>
@@ -40,6 +40,7 @@
import com.here.xyz.jobs.datasets.DatasetDescription;
import com.here.xyz.jobs.steps.inputs.Input;
import com.here.xyz.jobs.steps.inputs.InputsFromJob;
+ import com.here.xyz.jobs.steps.inputs.InputsFromS3;
import com.here.xyz.jobs.steps.inputs.ModelBasedInput;
import com.here.xyz.jobs.steps.inputs.UploadUrl;
import com.here.xyz.jobs.steps.outputs.Output;
@@ -112,6 +113,18 @@ protected void postJobInput(final RoutingContext context) throws HttpException {
.onSuccess(res -> sendResponse(context, CREATED.code(), res))
.onFailure(err -> sendErrorResponse(context, err));
}
+ else if (input instanceof InputsFromS3 s3Inputs) {
+ loadJob(context, jobId)
+ .compose(job -> job.getStatus().getState() == NOT_READY
+ ? Future.succeededFuture(job)
+ : Future.failedFuture(new HttpException(BAD_REQUEST, "No inputs can be created after a job was submitted.")))
+ .compose(job -> {
+ s3Inputs.dereference(job.getId());
+ return Future.succeededFuture();
+ })
+ .onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
+ .onFailure(err -> sendErrorResponse(context, err));
+ }
else if (input instanceof ModelBasedInput modelBasedInput) {
loadJob(context, jobId)
.compose(job -> {
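The new InputsFromS3 branch only registers inputs while the job is still in the NOT_READY state. Below is a minimal, self-contained sketch of that gating pattern with Vert.x futures; the Job record, state string, and job id are simplified stand-ins for the real Job/HttpException types used above, not the actual API.

```java
import io.vertx.core.Future;

// Simplified sketch of the "only before submission" gate used in the InputsFromS3 branch.
// The real handler checks job.getStatus().getState() == NOT_READY and fails with an HttpException.
public class InputGateSketch {

  record Job(String id, String state) {}

  static Future<Void> registerS3Inputs(Job job) {
    return Future.succeededFuture(job)
        .compose(j -> "NOT_READY".equals(j.state())
            ? Future.succeededFuture(j)
            : Future.failedFuture(new IllegalStateException("No inputs can be created after a job was submitted.")))
        .compose(j -> {
          // here the real handler would call s3Inputs.dereference(j.id())
          return Future.succeededFuture();
        });
  }

  public static void main(String[] args) {
    registerS3Inputs(new Job("job-123", "NOT_READY"))
        .onSuccess(v -> System.out.println("inputs registered"))
        .onFailure(err -> System.out.println("rejected: " + err.getMessage()));
  }
}
```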
@@ -24,17 +24,25 @@
import com.here.xyz.util.Async;
import io.vertx.core.Future;
import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;

public class AsyncS3Client extends S3Client {
- private static final AsyncS3Client instance = new AsyncS3Client(Config.instance.JOBS_S3_BUCKET);
+ private static final Map<String, AsyncS3Client> instances = new ConcurrentHashMap<>();
private static final Async ASYNC = new Async(20, AsyncS3Client.class);

protected AsyncS3Client(String bucketName) {
super(bucketName);
}

public static AsyncS3Client getInstance() {
- return instance;
+ return getInstance(Config.instance.JOBS_S3_BUCKET);
}

+ public static AsyncS3Client getInstance(String bucketName) {
+ if (!instances.containsKey(bucketName))
+ instances.put(bucketName, new AsyncS3Client(bucketName));
+ return instances.get(bucketName);
+ }
+
//NOTE: Only the long-blocking methods are added as async variants
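Below is a self-contained sketch of the per-bucket client cache introduced above, using a stand-in class instead of the real AsyncS3Client and hypothetical bucket names. It uses computeIfAbsent, which creates each client at most once even under concurrent calls, as one way to express the same lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Self-contained sketch of the "one cached client per bucket" lookup shown above,
// with a stand-in client class instead of the real AsyncS3Client.
public class PerBucketClientCache {

  static class FakeS3Client {
    final String bucketName;
    FakeS3Client(String bucketName) { this.bucketName = bucketName; }
  }

  private static final Map<String, FakeS3Client> INSTANCES = new ConcurrentHashMap<>();

  // computeIfAbsent creates each client at most once, even when called from several threads
  static FakeS3Client getInstance(String bucketName) {
    return INSTANCES.computeIfAbsent(bucketName, FakeS3Client::new);
  }

  public static void main(String[] args) {
    FakeS3Client a = getInstance("job-default-bucket");  // hypothetical bucket name
    FakeS3Client b = getInstance("job-default-bucket");
    System.out.println(a == b);                           // true: the cached instance is reused
  }
}
```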
2 changes: 1 addition & 1 deletion xyz-jobs/xyz-job-steps/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.here.xyz</groupId>
<artifactId>xyz-jobs</artifactId>
- <version>3.9.35-SNAPSHOT</version>
+ <version>3.9.36-SNAPSHOT</version>
</parent>

<name>XYZ Job Steps</name>
@@ -199,12 +199,10 @@ public void execute() throws WebClientException, SQLException, TooManyResourcesC
}

public void _execute(boolean isResume) throws WebClientException, SQLException, TooManyResourcesClaimed {
- logAndSetPhase(null, "Importing input files from s3://" + bucketName() + "/" + inputS3Prefix() + " in region " + bucketRegion()
- + " into space " + getSpaceId() + " ...");
-
- logAndSetPhase(null, "Loading space config for space "+getSpaceId());
+ logAndSetPhase(null, "Importing input files for job " + getJobId() + " into space " + getSpaceId() + " ...");
+ logAndSetPhase(null, "Loading space config for space " + getSpaceId() + " ...");
Space space = loadSpace(getSpaceId());
- logAndSetPhase(null, "Getting storage database for space "+getSpaceId());
+ logAndSetPhase(null, "Getting storage database for space " + getSpaceId() + " ...");
Database db = loadDatabase(space.getStorage().getId(), WRITER);

//TODO: Move resume logic into #resume()
@@ -251,9 +249,9 @@ private void createAndFillTemporaryTable(Database db) throws SQLException, TooMa

}catch (SQLException e){
//We expect that
- if(e.getSQLState() != null && !e.getSQLState().equals("42P01")) {
+ if (e.getSQLState() != null && !e.getSQLState().equals("42P01"))
throw e;
- }
+ //TODO: Should we really ignore all other SQLExceptions?
}

if(!tmpTableNotExistsAndHasNoData) {
@@ -264,7 +262,7 @@ private void createAndFillTemporaryTable(Database db) throws SQLException, TooMa
runWriteQuerySync(buildTemporaryTableForImportQuery(getSchema(db)), db, 0);

logAndSetPhase(Phase.FILL_TMP_TABLE);
- fillTemporaryTableWithInputs(db, loadInputs(), bucketName(), bucketRegion());
+ fillTemporaryTableWithInputs(db, loadInputs(), bucketRegion());
}
}

@@ -397,10 +395,10 @@ private SQLQuery buildTemporaryTableForImportQuery(String schema) {
.withVariable("primaryKey", getTemporaryTableName() + "_primKey");
}

- private void fillTemporaryTableWithInputs(Database db, List<Input> inputs, String bucketName, String bucketRegion) throws SQLException, TooManyResourcesClaimed {
+ private void fillTemporaryTableWithInputs(Database db, List<Input> inputs, String bucketRegion) throws SQLException, TooManyResourcesClaimed {
List<SQLQuery> queryList = new ArrayList<>();
- for (Input input : inputs){
- if(input instanceof UploadUrl uploadUrl) {
+ for (Input input : inputs) {
+ if (input instanceof UploadUrl uploadUrl) {
JsonObject data = new JsonObject()
.put("compressed", uploadUrl.isCompressed())
.put("filesize", uploadUrl.getByteSize());
@@ -414,7 +412,7 @@ private void fillTemporaryTableWithInputs(Database db, List<Input> inputs, Strin
.withVariable("schema", getSchema(db))
.withVariable("table", getTemporaryTableName())
.withNamedParameter("s3Key", input.getS3Key())
- .withNamedParameter("bucketName", bucketName)
+ .withNamedParameter("bucketName", input.getS3Bucket())
.withNamedParameter("bucketRegion", bucketRegion)
.withNamedParameter("state", "SUBMITTED")
.withNamedParameter("data", data.toString())
@@ -431,9 +429,9 @@ private void fillTemporaryTableWithInputs(Database db, List<Input> inputs, Strin
.withVariable("schema", getSchema(db))
.withVariable("table", getTemporaryTableName())
.withNamedParameter("s3Key", "SUCCESS_MARKER")
- .withNamedParameter("bucketName", bucketName)
+ .withNamedParameter("bucketName", "SUCCESS_MARKER")
.withNamedParameter("state", "SUCCESS_MARKER")
- .withNamedParameter("bucketRegion", bucketRegion)
+ .withNamedParameter("bucketRegion", "SUCCESS_MARKER")
.withNamedParameter("data", "{}"));
runBatchWriteQuerySync(SQLQuery.batchOf(queryList), db, 0);
}
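For context on the reshaped error handling in createAndFillTemporaryTable: SQLSTATE 42P01 is PostgreSQL's undefined_table code, so the probe is allowed to fail when the temporary table does not exist yet, while every other SQL error is rethrown. A small sketch of that pattern with plain JDBC follows; the table name and query are hypothetical, not the step's actual SQL.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class TempTableProbe {

  // Returns true if the (hypothetical) temporary table exists, false if PostgreSQL reports
  // undefined_table (SQLSTATE 42P01); any other SQL error is rethrown, mirroring the check above.
  static boolean tempTableExists(Connection connection, String tableName) throws SQLException {
    try (Statement stmt = connection.createStatement()) {
      stmt.execute("SELECT 1 FROM " + tableName + " LIMIT 1");
      return true;
    }
    catch (SQLException e) {
      if (e.getSQLState() != null && !e.getSQLState().equals("42P01"))
        throw e; // a real error, not just a missing table
      return false;
    }
  }
}
```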
@@ -19,13 +19,15 @@

package com.here.xyz.jobs.steps.inputs;

+ import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.here.xyz.Typed;
import com.here.xyz.XyzSerializable;
+ import com.here.xyz.jobs.steps.Config;
import com.here.xyz.jobs.util.S3Client;
import java.io.IOException;
import java.util.HashSet;
@@ -49,6 +51,8 @@ public abstract class Input <T extends Input> implements Typed {
protected long byteSize;
protected boolean compressed;
@JsonIgnore
+ private String s3Bucket;
+ @JsonIgnore
private String s3Key;
private static Map<String, List<Input>> inputsCache = new WeakHashMap<>();
private static Set<String> submittedJobs = new HashSet<>();
@@ -61,6 +65,25 @@ private static String inputMetaS3Key(String jobId) {
return jobId + "/meta/inputs.json";
}

+ private static String defaultBucket() {
+ return Config.instance.JOBS_S3_BUCKET;
+ }
+
+ public String getS3Bucket() {
+ if (s3Bucket == null)
+ return defaultBucket();
+ return s3Bucket;
+ }
+
+ public void setS3Bucket(String s3Bucket) {
+ this.s3Bucket = s3Bucket;
+ }
+
+ public T withS3Bucket(String s3Bucket) {
+ setS3Bucket(s3Bucket);
+ return (T) this;
+ }
+
public String getS3Key() {
return s3Key;
}
@@ -87,30 +110,46 @@ public static List<Input> loadInputs(String jobId) {
return loadInputsAndWriteMetadata(jobId);
}

+ private static AmazonS3URI toS3Uri(String s3Uri) {
+ return new AmazonS3URI(s3Uri);
+ }
+
private static List<Input> loadInputsAndWriteMetadata(String jobId) {
try {
InputsMetadata metadata = loadMetadata(jobId);
return metadata.inputs.entrySet().stream()
- .map(metaEntry -> createInput(metaEntry.getKey(), metaEntry.getValue().byteSize, metaEntry.getValue().compressed))
+ .map(metaEntry -> {
+ final String metaKey = metaEntry.getKey();
+ String s3Bucket = null;
+ String s3Key;
+ if (metaKey.startsWith("s3://")) {
+ AmazonS3URI s3Uri = toS3Uri(metaKey);
+ s3Bucket = s3Uri.getBucket();
+ s3Key = "/" + s3Uri.getKey();
+ }
+ else
+ s3Key = metaKey;
+ return createInput(s3Bucket, s3Key, metaEntry.getValue().byteSize, metaEntry.getValue().compressed);
+ })
.toList();
}
catch (IOException | AmazonS3Exception ignore) {}

- final List<Input> inputs = loadInputsInParallel(jobId);
+ final List<Input> inputs = loadInputsInParallel(defaultBucket(), Input.inputS3Prefix(jobId));
//Only write metadata of jobs which are submitted already
if (inputs != null && submittedJobs.contains(jobId))
storeMetadata(jobId, inputs);

return inputs;
}

- static InputsMetadata loadMetadata(String jobId) throws IOException {
+ static final InputsMetadata loadMetadata(String jobId) throws IOException {
InputsMetadata metadata = XyzSerializable.deserialize(S3Client.getInstance().loadObjectContent(inputMetaS3Key(jobId)),
InputsMetadata.class);
return metadata;
}

- static void storeMetadata(String jobId, InputsMetadata metadata) {
+ static final void storeMetadata(String jobId, InputsMetadata metadata) {
try {
S3Client.getInstance().putObject(inputMetaS3Key(jobId), "application/json", metadata.serialize());
}
@@ -124,19 +163,20 @@ private static void storeMetadata(String jobId, List<Input> inputs) {
storeMetadata(jobId, inputs, null);
}

- static void storeMetadata(String jobId, List<Input> inputs, String referencedJobId) {
+ static final void storeMetadata(String jobId, List<Input> inputs, String referencedJobId) {
logger.info("Storing inputs metadata for job {} ...", jobId);
Map<String, InputMetadata> metadata = inputs.stream()
- .collect(Collectors.toMap(input -> input.s3Key, input -> new InputMetadata(input.byteSize, input.compressed)));
+ .collect(Collectors.toMap(input -> (input.s3Bucket == null ? "" : "s3://" + input.s3Bucket) + input.s3Key,
+ input -> new InputMetadata(input.byteSize, input.compressed)));
storeMetadata(jobId, new InputsMetadata(metadata, Set.of(jobId), referencedJobId));
}

- private static List<Input> loadInputsInParallel(String jobId) {
- logger.info("Scanning inputs for job {} ...", jobId);
+ static final List<Input> loadInputsInParallel(String bucketName, String inputS3Prefix) {
+ logger.info("Scanning inputs from bucket {} and prefix {} ...", bucketName, inputS3Prefix);
ForkJoinPool tmpPool = new ForkJoinPool(10);
List<Input> inputs = null;
try {
- inputs = tmpPool.submit(() -> loadAndTransformInputs(jobId, -1, Input.class)).get();
+ inputs = tmpPool.submit(() -> loadAndTransformInputs(bucketName, inputS3Prefix, -1, Input.class)).get();
}
catch (InterruptedException | ExecutionException ignore) {}
finally {
@@ -150,13 +190,14 @@ public static int currentInputsCount(String jobId, Class<? extends Input> inputT
}

public static <T extends Input> List<T> loadInputsSample(String jobId, int maxSampleSize, Class<T> inputType) {
- return loadAndTransformInputs(jobId, maxSampleSize, inputType);
+ return loadAndTransformInputs(defaultBucket(), Input.inputS3Prefix(jobId), maxSampleSize, inputType);
}

- private static <T extends Input> List<T> loadAndTransformInputs(String jobId, int maxReturnSize, Class<T> inputType) {
- Stream<Input> inputsStream = S3Client.getInstance().scanFolder(Input.inputS3Prefix(jobId))
+ private static <T extends Input> List<T> loadAndTransformInputs(String bucketName, String inputS3Prefix, int maxReturnSize, Class<T> inputType) {
+ Stream<Input> inputsStream = S3Client.getInstance(bucketName).scanFolder(inputS3Prefix)
.parallelStream()
- .map(s3ObjectSummary -> createInput(s3ObjectSummary.getKey(), s3ObjectSummary.getSize(), inputIsCompressed(s3ObjectSummary.getKey())))
+ .map(s3ObjectSummary -> createInput(defaultBucket().equals(bucketName) ? null : bucketName, s3ObjectSummary.getKey(),
+ s3ObjectSummary.getSize(), inputIsCompressed(s3ObjectSummary.getKey())))
.filter(input -> inputType.isAssignableFrom(input.getClass()));

if (maxReturnSize > 0)
@@ -199,9 +240,10 @@ else if (metadata != null)
storeMetadata(owningJobId, metadata);
}

- private static Input createInput(String s3Key, long byteSize, boolean compressed) {
+ private static Input createInput(String s3Bucket, String s3Key, long byteSize, boolean compressed) {
//TODO: Support ModelBasedInputs
return new UploadUrl()
+ .withS3Bucket(s3Bucket)
.withS3Key(s3Key)
.withByteSize(byteSize)
.withCompressed(compressed);
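A short sketch of the metadata key format introduced in Input: inputs in the default JOBS_S3_BUCKET keep a plain S3 key, while inputs from a caller-provided bucket are stored as a full s3:// URI and parsed back on load with AmazonS3URI. The bucket and key values below are hypothetical.

```java
import com.amazonaws.services.s3.AmazonS3URI;
import java.util.List;

public class InputMetaKeySketch {

  public static void main(String[] args) {
    // A plain key means "default JOBS_S3_BUCKET"; an s3:// URI means "caller-provided bucket".
    // Both values below are hypothetical.
    List<String> metaKeys = List.of(
        "job-123/part-0.csv",
        "s3://customer-bucket/job-123/part-1.csv");

    for (String metaKey : metaKeys) {
      String s3Bucket = null;
      String s3Key = metaKey;
      if (metaKey.startsWith("s3://")) {
        AmazonS3URI uri = new AmazonS3URI(metaKey);
        s3Bucket = uri.getBucket();   // "customer-bucket"
        s3Key = "/" + uri.getKey();   // the loader above prepends a slash to the parsed key
      }
      System.out.println((s3Bucket == null ? "<default bucket>" : s3Bucket) + " : " + s3Key);
    }
  }
}
```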