Export: add csvFormat PARTITIONED_JSON_WKB - tiles, featureId, value (heremaps#1087)

* PARTITIONED_JSON_WKB - tiles

Signed-off-by: qGYdXbY2 <[email protected]>

* PARTITIONED_JSON_WKB - featureId

Signed-off-by: qGYdXbY2 <[email protected]>

* PARTITIONED_JSON_WKB - property value

Signed-off-by: qGYdXbY2 <[email protected]>

* add missing qk_s_get_fc_of_tiles_txt_v5

Signed-off-by: qGYdXbY2 <[email protected]>

* fix exp2_build_sql_inhabited_txt

Signed-off-by: qGYdXbY2 <[email protected]>

---------

Signed-off-by: qGYdXbY2 <[email protected]>
qGYdXbY2 authored Nov 6, 2023
1 parent 7a7af01 commit 33f4bc1
Showing 6 changed files with 256 additions and 87 deletions.
@@ -22,8 +22,10 @@
 import static com.here.xyz.events.ContextAwareEvent.SpaceContext.COMPOSITE_EXTENSION;
 import static com.here.xyz.events.ContextAwareEvent.SpaceContext.EXTENSION;
 import static com.here.xyz.events.ContextAwareEvent.SpaceContext.SUPER;
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.TILEID_FC_B64;
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONID_FC_B64;
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.JSON_WKB;
+import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONED_JSON_WKB;

import com.here.xyz.events.ContextAwareEvent;
import com.here.xyz.events.GetFeaturesByGeometryEvent;
@@ -69,11 +71,17 @@ public static Future<ExportStatistic> executeExport(Export job, String schema, S
    String propertyFilter = (job.getFilters() == null ? null : job.getFilters().getPropertyFilter());
    Export.SpatialFilter spatialFilter = (job.getFilters() == null ? null : job.getFilters().getSpatialFilter());
    SQLQuery exportQuery;
-   boolean compositeCalculation = job.readParamCompositeMode() == Export.CompositeMode.CHANGES
-       || job.readParamCompositeMode() == Export.CompositeMode.FULL_OPTIMIZED;
-   switch (job.getCsvFormat()) {
-       case PARTITIONID_FC_B64:
-           exportQuery = generateFilteredExportQuery(job.getId(), schema, job.getTargetSpaceId(), propertyFilter, spatialFilter,
+
+   boolean compositeCalculation = job.readParamCompositeMode() == Export.CompositeMode.CHANGES
+       || job.readParamCompositeMode() == Export.CompositeMode.FULL_OPTIMIZED;
+
+   CSVFormat pseudoCsvFormat = ( job.getCsvFormat() != PARTITIONED_JSON_WKB
+       ? job.getCsvFormat()
+       : ( job.getPartitionKey() == null || "tileid".equalsIgnoreCase(job.getPartitionKey()) ? TILEID_FC_B64 : PARTITIONID_FC_B64 )
+   );
+
+   switch ( pseudoCsvFormat ) {
+       case PARTITIONID_FC_B64: exportQuery = generateFilteredExportQuery(job.getId(), schema, job.getTargetSpaceId(), propertyFilter, spatialFilter,
            job.getTargetVersion(), job.getParams(), job.getCsvFormat(), null,
            compositeCalculation , job.getPartitionKey(), job.getOmitOnNull());
        return calculateThreadCountForDownload(job, schema, exportQuery)
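Note: the new pseudoCsvFormat indirection is the heart of this change — PARTITIONED_JSON_WKB is not a third execution path but an alias that reuses the tile-based branch when partitioning by tile id and the id/value-based branch otherwise. A minimal standalone sketch of that dispatch (class and method names are illustrative; only the enum constants come from the diff):

enum CSVFormat { GEOJSON, JSON_WKT, JSON_WKB, TILEID_FC_B64, PARTITIONID_FC_B64, PARTITIONED_JSON_WKB }

final class PseudoFormat {
    /** Mirrors the pseudoCsvFormat expression above. */
    static CSVFormat resolve(CSVFormat requested, String partitionKey) {
        if (requested != CSVFormat.PARTITIONED_JSON_WKB)
            return requested;                      // every other format dispatches as-is
        return (partitionKey == null || "tileid".equalsIgnoreCase(partitionKey))
            ? CSVFormat.TILEID_FC_B64              // tile-partitioned export path
            : CSVFormat.PARTITIONID_FC_B64;        // feature-id / property-value path
    }

    public static void main(String[] args) {
        System.out.println(resolve(CSVFormat.PARTITIONED_JSON_WKB, null));       // TILEID_FC_B64
        System.out.println(resolve(CSVFormat.PARTITIONED_JSON_WKB, "tileid"));   // TILEID_FC_B64
        System.out.println(resolve(CSVFormat.PARTITIONED_JSON_WKB, "p.myProp")); // PARTITIONID_FC_B64
    }
}

Note also that the original job.getCsvFormat() still travels into generateFilteredExportQuery, so the SQL generation below can distinguish the real format from its pseudo alias.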
@@ -102,7 +110,9 @@ public static Future<ExportStatistic> executeExport(Export job, String schema, S
                return Future.failedFuture(e);
            }
        });

    case TILEID_FC_B64:

        exportQuery = generateFilteredExportQuery(job.getId(), schema, job.getTargetSpaceId(), propertyFilter, spatialFilter,
            job.getTargetVersion(), job.getParams(), job.getCsvFormat(), null,
            compositeCalculation , job.getPartitionKey(), job.getOmitOnNull());
@@ -118,23 +128,28 @@ Is used for incremental exports (tiles) - here we have to export modified tiles.
        return calculateTileListForVMLExport(job, schema, exportQuery, qkQuery)
            .compose(tileList -> {
                try {
                    Promise<Export.ExportStatistic> promise = Promise.promise();
                    List<Future> exportFutures = new ArrayList<>();
                    job.setProcessingList(tileList);

                    for (int i = 0; i < tileList.size(); i++) {
                        /** Build export for each tile of the weighted tile list */
                        SQLQuery q2 = buildVMLExportQuery(job, schema, s3Bucket, s3Path, s3Region, tileList.get(i), qkQuery);
-                       exportFutures.add(exportTypeVML(job.getTargetConnector(), q2, job, s3Path));
+
+                       exportFutures.add( job.getCsvFormat() != PARTITIONED_JSON_WKB
+                                            ? exportTypeVML(job.getTargetConnector(), q2, job, s3Path)
+                                            : exportTypeDownload(job.getTargetConnector(), q2, job, s3Path) );
+
                    }

                    return executeParallelExportAndCollectStatistics(job, promise, exportFutures);
                }
                catch (SQLException e) {
                    logger.warn("job[{}] ", job.getId(), e);
                    return Future.failedFuture(e);
                }
            });
    default:
        exportQuery = generateFilteredExportQuery(job.getId(), schema, job.getTargetSpaceId(), propertyFilter, spatialFilter,
            job.getTargetVersion(), job.getParams(), job.getCsvFormat(), compositeCalculation);
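Note: per tile, the fan-out now chooses between the VML-style upload and the plain download upload; the weighted tile list, parallel execution, and statistics collection are unchanged. A hedged Vert.x sketch of the pattern (the exportTile* stubs stand in for the real exportTypeVML/exportTypeDownload):

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import java.util.ArrayList;
import java.util.List;

final class TileFanOut {
    // Stubs standing in for exportTypeVML / exportTypeDownload.
    static Future<Long> exportTileAsB64FeatureCollection(String tile) { return Future.succeededFuture(1L); }
    static Future<Long> exportTileAsJsonWkb(String tile) { return Future.succeededFuture(1L); }

    static Future<CompositeFuture> exportAll(List<String> tileList, boolean partitionedJsonWkb) {
        List<Future> exportFutures = new ArrayList<>();
        for (String tile : tileList)
            // PARTITIONED_JSON_WKB tiles go through the plain download path;
            // every other VML export keeps the base64 feature-collection path.
            exportFutures.add(partitionedJsonWkb
                ? exportTileAsJsonWkb(tile)
                : exportTileAsB64FeatureCollection(tile));
        // Fail fast if any tile export fails; statistics are collected afterwards.
        return CompositeFuture.all(exportFutures);
    }
}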
@@ -368,20 +383,24 @@ public static SQLQuery buildVMLExportQuery(Export j, String schema, String s3Buc
        j.getTargetVersion(), j.getParams(), j.getCsvFormat());

    /** QkTileQuery gets used if we are exporting in compositeMode. In this case we need to also include empty tiles to our export. */
-   boolean includeEmpty = qkTileQry != null;
+   boolean includeEmpty = qkTileQry != null,
+           bPartJsWkb = ( j.getCsvFormat() == PARTITIONED_JSON_WKB );

    SQLQuery q = new SQLQuery(
        "select ("+
        " aws_s3.query_export_to_s3( o.s3sql, " +
        "  #{s3Bucket}, " +
        "  format('%s/%s/%s-%s.csv',#{s3Path}::text, o.qk, o.bucket, o.nrbuckets) ," +
        "  #{s3Region}," +
-       "  options := 'format csv,delimiter '','' ')).* " +
+       "  options := 'format csv,delimiter '','', encoding ''UTF8'', quote ''\"'', escape '''''''' ')).* " +
        " /* vml_export_hint m499#jobId(" + j.getId() + ") */ " +
        " from" +
-       "  exp_build_sql_inhabited_txt(true, #{parentQK}, #{targetLevel}, ${{exportSelectString}}, ${{qkTileQry}}, #{maxTilesPerFile}::int, true, #{isClipped}, #{includeEmpty}) o"
+       "  ${{exp_build_fkt}}(true, #{parentQK}, #{targetLevel}, ${{exportSelectString}}, ${{qkTileQry}}, #{maxTilesPerFile}::int, ${{b64EncodeParam}} #{isClipped}, #{includeEmpty}) o"
    );

+   q.setQueryFragment("exp_build_fkt", !bPartJsWkb ? "exp_build_sql_inhabited_txt" : "exp2_build_sql_inhabited_txt" );
+   q.setQueryFragment("b64EncodeParam", !bPartJsWkb ? "true," : "" );
+
    q.setQueryFragment("exportSelectString", exportSelectString);
    q.setQueryFragment("qkTileQry", qkTileQry == null ? new SQLQuery("null::text") : qkTileQry );
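Note: two query fragments now parameterize the S3 export wrapper — the PL/pgSQL builder function name and its base64-encode argument, which the exp2 variant no longer takes (presumably because it emits raw JSON + WKB rows instead of base64 buckets). A minimal sketch of the selection, using plain strings instead of the project's SQLQuery fragments (only the two function names come from the diff):

final class ExportFunctionChoice {
    static String expBuildFkt(boolean partitionedJsonWkb) {
        return partitionedJsonWkb ? "exp2_build_sql_inhabited_txt" : "exp_build_sql_inhabited_txt";
    }

    static String b64EncodeParam(boolean partitionedJsonWkb) {
        // The legacy function takes a base64-encode flag after maxTilesPerFile;
        // for the exp2 variant the argument is dropped from the call entirely.
        return partitionedJsonWkb ? "" : "true,";
    }
}

The widened COPY options (encoding, quote, escape) matter here: raw JSON columns, unlike base64 payloads, contain characters that need explicit CSV quoting.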
@@ -416,6 +435,8 @@ private static SQLQuery generateFilteredExportQueryForCompositeTileCalculation(S
    private static SQLQuery generateFilteredExportQuery(String jobId, String schema, String spaceId, String propertyFilter,
        Export.SpatialFilter spatialFilter, String targetVersion, Map params, CSVFormat csvFormat, SQLQuery customWhereCondition, boolean isForCompositeContentDetection, String partitionKey, Boolean omitOnNull )
        throws SQLException {
+
+       csvFormat = (( csvFormat == PARTITIONED_JSON_WKB && ( partitionKey == null || "tileid".equalsIgnoreCase(partitionKey)) ) ? TILEID_FC_B64 : csvFormat );
        //TODO: Re-use existing QR rather than the following duplicated code
        SQLQuery geoFragment;
@@ -471,8 +492,8 @@ private static SQLQuery generateFilteredExportQuery(String jobId, String schema,
    PSQLXyzConnector dbHandler = new PSQLXyzConnector(false);
    dbHandler.setConfig(new PSQLConfig(event, schema));

-   boolean partitionByPropertyValue = ( csvFormat == PARTITIONID_FC_B64 && partitionKey != null && !"id".equalsIgnoreCase(partitionKey)),
-           partitionByFeatureId = ( csvFormat == PARTITIONID_FC_B64 && !partitionByPropertyValue ),
+   boolean partitionByPropertyValue = ((csvFormat == PARTITIONID_FC_B64 || csvFormat == PARTITIONED_JSON_WKB) && partitionKey != null && !"id".equalsIgnoreCase(partitionKey)),
+           partitionByFeatureId = ((csvFormat == PARTITIONID_FC_B64 || csvFormat == PARTITIONED_JSON_WKB) && !partitionByPropertyValue ),
            downloadAsJsonWkb = ( csvFormat == JSON_WKB );

    SpaceContext ctxStashed = event.getContext();
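Note: both partitioned formats now share the same partitioning decision — by property value when a non-id partitionKey is set, by feature id otherwise. Restated as standalone predicates (illustrative, reusing the CSVFormat enum from the first sketch):

final class PartitionFlags {
    // Property-value partitioning needs an explicit key other than "id".
    static boolean byPropertyValue(CSVFormat f, String partitionKey) {
        return (f == CSVFormat.PARTITIONID_FC_B64 || f == CSVFormat.PARTITIONED_JSON_WKB)
            && partitionKey != null && !"id".equalsIgnoreCase(partitionKey);
    }

    // Feature-id partitioning is the fallback for both partitioned formats.
    static boolean byFeatureId(CSVFormat f, String partitionKey) {
        return (f == CSVFormat.PARTITIONID_FC_B64 || f == CSVFormat.PARTITIONED_JSON_WKB)
            && !byPropertyValue(f, partitionKey);
    }
}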
@@ -527,19 +548,31 @@ private static SQLQuery generateFilteredExportQuery(String jobId, String schema,
            .substitute();
        return queryToText(geoJson);
    }

-   case PARTITIONID_FC_B64 :
+   case PARTITIONED_JSON_WKB :
+   case PARTITIONID_FC_B64 :
    {
-       String partQry = isForCompositeContentDetection
-           ? "select jsondata->>'id' as id, "
-           + " case not coalesce((jsondata#>'{properties,@ns:com:here:xyz,deleted}')::boolean,false) "
-           + " when true then replace( encode(convert_to(jsonb_build_object( 'type','FeatureCollection','features', jsonb_build_array( jsondata || jsonb_build_object( 'geometry', ST_AsGeoJSON(geo,8)::jsonb ) ) )::text,'UTF8'),'base64') ,chr(10),'') "
-           + " else null::text "
-           + " end as data "
-           + "from ( ${{contentQuery}}) X"
-           : "select jsondata->>'id' as id, "
-           + " replace( encode(convert_to(jsonb_build_object( 'type','FeatureCollection','features', jsonb_build_array( jsondata || jsonb_build_object( 'geometry', ST_AsGeoJSON(geo,8)::jsonb ) ) )::text,'UTF8'),'base64') ,chr(10),'') as data "
-           + "from ( ${{contentQuery}}) X";
+       String partQry =
+        csvFormat == PARTITIONID_FC_B64
+         ? ( isForCompositeContentDetection
+             ? "select jsondata->>'id' as id, "
+             + " case not coalesce((jsondata#>'{properties,@ns:com:here:xyz,deleted}')::boolean,false) "
+             + " when true then replace( encode(convert_to(jsonb_build_object( 'type','FeatureCollection','features', jsonb_build_array( jsondata || jsonb_build_object( 'geometry', ST_AsGeoJSON(geo,8)::jsonb ) ) )::text,'UTF8'),'base64') ,chr(10),'') "
+             + " else null::text "
+             + " end as data "
+             + "from ( ${{contentQuery}}) X"
+             : "select jsondata->>'id' as id, "
+             + " replace( encode(convert_to(jsonb_build_object( 'type','FeatureCollection','features', jsonb_build_array( jsondata || jsonb_build_object( 'geometry', ST_AsGeoJSON(geo,8)::jsonb ) ) )::text,'UTF8'),'base64') ,chr(10),'') as data "
+             + "from ( ${{contentQuery}}) X" )
+         /* PARTITIONED_JSON_WKB */
+         : ( isForCompositeContentDetection
+             ? "select jsondata->>'id' as id, "
+             + " case not coalesce((jsondata#>'{properties,@ns:com:here:xyz,deleted}')::boolean,false) when true then jsondata else null::jsonb end as jsondata,"
+             + " geo "
+             + "from ( ${{contentQuery}}) X"
+             : "select jsondata->>'id' as id, jsondata, geo "
+             + "from ( ${{contentQuery}}) X" );

        if( partitionByPropertyValue )
        {
@@ … @@
            +" iidata as "
            +" ( select l.key, (( row_number() over ( partition by l.key ) )/ 20000000)::integer as chunk, r.jsondata, r.geo from ( ${{%2$s}} ) r right join plist l on ( coalesce( r.jsondata->'%1$s', '\"CSVNULL\"'::jsonb) = l.key ) "
            +" ), "
-           +" iiidata as "
-           +" ( select coalesce( ('[]'::jsonb || key)->>0, 'CSVNULL' ) as id, (count(1) over ()) as nrbuckets, count(1) as nrfeatures, replace( encode(convert_to(('{\"type\":\"FeatureCollection\",\"features\":[' || coalesce( string_agg( (jsondata || jsonb_build_object('geometry',st_asgeojson(geo,8)::jsonb))::text, ',' ), null::text ) || ']}'),'UTF8'),'base64') ,chr(10),'') as data "
-           +" from iidata group by 1, chunk order by 1, 3 desc "
-           +" ) "
-           +" select id, data from iiidata ", partitionKey, sqlQueryContentByPropertyValue != null ? "contentQueryReal" : "contentQuery" );
+           + ( csvFormat == PARTITIONID_FC_B64
+               ? " iiidata as "
+               +" ( select coalesce( ('[]'::jsonb || key)->>0, 'CSVNULL' ) as id, (count(1) over ()) as nrbuckets, count(1) as nrfeatures, replace( encode(convert_to(('{\"type\":\"FeatureCollection\",\"features\":[' || coalesce( string_agg( (jsondata || jsonb_build_object('geometry',st_asgeojson(geo,8)::jsonb))::text, ',' ), null::text ) || ']}'),'UTF8'),'base64') ,chr(10),'') as data "
+               +" from iidata group by 1, chunk order by 1, 3 desc "
+               +" ) "
+               +" select id, data from iiidata "
+               : " iiidata as "
+               +" ( select coalesce( ('[]'::jsonb || key)->>0, 'CSVNULL' ) as id, jsondata, geo "
+               +" from iidata "
+               +" ) "
+               +" select id, jsondata, geo from iiidata "
+           ), partitionKey, sqlQueryContentByPropertyValue != null ? "contentQueryReal" : "contentQuery" );
        }

        SQLQuery geoJson = new SQLQuery(partQry)
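Note: the two branches produce differently shaped CSV rows — PARTITIONID_FC_B64 keeps its (id, base64 FeatureCollection) pair, while PARTITIONED_JSON_WKB emits three columns: partition id, raw feature JSON, and the geometry as WKB. The shared iidata CTE also splits each partition key into chunks via row_number()/20000000, which the aggregating base64 branch groups on. Illustrative column values (all invented and shortened):

final class PartitionedRowShapes {
    // PARTITIONID_FC_B64 row: partition id + base64-encoded FeatureCollection (newlines stripped).
    static final String[] FC_B64_ROW = {
        "feature-1",
        "eyJ0eXBlIjoiRmVhdHVyZUNvbGxlY3Rpb24iLCJmZWF0dXJlcyI6Wy4uLl19"
    };

    // PARTITIONED_JSON_WKB row: partition id + raw feature JSON + geometry as hex-encoded WKB.
    static final String[] JSON_WKB_ROW = {
        "feature-1",
        "{\"type\":\"Feature\",\"id\":\"feature-1\",\"properties\":{}}",
        "0101000020E610000000000000000024400000000000003440"   // POINT(10 20), SRID 4326
    };
}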
@@ -27,6 +27,7 @@
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONID_FC_B64;
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.TILEID_FC_B64;
 import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.JSON_WKB;
+import static com.here.xyz.httpconnector.util.jobs.Job.CSVFormat.PARTITIONED_JSON_WKB;
import static com.here.xyz.httpconnector.util.jobs.Job.Status.executed;
import static com.here.xyz.httpconnector.util.jobs.Job.Status.failed;
import static com.here.xyz.httpconnector.util.jobs.Job.Status.finalized;
@@ -168,8 +169,8 @@ public Future<Export> setDefaults() {
    //TODO: Do field initialization at instance initialization time
    return super.setDefaults()
        .compose(job -> {
-           if (getExportTarget() != null && getExportTarget().getType() == VML
-               && (getCsvFormat() == null || getCsvFormat() != PARTITIONID_FC_B64)) {
+           if ( getExportTarget() != null && getExportTarget().getType() == VML
+               && (getCsvFormat() == null || ( getCsvFormat() != PARTITIONID_FC_B64 && getCsvFormat() != PARTITIONED_JSON_WKB ))) {
                setCsvFormat(TILEID_FC_B64);
                if (getMaxTilesPerFile() == 0)
                    setMaxTilesPerFile(VML_EXPORT_MAX_TILES_PER_FILE);
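Note: setDefaults still falls back to TILEID_FC_B64 for VML targets, but now leaves both explicitly partitioned formats alone. A condensed restatement (illustrative, outside the Job class hierarchy; CSVFormat as in the first sketch):

final class VmlFormatDefaults {
    static CSVFormat defaultFormatForVml(CSVFormat requested) {
        // Explicit partitioned formats survive; anything else (including null)
        // falls back to the tile-based default.
        if (requested == CSVFormat.PARTITIONID_FC_B64 || requested == CSVFormat.PARTITIONED_JSON_WKB)
            return requested;
        return CSVFormat.TILEID_FC_B64;
    }
}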
@@ -208,16 +209,17 @@ private Export validateExport() throws HttpException {
        switch (getCsvFormat()) {
            case TILEID_FC_B64:
            case PARTITIONID_FC_B64:
+           case PARTITIONED_JSON_WKB:
                break;
            default:
                throw new HttpException(BAD_REQUEST, "Invalid Format! Allowed [" + TILEID_FC_B64 + ","
-                   + PARTITIONID_FC_B64 + "]");
+                   + PARTITIONID_FC_B64 + "," + PARTITIONED_JSON_WKB + "]");
        }

        if (getExportTarget().getTargetId() == null)
            throw new HttpException(BAD_REQUEST,("Please specify the targetId!"));

-       if (!getCsvFormat().equals(PARTITIONID_FC_B64)) {
+       if ( getCsvFormat().equals(TILEID_FC_B64) || ( getCsvFormat().equals(PARTITIONED_JSON_WKB) && (getPartitionKey() == null || "tileid".equalsIgnoreCase(getPartitionKey())))) {
            if (getTargetLevel() == null)
                throw new HttpException(BAD_REQUEST, "Please specify targetLevel! Allowed range [" + VML_EXPORT_MIN_TARGET_LEVEL + ":"
                    + VML_EXPORT_MAX_TARGET_LEVEL + "]");
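Note: the targetLevel requirement now keys off the effective partitioning, not the format alone — PARTITIONED_JSON_WKB needs a tile level only when it actually partitions by tile. A compact predicate (illustrative, reusing the CSVFormat enum from the first sketch):

final class TargetLevelRule {
    /** True when the export is tile-partitioned and therefore needs a targetLevel. */
    static boolean requiresTargetLevel(CSVFormat f, String partitionKey) {
        return f == CSVFormat.TILEID_FC_B64
            || (f == CSVFormat.PARTITIONED_JSON_WKB
                && (partitionKey == null || "tileid".equalsIgnoreCase(partitionKey)));
    }
}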
@@ -257,8 +259,11 @@ private Export validateExport() throws HttpException {
        if (getExportTarget().getType() == DOWNLOAD && getCsvFormat() != JSON_WKB)
            throw new HttpException(BAD_REQUEST, "CompositeMode is not available for Type Download!");

-       if (getCsvFormat() != TILEID_FC_B64 && getCsvFormat() != PARTITIONID_FC_B64 && getCsvFormat() != JSON_WKB)
-           throw new HttpException(BAD_REQUEST, "CompositeMode does not support the provided CSV format!");
+       switch(getCsvFormat())
+       { case TILEID_FC_B64: case PARTITIONID_FC_B64: case JSON_WKB: case PARTITIONED_JSON_WKB: break;
+         default :
+           throw new HttpException(BAD_REQUEST, "CompositeMode does not support the provided CSV format!");
+       }

        if (ext == null) {
            // No extension present - so we remove the composite mode
@@ -299,7 +304,7 @@ protected Future<Job> isValidForStart() {

        if (compositeMode != DEACTIVATED ) {
            switch( getCsvFormat() )
-           { case TILEID_FC_B64 : case PARTITIONID_FC_B64 : case JSON_WKB : break;
+           { case TILEID_FC_B64 : case PARTITIONID_FC_B64 : case JSON_WKB : case PARTITIONED_JSON_WKB : break;
                default: return Future.failedFuture(new HttpException(BAD_REQUEST, "CSV format is not supported for CompositeMode!"));
            }
@@ -397,7 +397,7 @@ public static Status of(String value) {

    @JsonView({Public.class})
    public enum CSVFormat {
-       GEOJSON, JSON_WKT, JSON_WKB, TILEID_FC_B64, PARTITIONID_FC_B64;
+       GEOJSON, JSON_WKT, JSON_WKB, TILEID_FC_B64, PARTITIONID_FC_B64, PARTITIONED_JSON_WKB;

        public static CSVFormat of(String value) {
            if (value == null) {
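Note: the enum's of(String) helper is cut off by the diff viewer after the null check. A typical shape for such a lookup would be the following — an assumption, since only the signature and the null check are visible above:

public static CSVFormat of(String value) {
    if (value == null)
        return null;
    try {
        // e.g. CSVFormat.of("PARTITIONED_JSON_WKB") for the new format
        return valueOf(value.toUpperCase());
    }
    catch (IllegalArgumentException e) {
        return null;
    }
}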