Skip to content

Commit

Permalink
Also use cached space() & db() method calls in Copy steps
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Rögner <[email protected]>
  • Loading branch information
roegi committed Jan 9, 2025
1 parent a32bda2 commit 2cdb424
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ public CopySpace withVersion(long version) {
public List<Load> getNeededResources() {
try {
List<Load> expectedLoads = new ArrayList<>();
Space sourceSpace = loadSpace(getSpaceId());
Space targetSpace = loadSpace(getTargetSpaceId());
Space sourceSpace = space();
Space targetSpace = targetSpace();

expectedLoads.add(new Load().withResource(loadDatabase(targetSpace.getStorage().getId(), WRITER))
.withEstimatedVirtualUnits(calculateNeededAcus()));
Expand Down Expand Up @@ -280,9 +280,9 @@ public boolean validate() throws ValidationException {
throw new ValidationException("Source = Target!");

try {
Space sourceSpace = loadSpace(getSpaceId());
Space sourceSpace = space();
boolean isExtended = sourceSpace.getExtension() != null;
StatisticsResponse sourceStatistics = loadSpaceStatistics(getSpaceId(), isExtended ? EXTENSION : null);
StatisticsResponse sourceStatistics = loadSpaceStatistics(getSpaceId(), isExtended ? EXTENSION : null); //TODO: use caching?
estimatedSourceFeatureCount = sourceStatistics.getCount().getValue();
estimatedSourceByteSize = sourceStatistics.getDataSize().getValue();
}
Expand All @@ -291,9 +291,9 @@ public boolean validate() throws ValidationException {
}

try {
Space targetSpace = loadSpace(getSpaceId());
Space targetSpace = targetSpace();
boolean isExtended = targetSpace.getExtension() != null;
StatisticsResponse targetStatistics = loadSpaceStatistics(getTargetSpaceId(), isExtended ? EXTENSION : null);
StatisticsResponse targetStatistics = loadSpaceStatistics(getTargetSpaceId(), isExtended ? EXTENSION : null); //TODO: use caching?
estimatedTargetFeatureCount = targetStatistics.getCount().getValue();
}catch (WebClientException e) {
throw new ValidationException("Error loading target space \"" + getTargetSpaceId() + "\"", e);
Expand All @@ -316,6 +316,10 @@ public boolean validate() throws ValidationException {
return true;
}

private Space targetSpace() throws WebClientException {
return space(getTargetSpaceId());
}

//TODO: Remove that workaround once the 3 copy steps were properly merged into one step again
long _getCreatedVersion() {
for (InputFromOutput input : (List<InputFromOutput>)(List<?>) loadInputs(InputFromOutput.class))
Expand All @@ -328,22 +332,22 @@ long _getCreatedVersion() {
public void execute() throws Exception {
setVersion(_getCreatedVersion());

logger.info("Using fetched version " + getVersion());
logger.info("[{}] Using fetched version {}", getGlobalStepId(), getVersion());

logger.info( "Loading space config for source-space "+getSpaceId());
Space sourceSpace = loadSpace(getSpaceId());
logger.info("[{}] Loading space config for source-space {} ...", getGlobalStepId(), getSpaceId());
Space sourceSpace = space();

logger.info( "Loading space config for target-space " + getTargetSpaceId());
Space targetSpace = loadSpace(getTargetSpaceId());
logger.info("[{}] Loading space config for target-space {} ...", getGlobalStepId(), getTargetSpaceId());
Space targetSpace = targetSpace();

logger.info("Getting storage database for space "+getSpaceId());
Database db = loadDatabase(targetSpace.getStorage().getId(), WRITER);
logger.info("[{}] Getting storage database for space {} ...", getGlobalStepId(), getSpaceId());
Database targetDb = loadDatabase(targetSpace.getStorage().getId(), WRITER);

int threadId = getThreadInfo()[0],
threadCount = getThreadInfo()[1];

infoLog(STEP_EXECUTE, this, "Start ImlCopy thread number: " + threadId + " / " + threadCount);
runReadQueryAsync(buildCopySpaceQuery(sourceSpace,targetSpace,threadCount, threadId), db, calculateNeededAcus()/threadCount, true);
runReadQueryAsync(buildCopySpaceQuery(sourceSpace,targetSpace,threadCount, threadId), targetDb, calculateNeededAcus()/threadCount, true);
}

@Override
Expand Down Expand Up @@ -392,9 +396,7 @@ private boolean isRemoteCopy(Space sourceSpace, Space targetSpace) {
}

private boolean isRemoteCopy() throws WebClientException {
Space sourceSpace = loadSpace(getSpaceId());
Space targetSpace = loadSpace(getTargetSpaceId());
return isRemoteCopy(sourceSpace, targetSpace);
return isRemoteCopy(space(), targetSpace());
}

private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace, int threadCount, int threadId)
Expand Down Expand Up @@ -485,7 +487,7 @@ private SQLQuery buildCopyContentQuery(Space space, int threadCount, int threadI

private SearchForFeatures getQueryRunner(GetFeaturesByGeometryEvent event) throws SQLException, ErrorResponseException,
TooManyResourcesClaimed, WebClientException {
Space sourceSpace = loadSpace(getSpaceId());
Space sourceSpace = space();

Database db = !isRemoteCopy()
? loadDatabase(sourceSpace.getStorage().getId(), WRITER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@

import static com.here.xyz.jobs.steps.Step.Visibility.USER;
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER;
import static com.here.xyz.jobs.steps.execution.db.Database.loadDatabase;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;

import com.fasterxml.jackson.annotation.JsonView;
import com.here.xyz.jobs.steps.execution.StepException;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.inputs.InputFromOutput;
import com.here.xyz.jobs.steps.outputs.CreatedVersion;
import com.here.xyz.jobs.steps.outputs.FeatureStatistics;
import com.here.xyz.jobs.steps.resources.IOResource;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed;
import com.here.xyz.models.hub.Space;
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.service.Core;
import com.here.xyz.util.web.XyzWebClient.ErrorResponseException;
Expand Down Expand Up @@ -67,20 +63,20 @@ public class CopySpacePost extends SpaceBasedStep<CopySpacePost> {
@Override
public List<Load> getNeededResources() {
try {
Space sourceSpace = loadSpace(getSpaceId());

Load expectedDbLoad = new Load()
.withResource(loadDatabase(sourceSpace.getStorage().getId(), WRITER))
.withResource(db())
.withEstimatedVirtualUnits(0.0);

//Billing, reporting
Load expectedIoLoad = new Load().withResource(IOResource.getInstance()).withEstimatedVirtualUnits(getCopiedByteSize());

logger.info("[{}] getNeededResources {}", getGlobalStepId(), sourceSpace.getStorage().getId());
logger.info("[{}] getNeededResources {}", getGlobalStepId(), getSpaceId());

return List.of(expectedDbLoad, expectedIoLoad);
}
catch (WebClientException e) {
//TODO: log error
//TODO: is the step failed? Retry later? It could be a retryable error as the prior validation succeeded, depending on the type of HubWebClientException
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -138,10 +134,8 @@ long _getCreatedVersion() {
}

private FeatureStatistics getCopiedFeatures(long fetchedVersion) throws SQLException, TooManyResourcesClaimed, WebClientException {
Space targetSpace = loadSpace(getSpaceId());
Database targetDb = loadDatabase(targetSpace.getStorage().getId(), WRITER);
String targetSchema = getSchema(targetDb),
targetTable = getRootTableName(targetSpace);
String targetSchema = getSchema(db()),
targetTable = getRootTableName(space());

SQLQuery incVersionSql = new SQLQuery(
"""
Expand All @@ -153,7 +147,7 @@ select count(1), coalesce( sum( (coalesce(pg_column_size(jsondata),0) + coalesce
.withVariable("table", targetTable)
.withQueryFragment("fetchedVersion", "" + fetchedVersion);

FeatureStatistics statistics = runReadQuerySync(incVersionSql, targetDb, 0, rs -> rs.next()
FeatureStatistics statistics = runReadQuerySync(incVersionSql, db(), 0, rs -> rs.next()
? new FeatureStatistics().withFeatureCount(rs.getLong(1)).withByteSize(rs.getLong(2))
: new FeatureStatistics());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@

import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM;
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER;
import static com.here.xyz.jobs.steps.execution.db.Database.loadDatabase;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;

import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.outputs.CreatedVersion;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed;
import com.here.xyz.models.hub.Space;
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.web.XyzWebClient.WebClientException;
import java.sql.SQLException;
Expand All @@ -56,17 +52,17 @@ public class CopySpacePre extends SpaceBasedStep<CopySpacePre> {
@Override
public List<Load> getNeededResources() {
try {
Space sourceSpace = loadSpace(getSpaceId());

Load expectedLoad = new Load()
.withResource(loadDatabase(sourceSpace.getStorage().getId(), WRITER))
.withResource(db())
.withEstimatedVirtualUnits(0.0);

logger.info("[{}] getNeededResources {}", getGlobalStepId(), sourceSpace.getStorage().getId());
logger.info("[{}] getNeededResources {}", getGlobalStepId(), getSpaceId());

return List.of(expectedLoad);
}
catch (WebClientException e) {
//TODO: log error
//TODO: is the step failed? Retry later? It could be a retryable error as the prior validation succeeded, depending on the type of HubWebClientException
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -99,16 +95,14 @@ public void execute() throws Exception {

private long setVersionToNextInSequence() throws SQLException, TooManyResourcesClaimed, WebClientException {
//TODO: Remove the following duplicated code by simply re-using GetNextVersion QueryRunner
Space targetSpace = loadSpace(getSpaceId());
Database targetDb = loadDatabase(targetSpace.getStorage().getId(), WRITER);
String targetSchema = getSchema(targetDb),
targetTable = getRootTableName(targetSpace);
String targetSchema = getSchema(db()),
targetTable = getRootTableName(space());

SQLQuery incVersionSql = new SQLQuery("SELECT nextval('${schema}.${versionSequenceName}')")
.withVariable("schema", targetSchema)
.withVariable("versionSequenceName", targetTable + "_version_seq");

long newVersion = runReadQuerySync(incVersionSql, targetDb, 0, rs -> {
long newVersion = runReadQuerySync(incVersionSql, db(), 0, rs -> {
rs.next();
return rs.getLong(1);
});
Expand Down

0 comments on commit 2cdb424

Please sign in to comment.