From f4cc271b4edd7fbb294100ac9b4ee08deb01655a Mon Sep 17 00:00:00 2001 From: qGYdXbY2 <47661341+qGYdXbY2@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:28:42 +0100 Subject: [PATCH] imlCopy: add resumes Signed-off-by: qGYdXbY2 <47661341+qGYdXbY2@users.noreply.github.com> --- .../hub/task/SpaceConnectorBasedHandler.java | 2 +- .../jobs/steps/impl/transport/CopySpace.java | 21 ++++++++++---- .../steps/impl/transport/CopySpacePost.java | 18 +++++++----- .../steps/impl/transport/CopySpacePre.java | 29 ++++++++++++++++--- 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/task/SpaceConnectorBasedHandler.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/task/SpaceConnectorBasedHandler.java index 36c379845..9721384e3 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/task/SpaceConnectorBasedHandler.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/task/SpaceConnectorBasedHandler.java @@ -103,7 +103,7 @@ private static Future getMinTag(Marker marker, String space){ ) .compose(r -> { /* Return min tag of this space */ - return Future.succeededFuture((r.isPresent() ? r.getAsLong() : null)); + return Future.succeededFuture((r.isPresent() ? Long.valueOf(r.getAsLong()) : null)); }); } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java index 2537cd870..a737f20bf 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java @@ -23,6 +23,7 @@ import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER; import static com.here.xyz.jobs.steps.execution.db.Database.loadDatabase; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE; +import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.createQueryContext; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog; @@ -30,6 +31,7 @@ import com.here.xyz.connectors.ErrorResponseException; import com.here.xyz.events.GetFeaturesByGeometryEvent; import com.here.xyz.events.PropertiesQuery; +import com.here.xyz.jobs.steps.execution.StepException; import com.here.xyz.jobs.steps.execution.db.Database; import com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole; import com.here.xyz.jobs.steps.impl.SpaceBasedStep; @@ -328,8 +330,7 @@ long _getCreatedVersion() { return 0; //FIXME: Rather throw an exception here? } - @Override - public void execute() throws Exception { + private void _execute(boolean resumed) throws Exception { setVersion(_getCreatedVersion()); logger.info("[{}] Using fetched version {}", getGlobalStepId(), getVersion()); @@ -346,10 +347,20 @@ public void execute() throws Exception { int threadId = getThreadInfo()[0], threadCount = getThreadInfo()[1]; - infoLog(STEP_EXECUTE, this, "Start ImlCopy thread number: " + threadId + " / " + threadCount); + infoLog(STEP_EXECUTE, this, "Start ImlCopy thread number: " + threadId + " / " + threadCount + (resumed ? " - resumed" : "")); runReadQueryAsync(buildCopySpaceQuery(sourceSpace,targetSpace,threadCount, threadId), targetDb, calculateNeededAcus()/threadCount, true); } + @Override + public void execute() throws Exception { + try { _execute(false); } + catch(Exception e) + { + String errMsg = String.format("Error iml-copy chunk-id %d#%d", getThreadInfo()[0], getThreadInfo()[1] ); + throw new StepException(errMsg, e).withRetryable( true ); // always retryable at first place + } + } + @Override protected void onAsyncSuccess() throws WebClientException, SQLException, TooManyResourcesClaimed, IOException { logger.info("[{}] AsyncSuccess Copy {} -> {}", getGlobalStepId(), getSpaceId() , getTargetSpaceId()); @@ -364,8 +375,8 @@ protected void onStateCheck() { @Override public void resume() throws Exception { - //@TODO: Implement - logger.info("resume was called"); + infoLog(STEP_RESUME, this, "resume was called"); + _execute(true); } private String _getRootTableName(Space targetSpace) throws SQLException { diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePost.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePost.java index d049ef638..4660f2c2c 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePost.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePost.java @@ -22,6 +22,7 @@ import static com.here.xyz.jobs.steps.Step.Visibility.USER; import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE; +import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog; import com.fasterxml.jackson.annotation.JsonView; @@ -139,10 +140,10 @@ private FeatureStatistics getCopiedFeatures(long fetchedVersion) throws SQLExcep SQLQuery incVersionSql = new SQLQuery( """ - select count(1), coalesce( sum( (coalesce(pg_column_size(jsondata),0) + coalesce(pg_column_size(geo),0))::bigint ), 0::bigint ) - from ${schema}.${table} - where version = ${{fetchedVersion}} - """) + select count(1), coalesce( sum( (coalesce(pg_column_size(jsondata),0) + coalesce(pg_column_size(geo),0))::bigint ), 0::bigint ) + from ${schema}.${table} + where version = ${{fetchedVersion}} + """) .withVariable("schema", targetSchema) .withVariable("table", targetTable) .withQueryFragment("fetchedVersion", "" + fetchedVersion); @@ -169,8 +170,9 @@ private void writeContentUpdatedAtTs() throws WebClientException { else { logger.error("[{}] could not write contentUpadtedAt to space {}", getGlobalStepId(), getSpaceId()); throw new StepException("Error while updating the final target settings.", e) - .withRetryable(e instanceof ErrorResponseException responseException - && responseException.getErrorResponse().statusCode() == 504); + .withRetryable( e instanceof ErrorResponseException responseException + && responseException.getErrorResponse().statusCode() == 504 + ); } } } @@ -184,7 +186,7 @@ private void sleepWithIncr(int attempt) { @Override public void resume() throws Exception { - //TODO: Implement - logger.info("resume was called"); + infoLog(STEP_RESUME, this, "resume was called"); + execute(); } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePre.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePre.java index 867757d16..7cd41a5a8 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePre.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpacePre.java @@ -22,13 +22,16 @@ import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE; +import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog; +import com.here.xyz.jobs.steps.execution.StepException; import com.here.xyz.jobs.steps.impl.SpaceBasedStep; import com.here.xyz.jobs.steps.outputs.CreatedVersion; import com.here.xyz.jobs.steps.resources.Load; import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed; import com.here.xyz.util.db.SQLQuery; +import com.here.xyz.util.web.XyzWebClient.ErrorResponseException; import com.here.xyz.util.web.XyzWebClient.WebClientException; import java.sql.SQLException; import java.util.List; @@ -87,12 +90,29 @@ public ExecutionMode getExecutionMode() { return SYNC; } + private void _execute(boolean resumed) throws Exception { + infoLog(STEP_EXECUTE, this, String.format("Fetch next version for %s%s", getSpaceId(), resumed ? " - resumed" : "") ); + long nextVersionToUse = 0; + try { + nextVersionToUse = setVersionToNextInSequence(); + } + catch( WebClientException e ) { + // retryable in case of webclient error + throw new StepException("Error retrieving next version to use", e) + .withRetryable( e instanceof ErrorResponseException responseException + && responseException.getErrorResponse().statusCode() == 504 + ); + } + + registerOutputs(List.of(new CreatedVersion().withVersion(nextVersionToUse)), VERSION); + } + @Override public void execute() throws Exception { - infoLog(STEP_EXECUTE, this, "Fetch next version for " + getSpaceId()); - registerOutputs(List.of(new CreatedVersion().withVersion(setVersionToNextInSequence())), VERSION); + _execute(false); } + private long setVersionToNextInSequence() throws SQLException, TooManyResourcesClaimed, WebClientException { //TODO: Remove the following duplicated code by simply re-using GetNextVersion QueryRunner String targetSchema = getSchema(db()), @@ -112,7 +132,8 @@ private long setVersionToNextInSequence() throws SQLException, TooManyResourcesC @Override public void resume() throws Exception { - //TODO: Implement - logger.info("resume was called"); + + infoLog(STEP_RESUME, this, "resume was called"); + _execute(true); } }