Skip to content

Commit

Permalink
imlCopy: add resumes
Browse files Browse the repository at this point in the history
Signed-off-by: qGYdXbY2 <[email protected]>
  • Loading branch information
qGYdXbY2 committed Jan 13, 2025
1 parent d9f1f51 commit f4cc271
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static Future<Long> getMinTag(Marker marker, String space){
)
.compose(r -> {
/* Return min tag of this space */
return Future.succeededFuture((r.isPresent() ? r.getAsLong() : null));
return Future.succeededFuture((r.isPresent() ? Long.valueOf(r.getAsLong()) : null));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER;
import static com.here.xyz.jobs.steps.execution.db.Database.loadDatabase;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.createQueryContext;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;

import com.fasterxml.jackson.annotation.JsonView;
import com.here.xyz.connectors.ErrorResponseException;
import com.here.xyz.events.GetFeaturesByGeometryEvent;
import com.here.xyz.events.PropertiesQuery;
import com.here.xyz.jobs.steps.execution.StepException;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
Expand Down Expand Up @@ -328,8 +330,7 @@ long _getCreatedVersion() {
return 0; //FIXME: Rather throw an exception here?
}

@Override
public void execute() throws Exception {
private void _execute(boolean resumed) throws Exception {
setVersion(_getCreatedVersion());

logger.info("[{}] Using fetched version {}", getGlobalStepId(), getVersion());
Expand All @@ -346,10 +347,20 @@ public void execute() throws Exception {
int threadId = getThreadInfo()[0],
threadCount = getThreadInfo()[1];

infoLog(STEP_EXECUTE, this, "Start ImlCopy thread number: " + threadId + " / " + threadCount);
infoLog(STEP_EXECUTE, this, "Start ImlCopy thread number: " + threadId + " / " + threadCount + (resumed ? " - resumed" : ""));
runReadQueryAsync(buildCopySpaceQuery(sourceSpace,targetSpace,threadCount, threadId), targetDb, calculateNeededAcus()/threadCount, true);
}

@Override
public void execute() throws Exception {
try { _execute(false); }
catch(Exception e)
{
String errMsg = String.format("Error iml-copy chunk-id %d#%d", getThreadInfo()[0], getThreadInfo()[1] );
throw new StepException(errMsg, e).withRetryable( true ); // always retryable at first place
}
}

@Override
protected void onAsyncSuccess() throws WebClientException, SQLException, TooManyResourcesClaimed, IOException {
logger.info("[{}] AsyncSuccess Copy {} -> {}", getGlobalStepId(), getSpaceId() , getTargetSpaceId());
Expand All @@ -364,8 +375,8 @@ protected void onStateCheck() {

@Override
public void resume() throws Exception {
//@TODO: Implement
logger.info("resume was called");
infoLog(STEP_RESUME, this, "resume was called");
_execute(true);
}

private String _getRootTableName(Space targetSpace) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.here.xyz.jobs.steps.Step.Visibility.USER;
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;

import com.fasterxml.jackson.annotation.JsonView;
Expand Down Expand Up @@ -139,10 +140,10 @@ private FeatureStatistics getCopiedFeatures(long fetchedVersion) throws SQLExcep

SQLQuery incVersionSql = new SQLQuery(
"""
select count(1), coalesce( sum( (coalesce(pg_column_size(jsondata),0) + coalesce(pg_column_size(geo),0))::bigint ), 0::bigint )
from ${schema}.${table}
where version = ${{fetchedVersion}}
""")
select count(1), coalesce( sum( (coalesce(pg_column_size(jsondata),0) + coalesce(pg_column_size(geo),0))::bigint ), 0::bigint )
from ${schema}.${table}
where version = ${{fetchedVersion}}
""")
.withVariable("schema", targetSchema)
.withVariable("table", targetTable)
.withQueryFragment("fetchedVersion", "" + fetchedVersion);
Expand All @@ -169,8 +170,9 @@ private void writeContentUpdatedAtTs() throws WebClientException {
else {
logger.error("[{}] could not write contentUpadtedAt to space {}", getGlobalStepId(), getSpaceId());
throw new StepException("Error while updating the final target settings.", e)
.withRetryable(e instanceof ErrorResponseException responseException
&& responseException.getErrorResponse().statusCode() == 504);
.withRetryable( e instanceof ErrorResponseException responseException
&& responseException.getErrorResponse().statusCode() == 504
);
}
}
}
Expand All @@ -184,7 +186,7 @@ private void sleepWithIncr(int attempt) {

@Override
public void resume() throws Exception {
//TODO: Implement
logger.info("resume was called");
infoLog(STEP_RESUME, this, "resume was called");
execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM;
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_RESUME;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;

import com.here.xyz.jobs.steps.execution.StepException;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.outputs.CreatedVersion;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed;
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.web.XyzWebClient.ErrorResponseException;
import com.here.xyz.util.web.XyzWebClient.WebClientException;
import java.sql.SQLException;
import java.util.List;
Expand Down Expand Up @@ -87,12 +90,29 @@ public ExecutionMode getExecutionMode() {
return SYNC;
}

private void _execute(boolean resumed) throws Exception {
infoLog(STEP_EXECUTE, this, String.format("Fetch next version for %s%s", getSpaceId(), resumed ? " - resumed" : "") );
long nextVersionToUse = 0;
try {
nextVersionToUse = setVersionToNextInSequence();
}
catch( WebClientException e ) {
// retryable in case of webclient error
throw new StepException("Error retrieving next version to use", e)
.withRetryable( e instanceof ErrorResponseException responseException
&& responseException.getErrorResponse().statusCode() == 504
);
}

registerOutputs(List.of(new CreatedVersion().withVersion(nextVersionToUse)), VERSION);
}

@Override
public void execute() throws Exception {
infoLog(STEP_EXECUTE, this, "Fetch next version for " + getSpaceId());
registerOutputs(List.of(new CreatedVersion().withVersion(setVersionToNextInSequence())), VERSION);
_execute(false);
}


private long setVersionToNextInSequence() throws SQLException, TooManyResourcesClaimed, WebClientException {
//TODO: Remove the following duplicated code by simply re-using GetNextVersion QueryRunner
String targetSchema = getSchema(db()),
Expand All @@ -112,7 +132,8 @@ private long setVersionToNextInSequence() throws SQLException, TooManyResourcesC

@Override
public void resume() throws Exception {
//TODO: Implement
logger.info("resume was called");

infoLog(STEP_RESUME, this, "resume was called");
_execute(true);
}
}

0 comments on commit f4cc271

Please sign in to comment.