Skip to content

Commit

Permalink
Merge branch 'master' into fix-subscr-deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
lceni committed Dec 20, 2024
2 parents 374bf5d + 33cf30f commit 340cd4d
Show file tree
Hide file tree
Showing 81 changed files with 3,653 additions and 922 deletions.
1 change: 1 addition & 0 deletions .run/JobService.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</extension>
<method v="2">
<option name="Make" enabled="true" />
<option name="RunConfigurationTask" enabled="true" run_configuration_name="jobServiceSetup" run_configuration_type="ShConfigurationType" />
</method>
</configuration>
</component>
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
Expand Down Expand Up @@ -614,6 +614,12 @@
<version>5.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hamcrest</artifactId>
<groupId>org.hamcrest</groupId>
Expand Down
2 changes: 1 addition & 1 deletion xyz-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-hub-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-hub-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
</parent>

<licenses>
Expand Down
10 changes: 5 additions & 5 deletions xyz-hub-test/src/test/java/com/here/xyz/hub/rest/TagApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void createTag() {
_createTag()
.statusCode(OK.code())
.body("id", equalTo("XYZ_1"))
.body("version", equalTo(-1));
.body("version", equalTo(0));

given()
.headers(getAuthHeaders(AuthProfile.ACCESS_ALL))
Expand Down Expand Up @@ -155,7 +155,7 @@ public void getTagVersion() {
.get("/spaces/" + getSpaceId() + "/tags/XYZ_1")
.then()
.statusCode(OK.code())
.body("version", equalTo(-1));
.body("version", equalTo(0));
}

@Test
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testListSpacesFilterByTagId() {
.then()
.body("size()", is(1))
.body( "[0].tags.XYZ_1.id", equalTo("XYZ_1"))
.body( "[0].tags.XYZ_1.version", equalTo(-1));
.body( "[0].tags.XYZ_1.version", equalTo(0));
}

@Ignore("Disabled. Takes too long")
Expand Down Expand Up @@ -398,7 +398,7 @@ public void testGetSystemTag() {
.then()
.statusCode(OK.code())
.body("id", equalTo("XYZ_1"))
.body("version", equalTo(-1))
.body("version", equalTo(0))
.body("$", not(hasKey("system")));

given()
Expand All @@ -407,7 +407,7 @@ public void testGetSystemTag() {
.then()
.statusCode(OK.code())
.body("id", equalTo("XYZ_2"))
.body("version", equalTo(-1))
.body("version", equalTo(0))
.body("$", hasKey("system"))
.body("system", equalTo(true));
}
Expand Down
2 changes: 1 addition & 1 deletion xyz-jobs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
</parent>

<name>XYZ Job Framework</name>
Expand Down
24 changes: 22 additions & 2 deletions xyz-jobs/xyz-job-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.here.xyz</groupId>
<artifactId>xyz-jobs</artifactId>
<version>3.12.10-SNAPSHOT</version>
<version>3.14.2-SNAPSHOT</version>
</parent>

<name>XYZ Job Service</name>
Expand Down Expand Up @@ -162,7 +162,12 @@
<groupId>com.lmax</groupId>
</dependency>

<!-- Test -->
<!-- Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>junit-jupiter-api</artifactId>
<groupId>org.junit.jupiter</groupId>
Expand All @@ -173,6 +178,21 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hamcrest</artifactId>
<groupId>org.hamcrest</groupId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
74 changes: 56 additions & 18 deletions xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.here.xyz.jobs.datasets.DatasetDescription;
import com.here.xyz.jobs.datasets.Files;
import com.here.xyz.jobs.datasets.streams.DynamicStream;
import com.here.xyz.jobs.processes.ProcessDescription;
import com.here.xyz.jobs.steps.Config;
import com.here.xyz.jobs.steps.JobCompiler;
import com.here.xyz.jobs.steps.Step;
import com.here.xyz.jobs.steps.StepGraph;
Expand All @@ -53,8 +55,12 @@
import com.here.xyz.jobs.steps.outputs.Output;
import com.here.xyz.jobs.steps.resources.ExecutionResource;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.models.hub.Space;
import com.here.xyz.util.Async;
import com.here.xyz.util.service.Core;
import com.here.xyz.util.web.HubWebClient;
import com.here.xyz.util.web.XyzWebClient;
import com.here.xyz.util.web.XyzWebClient.ErrorResponseException;
import io.vertx.core.Future;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -87,6 +93,8 @@ public class Job implements XyzSerializable {
@JsonView({Public.class, Static.class})
private String description;
@JsonView({Public.class, Static.class})
private ProcessDescription process;
@JsonView({Public.class, Static.class})
private DatasetDescription source;
@JsonView({Public.class, Static.class})
private DatasetDescription target;
Expand All @@ -97,23 +105,13 @@ public class Job implements XyzSerializable {
private String executionId;
@JsonView({Public.class, Static.class})
private JobClientInfo clientInfo;
@JsonView(Static.class)
private String secondaryResourceKey;

private static final Async ASYNC = new Async(20, Job.class);
private static final Logger logger = LogManager.getLogger();
private static final long DEFAULT_JOB_TTL = TimeUnit.DAYS.toMillis(4 * 7); //4 weeks

public static void main(String[] args) {
long createdAt = 1727910277687l;
long keepUntil = 1728515077;

final long actual = keepUntil * 1000;
System.out.println("Actual keep Until : " + actual);
final long expected = createdAt + DEFAULT_JOB_TTL;
System.out.println("Expected keep until: " + expected);
System.out.println("Difference: " + (expected - actual));

}

/**
* Creates a new Job.
* The new job will have the following properties being filled by the framework:
Expand Down Expand Up @@ -210,15 +208,16 @@ private Future<Void> prepareStep(Step step) {
*/
protected Future<Boolean> validate() {
//TODO: Collect exceptions and forward them accordingly as one exception object with (potentially) multiple error objects inside
return Future.all(Job.forEach(getSteps().stepStream().collect(Collectors.toList()), step -> validateStep(step)))
.compose(cf -> Future.succeededFuture(cf.list().stream().allMatch(validation -> (boolean) validation)));
return Future.all(Job.forEach(getSteps().stepStream().toList(), step -> validateStep(step)))
.compose(cf -> Future.succeededFuture(cf.list().stream().allMatch(isReady -> (boolean) isReady)));
}

private static Future<Boolean> validateStep(Step step) {
return ASYNC.run(() -> {
boolean isReady = step.validate();
if (isReady && step.getStatus().getState() != SUBMITTED)
step.getStatus().setState(SUBMITTED);
State targetState = isReady ? SUBMITTED : NOT_READY;
if (step.getStatus().getState() != targetState)
step.getStatus().setState(targetState);
return isReady;
});
}
Expand Down Expand Up @@ -489,7 +488,7 @@ public Future<List<Input>> loadInputs() {

public Future<List<Output>> loadOutputs() {
return ASYNC.run(() -> steps.stepStream()
.map(step -> (List<Output>) step.loadOutputs(true))
.map(step -> (List<Output>) step.loadUserOutputs())
.flatMap(ol -> ol.stream())
.collect(Collectors.toList()));
}
Expand Down Expand Up @@ -522,10 +521,36 @@ public Job withId(String id) {
@JsonView(Static.class)
public String getResourceKey() {
//Always use key from the source except when the source is Files
if(getSource() == null) return null;
if (getSource() == null)
return null;
return getSource() instanceof Files<?> ? getTarget().getKey() : getSource().getKey();
}

public String getSecondaryResourceKey() {
if (secondaryResourceKey != null)
return secondaryResourceKey;

String key = getResourceKey();
if (key == null)
return null;

try {
Space.Extension extension = HubWebClient.getInstance(Config.instance.HUB_ENDPOINT).loadSpace(key).getExtension();
if (extension != null)
secondaryResourceKey = extension.getSpaceId();
}
catch (XyzWebClient.WebClientException e) {
//Ignore if space is not present anymore
if (!(e instanceof ErrorResponseException errorResponseException && errorResponseException.getStatusCode() == 404))
throw new RuntimeException(e);
}
return secondaryResourceKey;
}

private void setSecondaryResourceKey(String secondaryResourceKey) {
this.secondaryResourceKey = secondaryResourceKey;
}

public String getDescription() {
return description;
}
Expand Down Expand Up @@ -640,6 +665,19 @@ public Job withClientInfo(JobClientInfo clientInfo) {
return this;
}

public ProcessDescription getProcess() {
return process;
}

public void setProcess(ProcessDescription process) {
this.process = process;
}

public Job withProcess(ProcessDescription process) {
setProcess(process);
return this;
}

public DatasetDescription getSource() {
return source;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@
import com.here.xyz.util.service.aws.dynamo.DynamoClient;
import com.here.xyz.util.service.aws.dynamo.IndexDefinition;
import io.vertx.core.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DynamoJobConfigClient extends JobConfigClient {
private static final Logger logger = LogManager.getLogger();
Expand Down Expand Up @@ -97,6 +98,43 @@ public Future<List<Job>> loadJobs(State state) {
});
}

public Future<List<Job>> loadJobs(String resourceKey, String secondaryResourceKey) {
if(secondaryResourceKey == null)
return loadJobs(resourceKey, resourceKey);

return dynamoClient.executeQueryAsync(() -> {
List<Job> jobs = new LinkedList<>();

// Query by resourceKey
jobTable.getIndex("resourceKey-index")
.query("resourceKey", resourceKey)
.pages()
.forEach(page ->
page.forEach(jobItem ->
jobs.add(XyzSerializable.fromMap(jobItem.asMap(), Job.class))
)
);

// If secondaryResourceKey is provided, query it too
if (secondaryResourceKey != null) {
jobTable.getIndex("secondaryResourceKey-index")
.query("secondaryResourceKey", secondaryResourceKey)
.pages()
.forEach(page ->
page.forEach(jobItem -> {
Job job = XyzSerializable.fromMap(jobItem.asMap(), Job.class);
// Avoid duplicates
if (!jobs.contains(job)) {
jobs.add(job);
}
})
);
}

return jobs;
});
}

@Override
public Future<List<Job>> loadJobs(String resourceKey) {
return dynamoClient.executeQueryAsync(() -> {
Expand All @@ -120,6 +158,17 @@ else if (state != null)
return loadJobs();
}

@Override
public Future<List<Job>> loadJobs(String resourceKey, String secondaryResourceKey, State state) {
if (resourceKey != null)
//TODO: Use an index with hash- *and* range-key
return loadJobs(resourceKey, secondaryResourceKey).map(jobs -> jobs.stream().filter(job -> state == null || job.getStatus().getState() == state).toList());
else if (state != null)
return loadJobs(state);
else
return loadJobs();
}

@Override
public Future<Void> storeJob(Job job) {
return dynamoClient.executeQueryAsync(() -> {
Expand Down Expand Up @@ -223,8 +272,9 @@ public Future<Void> init() {
if (dynamoClient.isLocal()) {
logger.info("DynamoDB running locally, initializing Jobs table.");
try {
List<IndexDefinition> indexes = List.of(new IndexDefinition("resourceKey"), new IndexDefinition("state"));
dynamoClient.createTable(jobTable.getTableName(), "id:S,resourceKey:S,state:S", "id", indexes, "keepUntil");
List<IndexDefinition> indexes = List.of(new IndexDefinition("resourceKey"), new IndexDefinition("state"),
new IndexDefinition("secondaryResourceKey"));
dynamoClient.createTable(jobTable.getTableName(), "id:S,resourceKey:S,secondaryResourceKey:S,state:S", "id", indexes, "keepUntil");
//TODO: Register a dynamo stream (also in CFN) to ensure we're getting informed when a job expires
}
catch (Exception e) {
Expand Down
Loading

0 comments on commit 340cd4d

Please sign in to comment.