Skip to content

Commit

Permalink
add changes json_wkb, fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: qGYdXbY2 <[email protected]>
  • Loading branch information
qGYdXbY2 committed Oct 30, 2023
1 parent 790d084 commit 24401a6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.here.xyz.events.ContextAwareEvent.SpaceContext.EXTENSION;
import static com.here.xyz.events.ContextAwareEvent.SpaceContext.SUPER;
import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONID_FC_B64;
import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.JSON_WKB;

import com.here.xyz.events.ContextAwareEvent;
import com.here.xyz.events.GetFeaturesByGeometryEvent;
Expand Down Expand Up @@ -144,7 +145,7 @@ Is used for incremental exports (tiles) - here we have to export modified tiles.
});
default:
exportQuery = generateFilteredExportQuery(job.getId(), schema, job.getTargetSpaceId(), propertyFilter, spatialFilter,
job.getTargetVersion(), job.getParams(), job.getCsvFormat());
job.getTargetVersion(), job.getParams(), job.getCsvFormat(), compositeCalculation);
return calculateThreadCountForDownload(job, schema, exportQuery)
.compose(threads -> {
try {
Expand All @@ -153,7 +154,7 @@ Is used for incremental exports (tiles) - here we have to export modified tiles.

for (int i = 0; i < threads; i++) {
String s3Prefix = i + "_";
SQLQuery q2 = buildS3ExportQuery(job, schema, s3Bucket, s3Path, s3Prefix, s3Region,
SQLQuery q2 = buildS3ExportQuery(job, schema, s3Bucket, s3Path, s3Prefix, s3Region, compositeCalculation,
(threads > 1 ? new SQLQuery("AND i%% " + threads + " = " + i) : null));
exportFutures.add(exportTypeDownload(job.getTargetConnector(), q2, job, s3Path));
}
Expand Down Expand Up @@ -313,14 +314,14 @@ public static SQLQuery buildVMLCalculateQuery(Export job, String schema, SQLQuer

public static SQLQuery buildS3ExportQuery(Export j, String schema,
String s3Bucket, String s3Path, String s3FilePrefix, String s3Region,
SQLQuery customWhereCondition) throws SQLException {
boolean isForCompositeContentDetection, SQLQuery customWhereCondition) throws SQLException {

String propertyFilter = (j.getFilters() == null ? null : j.getFilters().getPropertyFilter());
Export.SpatialFilter spatialFilter= (j.getFilters() == null ? null : j.getFilters().getSpatialFilter());

s3Path = s3Path+ "/" +(s3FilePrefix == null ? "" : s3FilePrefix)+"export.csv";
SQLQuery exportSelectString = generateFilteredExportQuery(j.getId(), schema, j.getTargetSpaceId(), propertyFilter, spatialFilter,
j.getTargetVersion(), j.getParams(), j.getCsvFormat(), customWhereCondition, false,
j.getTargetVersion(), j.getParams(), j.getCsvFormat(), customWhereCondition, isForCompositeContentDetection,
j.getPartitionKey(), j.getOmitOnNull());

SQLQuery q = new SQLQuery("SELECT * /* s3_export_hint m499#jobId(" + j.getId() + ") */ from aws_s3.query_export_to_s3( "+
Expand Down Expand Up @@ -409,6 +410,12 @@ private static SQLQuery generateFilteredExportQuery(String jobId, String schema,
return generateFilteredExportQuery(jobId, schema, spaceId, propertyFilter, spatialFilter, targetVersion, params, csvFormat, null, false, null, false);
}

/**
 * Convenience overload that builds a filtered export query while letting the caller control
 * composite-content detection, using defaults for the remaining options.
 * Delegates to the full overload with: no custom WHERE condition ({@code null}),
 * no partition key ({@code null}), and {@code omitOnNull = false}.
 *
 * @param jobId id of the export job (used e.g. for query tagging)
 * @param schema database schema to query
 * @param spaceId id of the target space to export
 * @param propertyFilter optional property filter expression, may be {@code null}
 * @param spatialFilter optional spatial filter, may be {@code null}
 * @param targetVersion optional version to export, may be {@code null}
 * @param params job parameters map — NOTE(review): raw {@code Map}; presumably String-keyed, confirm against callers
 * @param csvFormat output CSV format of the export
 * @param isForCompositeContentDetection whether the query should be built for composite-content
 *        (extension/delta) detection rather than a plain export
 * @throws SQLException if building the underlying query fails
 */
private static SQLQuery generateFilteredExportQuery(String jobId, String schema, String spaceId, String propertyFilter,
Export.SpatialFilter spatialFilter, String targetVersion, Map params, CSVFormat csvFormat, boolean isForCompositeContentDetection) throws SQLException {
return generateFilteredExportQuery(jobId, schema, spaceId, propertyFilter, spatialFilter, targetVersion, params, csvFormat, null, isForCompositeContentDetection, null, false);
}


private static SQLQuery generateFilteredExportQueryForCompositeTileCalculation(String jobId, String schema, String spaceId, String propertyFilter,
Export.SpatialFilter spatialFilter, String targetVersion, Map params, CSVFormat csvFormat) throws SQLException {
return generateFilteredExportQuery(jobId, schema, spaceId, propertyFilter, spatialFilter, targetVersion, params, csvFormat, null, true, null, false);
Expand Down Expand Up @@ -473,12 +480,13 @@ private static SQLQuery generateFilteredExportQuery(String jobId, String schema,
dbHandler.setConfig(new PSQLConfig(event, schema));

boolean partitionByPropertyValue = ( csvFormat == PARTITIONID_FC_B64 && partitionKey != null && !"id".equalsIgnoreCase(partitionKey)),
partitionByFeatureId = ( csvFormat == PARTITIONID_FC_B64 && !partitionByPropertyValue );
partitionByFeatureId = ( csvFormat == PARTITIONID_FC_B64 && !partitionByPropertyValue ),
downloadAsJsonWkb = ( csvFormat == JSON_WKB );

SpaceContext ctxStashed = event.getContext();

if (isForCompositeContentDetection)
event.setContext( partitionByFeatureId ? EXTENSION : COMPOSITE_EXTENSION);
event.setContext( (partitionByFeatureId || downloadAsJsonWkb) ? EXTENSION : COMPOSITE_EXTENSION);

SQLQuery sqlQuery,
sqlQueryContentByPropertyValue = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.here.xyz.httpconnector.util.jobs.Export.ExportTarget.Type.VML;
import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONID_FC_B64;
import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.TILEID_FC_B64;
import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.JSON_WKB;
import static com.here.xyz.httpconnector.util.jobs.Job.Status.*;
import static com.here.xyz.httpconnector.util.scheduler.JobQueue.*;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
Expand Down Expand Up @@ -252,10 +253,10 @@ private Export validateExport() throws HttpException {
}

if(!compositeMode.equals(CompositeMode.DEACTIVATED)) {
if (getExportTarget().getType() == DOWNLOAD)
if (getExportTarget().getType() == DOWNLOAD && getCsvFormat() != JSON_WKB)
throw new HttpException(HttpResponseStatus.BAD_REQUEST, "CompositeMode is not available for Type Download!");

if (getCsvFormat() != TILEID_FC_B64 && getCsvFormat() != PARTITIONID_FC_B64)
if (getCsvFormat() != TILEID_FC_B64 && getCsvFormat() != PARTITIONID_FC_B64 && getCsvFormat() != JSON_WKB)
throw new HttpException(BAD_REQUEST, "CompositeMode does not support the provided CSV format!");

if(ext == null) {
Expand Down Expand Up @@ -299,9 +300,12 @@ protected Future<Job> isValidForStart() {
CompositeMode compositeMode = readParamCompositeMode();

if (!compositeMode.equals(CompositeMode.DEACTIVATED)) {
if (getCsvFormat() != TILEID_FC_B64 && getCsvFormat() != PARTITIONID_FC_B64)
return Future.failedFuture(new HttpException(BAD_REQUEST, "CSV format is not supported for CompositeMode!"));
if (getExportTarget().getType() == DOWNLOAD)
switch( getCsvFormat() )
{ case TILEID_FC_B64 : case PARTITIONID_FC_B64 : case JSON_WKB : break;
default: return Future.failedFuture(new HttpException(BAD_REQUEST, "CSV format is not supported for CompositeMode!"));
}

if (getExportTarget().getType() == DOWNLOAD && getCsvFormat() != JSON_WKB )
return Future.failedFuture(new HttpException(HttpResponseStatus.BAD_REQUEST,
"CompositeMode Export is not available for Type Download!"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,6 @@ public void invalidConfig() throws Exception{
}
deleteAllJobsOnSpace(testSpaceId1Ext);

job = buildTestJob(testExportJobId, null, new Export.ExportTarget().withType(DOWNLOAD), JSON_WKB);
try {
/** Invalid Type - creation fails */
performExport(job, testSpaceId1, finalized, failed, Export.CompositeMode.CHANGES);
}catch (HttpException e){
assertEquals(BAD_REQUEST, e.status);
exceptionCnt++;
}

deleteAllJobsOnSpace(testSpaceId1Ext);

job = buildTestJob(testExportJobId, null, new Export.ExportTarget().withType(DOWNLOAD), GEOJSON);
try {
/** No extended layer - creation fails */
Expand All @@ -181,7 +170,7 @@ public void invalidConfig() throws Exception{
}

/** Check if we got the expected amount of failures */
assertEquals(3, exceptionCnt);
assertEquals(2, exceptionCnt);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,20 @@ public void testFullVMLCompositeL1ExportByTileChanges() throws Exception {
downloadAndCheckFC(urls, 1306, 3, mustContain, 3);
}

@Test
public void testFullVMLCompositeL1ExportJsonWkbChanges() throws Exception {
    /** Export only the changes of the composite L1 space as JSON_WKB via a download target. */
    Export.ExportTarget downloadTarget = new Export.ExportTarget().withType(DOWNLOAD);

    // Build the export job in JSON_WKB format and run it in CHANGES composite mode.
    Export exportJob = buildTestJob(testExportJobId, null, downloadTarget, Job.CSVFormat.JSON_WKB);
    List<URL> resultUrls = performExport(exportJob, getScopedSpaceId(testSpaceId3Ext, scope), finalized, failed, Export.CompositeMode.CHANGES);

    // The exported payload must contain exactly the changed features.
    List<String> expectedIds = Arrays.asList("id000", "id002", "movedFromEmpty", "deltaonly");
    downloadAndCheck(resultUrls, 635, 2, expectedIds);
}


/** ------------------- only for local testing with big spaces -------------------- */
// @Test
Expand Down

0 comments on commit 24401a6

Please sign in to comment.