Some fixes related to InputsFromS3 job inputs
Signed-off-by: Benjamin Rögner <[email protected]>
roegi committed Aug 5, 2024
1 parent 109b6f6 commit 741cd42
Showing 5 changed files with 55 additions and 41 deletions.
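
For orientation before the per-file diffs: this commit registers the new InputsFromS3 input type for polymorphic JSON deserialization, threads the source S3 bucket through quick validation, and fixes the s3:// metadata-key format and prefix handling. Below is a minimal sketch of declaring such an input against the fluent API visible in the diffs; the import path is an assumption, and the bucket ARN and prefix are illustrative:

import com.here.xyz.jobs.steps.inputs.InputsFromS3; //assumed package path

public class DeclareInputsFromS3 {
  public static void main(String[] args) {
    //Reference job inputs that already live in a caller-owned S3 bucket,
    //rather than uploading each file through an UploadUrl input.
    InputsFromS3 inputs = new InputsFromS3()
        .withBucketArn("arn:aws:s3:::customer-input-bucket") //illustrative ARN
        .withPrefix("uploads/2024-08/");                     //illustrative prefix
    //With the @JsonSubTypes registration added in this commit, payloads
    //carrying "type": "InputsFromS3" now deserialize into this class.
    System.out.println(inputs.getPrefix());
  }
}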
@@ -313,7 +313,7 @@ private static void startRealJob(String spaceId) throws IOException, Interrupted
         .withSource(new Files<>().withInputSettings(new FileInputSettings().withFormat(new GeoJson().withEntityPerLine(Feature))))
         .withTarget(new DatasetDescription.Space<>().withId(spaceId));
 
-    System.out.println("Starting job ...");
+    System.out.println("Creating job ...");
     HttpResponse<byte[]> jobResponse = post("/jobs", job);
 
     System.out.println("Got response:");
ImportFilesQuickValidator.java

@@ -20,7 +20,6 @@
 package com.here.xyz.jobs.steps.impl.imp;
 
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.model.S3Object;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
 import com.here.xyz.XyzSerializable;
@@ -30,11 +29,9 @@
 import com.here.xyz.models.geojson.implementation.Feature;
 import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
 import java.io.BufferedReader;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.util.zip.GZIPInputStream;
 import org.json.JSONException;
@@ -47,11 +44,11 @@ public class ImportFilesQuickValidator {
   private static final int VALIDATE_LINE_MAX_LINE_SIZE_BYTES = 4 * 1024 * 1024;
 
   static void validate(UploadUrl uploadUrl, Format format) throws ValidationException {
-    validate(uploadUrl.getS3Key(), format, uploadUrl.isCompressed());
+    validate(uploadUrl.getS3Bucket(), uploadUrl.getS3Key(), format, uploadUrl.isCompressed());
   }
 
-  static void validate(String s3Key, Format format, boolean isCompressed) throws ValidationException {
-    S3Client client = S3Client.getInstance();
+  static void validate(String s3Bucket, String s3Key, Format format, boolean isCompressed) throws ValidationException {
+    S3Client client = S3Client.getInstance(s3Bucket);
     try {
       if (isCompressed)
         validateFirstCSVLine(client, s3Key, format, "", 0, true);
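
A hypothetical call site for the widened validate signature. The methods are package-private, so this sketch sits in the validator's own package; the bucket and key names are illustrative. The point of the change: the bucket now travels with the key, so validation reads from the file's own bucket instead of implicitly using the service default.

package com.here.xyz.jobs.steps.impl.imp;

import com.here.xyz.jobs.steps.impl.imp.ImportFilesToSpace.Format;
import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;

public class ValidateSketch {
  public static void main(String[] args) throws ValidationException {
    //Illustrative bucket and key; with the old signature the bucket was
    //always the default one, now each input's own bucket is used.
    ImportFilesQuickValidator.validate("customer-input-bucket",
        "jobs/job-123/inputs/part-0.csv", Format.CSV_JSONWKB, /*isCompressed*/ false);
  }
}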
Input.java

@@ -44,7 +44,8 @@
 
 @JsonSubTypes({
     @JsonSubTypes.Type(value = UploadUrl.class, name = "UploadUrl"),
-    @JsonSubTypes.Type(value = InputsFromJob.class, name = "InputsFromJob")
+    @JsonSubTypes.Type(value = InputsFromJob.class, name = "InputsFromJob"),
+    @JsonSubTypes.Type(value = InputsFromS3.class, name = "InputsFromS3")
 })
 public abstract class Input <T extends Input> implements Typed {
   private static final Logger logger = LogManager.getLogger();
@@ -102,43 +103,44 @@ public static List<Input> loadInputs(String jobId) {
     if (submittedJobs.contains(jobId)) {
       List<Input> inputs = inputsCache.get(jobId);
       if (inputs == null) {
-        inputs = loadInputsAndWriteMetadata(jobId);
+        inputs = loadInputsAndWriteMetadata(jobId, -1, Input.class);
         inputsCache.put(jobId, inputs);
       }
       return inputs;
     }
-    return loadInputsAndWriteMetadata(jobId);
+    return loadInputsAndWriteMetadata(jobId, -1, Input.class);
   }
 
   private static AmazonS3URI toS3Uri(String s3Uri) {
     return new AmazonS3URI(s3Uri);
   }
 
-  private static List<Input> loadInputsAndWriteMetadata(String jobId) {
+  private static <T extends Input> List<T> loadInputsAndWriteMetadata(String jobId, int maxReturnSize, Class<T> inputType) {
     try {
       InputsMetadata metadata = loadMetadata(jobId);
-      return metadata.inputs.entrySet().stream()
+      Stream<T> inputs = metadata.inputs.entrySet().stream()
          .map(metaEntry -> {
            final String metaKey = metaEntry.getKey();
            String s3Bucket = null;
            String s3Key;
            if (metaKey.startsWith("s3://")) {
              AmazonS3URI s3Uri = toS3Uri(metaKey);
              s3Bucket = s3Uri.getBucket();
-             s3Key = "/" + s3Uri.getKey();
+             s3Key = s3Uri.getKey();
            }
            else
              s3Key = metaKey;
-           return createInput(s3Bucket, s3Key, metaEntry.getValue().byteSize, metaEntry.getValue().compressed);
-         })
-         .toList();
+           return (T) createInput(s3Bucket, s3Key, metaEntry.getValue().byteSize, metaEntry.getValue().compressed);
+         });
+
+      return (maxReturnSize > 0 ? inputs.unordered().limit(maxReturnSize) : inputs).toList();
     }
     catch (IOException | AmazonS3Exception ignore) {}
 
-    final List<Input> inputs = loadInputsInParallel(defaultBucket(), Input.inputS3Prefix(jobId));
+    final List<T> inputs = loadInputsInParallel(defaultBucket(), Input.inputS3Prefix(jobId), maxReturnSize, inputType);
     //Only write metadata of jobs which are submitted already
     if (inputs != null && submittedJobs.contains(jobId))
-      storeMetadata(jobId, inputs);
+      storeMetadata(jobId, (List<Input>) inputs);
 
     return inputs;
   }
@@ -166,19 +168,26 @@ private static void storeMetadata(String jobId, List<Input> inputs) {
   static final void storeMetadata(String jobId, List<Input> inputs, String referencedJobId) {
     logger.info("Storing inputs metadata for job {} ...", jobId);
     Map<String, InputMetadata> metadata = inputs.stream()
-        .collect(Collectors.toMap(input -> (input.s3Bucket == null ? "" : "s3://" + input.s3Bucket) + input.s3Key,
+        .collect(Collectors.toMap(input -> (input.s3Bucket == null ? "" : "s3://" + input.s3Bucket + "/") + input.s3Key,
            input -> new InputMetadata(input.byteSize, input.compressed)));
     storeMetadata(jobId, new InputsMetadata(metadata, Set.of(jobId), referencedJobId));
   }
 
   static final List<Input> loadInputsInParallel(String bucketName, String inputS3Prefix) {
+    return loadInputsInParallel(bucketName, inputS3Prefix, -1, Input.class);
+  }
+
+  static final <T extends Input> List<T> loadInputsInParallel(String bucketName, String inputS3Prefix, int maxReturnSize, Class<T> inputType) {
     logger.info("Scanning inputs from bucket {} and prefix {} ...", bucketName, inputS3Prefix);
     ForkJoinPool tmpPool = new ForkJoinPool(10);
-    List<Input> inputs = null;
+    List<T> inputs = null;
     try {
-      inputs = tmpPool.submit(() -> loadAndTransformInputs(bucketName, inputS3Prefix, -1, Input.class)).get();
+      inputs = tmpPool.submit(() -> loadAndTransformInputs(bucketName, inputS3Prefix, maxReturnSize, inputType)).get();
     }
-    catch (InterruptedException | ExecutionException ignore) {}
+    catch (InterruptedException ignore) {}
+    catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
     finally {
       tmpPool.shutdown();
     }
@@ -190,7 +199,7 @@ public static int currentInputsCount(String jobId, Class<? extends Input> inputT
   }
 
   public static <T extends Input> List<T> loadInputsSample(String jobId, int maxSampleSize, Class<T> inputType) {
-    return loadAndTransformInputs(defaultBucket(), Input.inputS3Prefix(jobId), maxSampleSize, inputType);
+    return loadInputsAndWriteMetadata(jobId, maxSampleSize, inputType);
  }
 
   private static <T extends Input> List<T> loadAndTransformInputs(String bucketName, String inputS3Prefix, int maxReturnSize, Class<T> inputType) {
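
Two of the fixes above are easiest to see in isolation: the metadata key is now "s3://<bucket>/<key>" (the separating slash was previously missing, and the parsed key carried a spurious leading slash), and sampling now funnels through the metadata-backed loader with a size cap. A self-contained sketch of both idioms, using illustrative names:

import com.amazonaws.services.s3.AmazonS3URI;
import java.util.List;
import java.util.stream.Stream;

public class InputsFixesSketch {
  //Mirrors the capping idiom in loadInputsAndWriteMetadata(): a non-positive
  //maxReturnSize means "return everything"; unordered() relaxes encounter
  //order so limit() can short-circuit cheaply.
  static <T> List<T> capIfRequested(Stream<T> items, int maxReturnSize) {
    return (maxReturnSize > 0 ? items.unordered().limit(maxReturnSize) : items).toList();
  }

  public static void main(String[] args) {
    String bucket = "my-bucket", key = "jobs/42/inputs/part-0.csv"; //illustrative
    //New key format: "s3://my-bucket/jobs/42/inputs/part-0.csv"
    String metaKey = (bucket == null ? "" : "s3://" + bucket + "/") + key;
    AmazonS3URI uri = new AmazonS3URI(metaKey);
    //Round-trips cleanly: bucket intact, key without a leading slash.
    System.out.println(uri.getBucket() + " | " + uri.getKey());

    System.out.println(capIfRequested(Stream.of(1, 2, 3, 4), 2));  //e.g. [1, 2]
    System.out.println(capIfRequested(Stream.of(1, 2, 3, 4), -1)); //[1, 2, 3, 4]
  }
}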
InputsFromS3.java

@@ -30,11 +30,13 @@ public String getPrefix() {
   }
 
   public void setPrefix(String prefix) {
+    if (prefix.startsWith("/"))
+      prefix = prefix.substring(1);
     this.prefix = prefix;
   }
 
-  public InputsFromS3 withBucketArn(String bucketArn) {
-    setBucketArn(bucketArn);
+  public InputsFromS3 withPrefix(String prefix) {
+    setPrefix(prefix);
     return this;
   }
 
@@ -46,8 +48,8 @@ public void setBucketArn(String bucketArn) {
     this.bucketArn = bucketArn;
   }
 
-  public InputsFromS3 withPrefix(String prefix) {
-    setPrefix(prefix);
+  public InputsFromS3 withBucketArn(String bucketArn) {
+    setBucketArn(bucketArn);
     return this;
   }
 
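
The new leading-slash normalization in setPrefix can be pinned down with a small JUnit-style check; this is a sketch assuming InputsFromS3 is on the classpath, and the prefix value is illustrative:

import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class PrefixNormalizationSketch {
  @Test
  public void leadingSlashIsStripped() {
    //S3-console-style prefixes often arrive with a leading slash;
    //setPrefix() now strips it so bucket/prefix/key concatenate correctly.
    InputsFromS3 inputs = new InputsFromS3().withPrefix("/uploads/2024-08/");
    assertEquals("uploads/2024-08/", inputs.getPrefix());
  }
}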
QuickValidatorTest.java

@@ -21,10 +21,10 @@
 
 import static org.junit.Assert.assertEquals;
 
+import com.here.xyz.jobs.steps.Config;
 import com.here.xyz.jobs.steps.TestSteps;
 import com.here.xyz.jobs.steps.impl.imp.ImportFilesToSpace.Format;
 import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
-
 import java.io.IOException;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,9 +35,15 @@ public class QuickValidatorTest extends TestSteps {
   public void cleanUp() {
     cleanS3Files(TEST_PREFIX);
   }
+
   private String generateTestS3Key(String name){
     return TEST_PREFIX + name;
   }
+
+  private void validate(String s3Key, Format format, boolean isCompressed) throws ValidationException {
+    ImportFilesQuickValidator.validate(Config.instance.JOBS_S3_BUCKET, s3Key, format, isCompressed);
+  }
+
   private void uploadAndValidateValidFiles(boolean gzip) throws IOException, ValidationException {
     /** With no new line at end */
     uploadFileToS3(generateTestS3Key("test_valid_1_jsonwkb.csv"),
@@ -76,12 +82,12 @@ private void uploadAndValidateValidFiles(boolean gzip) throws IOException, Valid
     );
 
     /** Should not fail - above are all valid */
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_1_geojson.csv"), Format.CSV_GEOJSON, gzip);
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_1_geojson.txt"), Format.GEOJSON, gzip);
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_2_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_2_geojson.csv"), Format.CSV_GEOJSON, gzip);
-    ImportFilesQuickValidator.validate(generateTestS3Key("test_valid_2_geojson.txt"), Format.GEOJSON, gzip);
+    validate(generateTestS3Key("test_valid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+    validate(generateTestS3Key("test_valid_1_geojson.csv"), Format.CSV_GEOJSON, gzip);
+    validate(generateTestS3Key("test_valid_1_geojson.txt"), Format.GEOJSON, gzip);
+    validate(generateTestS3Key("test_valid_2_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+    validate(generateTestS3Key("test_valid_2_geojson.csv"), Format.CSV_GEOJSON, gzip);
+    validate(generateTestS3Key("test_valid_2_geojson.txt"), Format.GEOJSON, gzip);
   }
 
   private void uploadAndValidateFilesWithInvalidJson(boolean gzip) throws IOException {
@@ -105,17 +111,17 @@ private void uploadAndValidateFilesWithInvalidJson(boolean gzip) throws IOExcept
     );
 
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+      validate(generateTestS3Key("test_invalid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Bad JSON encoding! ");
     }
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_1_geojson.csv"), Format.CSV_GEOJSON, gzip);
+      validate(generateTestS3Key("test_invalid_1_geojson.csv"), Format.CSV_GEOJSON, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Bad JSON encoding! ");
     }
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_1_geojson.txt"), Format.GEOJSON, gzip);
+      validate(generateTestS3Key("test_invalid_1_geojson.txt"), Format.GEOJSON, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Bad JSON encoding! ");
     }
@@ -132,13 +138,13 @@ private void uploadAndValidateFilesWithInvalidWKB(boolean gzip) throws IOExcepti
       "\"{'\"properties'\": {'\"test'\": 1}}\",invalid".getBytes(),
       gzip);
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+      validate(generateTestS3Key("test_invalid_1_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Bad WKB encoding! ");
     }
 
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_2_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+      validate(generateTestS3Key("test_invalid_2_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Bad WKB encoding! ");
     }
@@ -159,13 +165,13 @@ private void uploadAndValidateFilesWithEmptyColumn(boolean gzip) throws IOExcept
     );
 
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_3_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
+      validate(generateTestS3Key("test_invalid_3_jsonwkb.csv"), Format.CSV_JSONWKB, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Empty Column detected!");
     }
 
     try{
-      ImportFilesQuickValidator.validate(generateTestS3Key("test_invalid_3_geojson.csv"), Format.CSV_GEOJSON, gzip);
+      validate(generateTestS3Key("test_invalid_3_geojson.csv"), Format.CSV_GEOJSON, gzip);
     }catch (ValidationException e){
       checkValidationException(e, "Empty Column detected!");
     }
