Skip to content

Commit

Permalink
ImlCopy: Catalog + Map, contentUpdatedAt
Browse files Browse the repository at this point in the history
Signed-off-by: qGYdXbY2 <[email protected]>
  • Loading branch information
qGYdXbY2 committed Dec 4, 2024
1 parent fe770f0 commit f9618d1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.steps.compiler.ExportToFiles;
import com.here.xyz.jobs.steps.compiler.CopySpaceToSpace;
import com.here.xyz.jobs.steps.compiler.SpaceCopy;
import com.here.xyz.jobs.steps.compiler.ImportFromFiles;
import com.here.xyz.jobs.steps.compiler.JobCompilationInterceptor;
import com.here.xyz.util.Async;
Expand All @@ -41,7 +41,7 @@ public class JobCompiler {
static {
registerCompilationInterceptor(ImportFromFiles.class);
registerCompilationInterceptor(ExportToFiles.class);
registerCompilationInterceptor(CopySpaceToSpace.class);
registerCompilationInterceptor(SpaceCopy.class);
}

public Future<StepGraph> compile(Job job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.here.xyz.jobs.steps.compiler;

import java.util.Map;
import java.util.Set;

import com.here.xyz.events.PropertiesQuery;
Expand All @@ -37,7 +38,7 @@
import com.here.xyz.util.web.HubWebClient;
import com.here.xyz.util.web.XyzWebClient.WebClientException;

public class CopySpaceToSpace implements JobCompilationInterceptor {
public class SpaceCopy implements JobCompilationInterceptor {

protected boolean validSubType( String subType )
{ return "Space".equals(subType); }
Expand All @@ -50,7 +51,7 @@ && validSubType( job.getSource().getClass().getSimpleName() )
&& validSubType( job.getTarget().getClass().getSimpleName() );
}

private int threadCountCalc( long sourceFeatureCount, long targetFeatureCount )
private static int threadCountCalc( long sourceFeatureCount, long targetFeatureCount )
{
long PARALLELIZTATION_THRESHOLD = 100000;
int PARALLELIZTATION_THREAD_MAX = 8;
Expand All @@ -62,22 +63,21 @@ private int threadCountCalc( long sourceFeatureCount, long targetFeatureCount )
return PARALLELIZTATION_THREAD_MAX;
}

@Override
public CompilationStepGraph compile(Job job) {
final String sourceSpaceId = job.getSource().getKey();
final String targetSpaceId = job.getTarget().getKey();

public static CompilationStepGraph compileSteps(String sourceId, String targetId, String jobId, Filters filters, Ref versionRef)
{
final String sourceSpaceId = sourceId,
targetSpaceId = targetId;

StatisticsResponse sourceStatistics = null, targetStatistics = null;
try {
sourceStatistics = HubWebClient.getInstance(Config.instance.HUB_ENDPOINT).loadSpaceStatistics(sourceSpaceId);
targetStatistics = HubWebClient.getInstance(Config.instance.HUB_ENDPOINT).loadSpaceStatistics(targetSpaceId);
} catch (WebClientException e) {
String errMsg = String.format("Unable to get Staistics for %s", sourceStatistics != null ? sourceSpaceId : targetSpaceId );
String errMsg = String.format("Unable to get Staistics for %s", sourceStatistics == null ? sourceSpaceId : targetSpaceId );
throw new JobCompiler.CompilationError(errMsg);
}

CopySpacePre preCopySpace = new CopySpacePre().withSpaceId(targetSpaceId).withJobId(job.getId());
CopySpacePre preCopySpace = new CopySpacePre().withSpaceId(targetSpaceId).withJobId(jobId);

CompilationStepGraph startGraph = new CompilationStepGraph();
startGraph.addExecution(preCopySpace);
Expand All @@ -87,9 +87,6 @@ public CompilationStepGraph compile(Job job) {

int threadCount = threadCountCalc(sourceFeatureCount, targetFeatureCount);

Filters filters = ((DatasetDescription.Space<?>) job.getSource()).getFilters();
Ref versionRef = ((DatasetDescription.Space<?>) job.getSource()).getVersionRef();

SpatialFilter spatialFilter = filters != null ? filters.getSpatialFilter() : null;
PropertiesQuery propertyFilter = filters != null ? filters.getPropertyFilter() : null;

Expand All @@ -103,7 +100,7 @@ public CompilationStepGraph compile(Job job) {
.withSourceVersionRef(versionRef)
.withPropertyFilter(propertyFilter)
.withThreadInfo(new int[]{ threadId, threadCount })
.withJobId(job.getId());
.withJobId(jobId);

if (spatialFilter != null) {
copySpaceStep.setGeometry(spatialFilter.getGeometry());
Expand All @@ -117,10 +114,22 @@ public CompilationStepGraph compile(Job job) {
startGraph.addExecution(cGraph);

CopySpacePost postCopySpace = new CopySpacePost().withSpaceId(targetSpaceId)
.withJobId(job.getId());
.withJobId(jobId);

startGraph.addExecution(postCopySpace);

return startGraph;

}

@Override
public CompilationStepGraph compile(Job job) {

return compileSteps(job.getSource().getKey(),
job.getTarget().getKey(),
job.getId(),
((DatasetDescription.Space<?>) job.getSource()).getFilters(),
((DatasetDescription.Space<?>) job.getSource()).getVersionRef() );

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,6 +46,7 @@
import com.here.xyz.models.hub.Space;
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
import com.here.xyz.util.service.Core;
import com.here.xyz.util.web.XyzWebClient.WebClientException;


Expand Down Expand Up @@ -190,6 +192,8 @@ public void execute() throws Exception {
registerOutputs(List.of(statistics), true);

setCopiedByteSize( statistics.getByteSize() );

hubWebClient().patchSpace(getSpaceId(), Map.of("contentUpdatedAt", Core.currentTimeMillis()));

}

Expand Down

0 comments on commit f9618d1

Please sign in to comment.