From c7beee239ff25ca7afef117d0e08127567672225 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Sat, 25 Feb 2023 22:32:46 +0530 Subject: [PATCH 1/6] MAJOR: added some new modules to implement. eg inmemory, mongodb, jdbc --- pom.xml | 7 + .../adapter/base/BasePersistenceAdapter.java | 26 ++- .../workflow/api/bean/id/Id.java | 3 + .../workflow/api/bean/id/ManagerId.java | 4 + .../workflow/api/serde/IdSerializer.java | 19 ++ workflow-inmemory/pom.xml | 20 +++ .../java/io/github/pavansharma36/App.java | 13 ++ .../adapter/InmemoryPersistenceAdapter.java | 145 +++++++++++++++ .../java/io/github/pavansharma36/AppTest.java | 38 ++++ workflow-jdbc/pom.xml | 16 ++ workflow-jdbc/workflow-jdbc-common/pom.xml | 21 +++ .../java/io/github/pavansharma36/App.java | 13 ++ .../java/io/github/pavansharma36/AppTest.java | 38 ++++ workflow-jedis/pom.xml | 4 - workflow-mongodb/pom.xml | 34 ++++ .../java/io/github/pavansharma36/App.java | 13 ++ .../adapter/MongoDBPersistenceAdapter.java | 166 ++++++++++++++++++ .../workflow/mongodb/helper/JacksonCodec.java | 51 ++++++ .../mongodb/helper/JacksonCodecProvider.java | 29 +++ .../mongodb/helper/MongoDBQueryHelper.java | 34 ++++ .../java/io/github/pavansharma36/AppTest.java | 38 ++++ .../workflow/mongodb/MongoDBNormalTest.java | 32 ++++ 22 files changed, 755 insertions(+), 9 deletions(-) create mode 100644 workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java create mode 100644 workflow-inmemory/pom.xml create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/App.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java create mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java create mode 100644 workflow-jdbc/pom.xml create mode 100644 workflow-jdbc/workflow-jdbc-common/pom.xml create mode 100644 workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java create mode 100644 workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java create mode 100644 workflow-mongodb/pom.xml create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/App.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java create mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java create mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java diff --git a/pom.xml b/pom.xml index dd79b19..7f42b92 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,10 @@ 4.7.0 9.3 2.0.0-M3 + + + 3.7.1 + 4.9.0 @@ -255,5 +259,8 @@ workflow-api workflow-jedis workflow-examples + workflow-jdbc + workflow-mongodb + workflow-inmemory diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java index ef863fd..30a5802 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java @@ -1,24 +1,40 @@ package io.github.pavansharma36.workflow.api.adapter.base; import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.model.ManagerInfo; import io.github.pavansharma36.workflow.api.serde.Deserializer; import io.github.pavansharma36.workflow.api.serde.Serde; import io.github.pavansharma36.workflow.api.serde.Serializer; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +import java.util.Date; +import java.util.List; +import lombok.extern.slf4j.Slf4j; /** * Base class for all {@link PersistenceAdapter}. */ +@Slf4j public abstract class BasePersistenceAdapter extends BaseAdapter implements PersistenceAdapter { - protected final Serializer serializer; - protected final Deserializer deserializer; - protected BasePersistenceAdapter(String namespace, PollDelayGenerator pollDelayGenerator, Serde serde) { super(namespace, pollDelayGenerator, serde); - this.serializer = serde.serializer(); - this.deserializer = serde.deserializer(); } + + @Override + public void maintenance(WorkflowAdapter adapter) { + List managerInfos = getAllManagerInfos(); + long minHeartbeatTimestamp = System.currentTimeMillis() + - (heartbeatDelayGenerator().delay(false).toMillis() * 15); + managerInfos.forEach(m -> { + if (m.getHeartbeatEpoch() < minHeartbeatTimestamp) { + log.info("WorkflowManager's heartbeat is not updated since {}, purging", + new Date(m.getHeartbeatEpoch())); + removeManagerInfo(m.getManagerId()); + } + }); + } + } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java index aaa1e72..c4bdc3a 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java @@ -1,5 +1,7 @@ package io.github.pavansharma36.workflow.api.bean.id; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.github.pavansharma36.workflow.api.serde.IdSerializer; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -12,6 +14,7 @@ @ToString @EqualsAndHashCode @AllArgsConstructor +@JsonSerialize(using = IdSerializer.class) public class Id { /** * string representation of this id. diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java index 3ae0ba3..ab2c088 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java @@ -9,4 +9,8 @@ public class ManagerId extends Id { public ManagerId() { super(Utils.random()); } + + public ManagerId(String id) { + super(id); + } } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java new file mode 100644 index 0000000..e135df6 --- /dev/null +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java @@ -0,0 +1,19 @@ +package io.github.pavansharma36.workflow.api.serde; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import io.github.pavansharma36.workflow.api.bean.id.Id; +import java.io.IOException; + +public class IdSerializer extends StdSerializer { + protected IdSerializer(Class t) { + super(t); + } + + @Override + public void serialize(Id id, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeString(id.getId()); + } +} diff --git a/workflow-inmemory/pom.xml b/workflow-inmemory/pom.xml new file mode 100644 index 0000000..2419afc --- /dev/null +++ b/workflow-inmemory/pom.xml @@ -0,0 +1,20 @@ + + + workflow + io.github.pavansharma36 + 0.0.1-SNAPSHOT + + 4.0.0 + + workflow-inmemory + jar + + + + io.github.pavansharma36 + workflow-api + ${project.version} + + + diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java new file mode 100644 index 0000000..9ecae98 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java new file mode 100644 index 0000000..0954274 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java @@ -0,0 +1,145 @@ +package io.github.pavansharma36.workflow.inmemory.adapter; + +import io.github.pavansharma36.workflow.api.WorkflowManager; +import io.github.pavansharma36.workflow.api.adapter.base.BasePersistenceAdapter; +import io.github.pavansharma36.workflow.api.bean.id.ManagerId; +import io.github.pavansharma36.workflow.api.bean.id.RunId; +import io.github.pavansharma36.workflow.api.bean.id.TaskId; +import io.github.pavansharma36.workflow.api.executor.ExecutableTask; +import io.github.pavansharma36.workflow.api.executor.ExecutionResult; +import io.github.pavansharma36.workflow.api.model.ManagerInfo; +import io.github.pavansharma36.workflow.api.model.RunInfo; +import io.github.pavansharma36.workflow.api.model.TaskInfo; +import io.github.pavansharma36.workflow.api.serde.Serde; +import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class InmemoryPersistenceAdapter extends BasePersistenceAdapter { + + private Map managerInfos = new HashMap<>(); + private Map runInfos = new HashMap<>(); + private Map> taskInfos = new HashMap<>(); + + protected InmemoryPersistenceAdapter(String namespace, PollDelayGenerator pollDelayGenerator, + Serde serde) { + super(namespace, pollDelayGenerator, serde); + } + + @Override + public void start(WorkflowManager workflowManager) { + + } + + @Override + public void stop() { + + } + + @Override + public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) { + managerInfos.computeIfAbsent(managerInfo.getManagerId(), managerId -> managerInfo); + return true; + } + + @Override + public List getAllManagerInfos() { + return new ArrayList<>(managerInfos.values()); + } + + @Override + public boolean removeManagerInfo(ManagerId id) { + return managerInfos.remove(id) != null; + } + + @Override + public boolean updateQueuedTime(RunId runId, TaskId taskId) { + taskInfos.computeIfPresent(runId, (i, taskInfoMap) -> { + taskInfoMap.computeIfPresent(taskId, (j, task) -> { + task.setQueuedTimeEpoch(System.currentTimeMillis()); + return task; + }); + return taskInfoMap; + }); + return true; + } + + @Override + public boolean updateStartTime(RunId runId) { + runInfos.computeIfPresent(runId, (i, run) -> { + run.setStartTimeEpoch(System.currentTimeMillis()); + return run; + }); + return true; + } + + @Override + public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) { + taskInfos.computeIfPresent(runId, (i, taskInfoMap) -> { + taskInfoMap.computeIfPresent(taskId, (j, task) -> { + task.setStartTimeEpoch(System.currentTimeMillis()); + task.setProcessedBy(processedBy); + return task; + }); + return taskInfoMap; + }); + return true; + } + + @Override + public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) { + return false; + } + + @Override + public Optional getTaskInfo(RunId runId, TaskId taskId) { + return Optional.ofNullable(taskInfos.getOrDefault(runId, Collections.emptyMap()).get(taskId)); + } + + @Override + public Optional getRunInfo(RunId runId) { + return Optional.ofNullable(runInfos.get(runId)); + } + + @Override + public void createRunInfo(RunInfo runInfo) { + runInfos.put(runInfo.getRunId(), runInfo); + } + + @Override + public boolean updateRunInfoEpoch(RunId runId) { + runInfos.computeIfPresent(runId, (i, run) -> { + run.setLastUpdateEpoch(System.currentTimeMillis()); + return run; + }); + return true; + } + + @Override + public void createTaskInfos(RunId runId, List taskInfos) { + this.taskInfos.put(runId, taskInfos.stream().collect(Collectors.toMap(TaskInfo::getTaskId, t -> t))); + } + + @Override + public boolean cleanup(RunId runId) { + taskInfos.remove(runId); + runInfos.remove(runId); + return true; + } + + @Override + public List getStuckRunInfos(Duration maxDuration) { + return null; + } + + @Override + public PollDelayGenerator heartbeatDelayGenerator() { + return null; + } +} diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java new file mode 100644 index 0000000..98d2799 --- /dev/null +++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java @@ -0,0 +1,38 @@ +package io.github.pavansharma36; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/workflow-jdbc/pom.xml b/workflow-jdbc/pom.xml new file mode 100644 index 0000000..179fa1e --- /dev/null +++ b/workflow-jdbc/pom.xml @@ -0,0 +1,16 @@ + + + workflow + io.github.pavansharma36 + 0.0.1-SNAPSHOT + + 4.0.0 + + workflow-jdbc + pom + + workflow-jdbc-common + + + diff --git a/workflow-jdbc/workflow-jdbc-common/pom.xml b/workflow-jdbc/workflow-jdbc-common/pom.xml new file mode 100644 index 0000000..b69e88f --- /dev/null +++ b/workflow-jdbc/workflow-jdbc-common/pom.xml @@ -0,0 +1,21 @@ + + + workflow-jdbc + io.github.pavansharma36 + 0.0.1-SNAPSHOT + + 4.0.0 + + workflow-jdbc-common + jar + + + + io.github.pavansharma36 + workflow-api + ${project.version} + + + + diff --git a/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java b/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java new file mode 100644 index 0000000..9ecae98 --- /dev/null +++ b/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java new file mode 100644 index 0000000..98d2799 --- /dev/null +++ b/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java @@ -0,0 +1,38 @@ +package io.github.pavansharma36; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/workflow-jedis/pom.xml b/workflow-jedis/pom.xml index 5bb60b0..8b85f09 100644 --- a/workflow-jedis/pom.xml +++ b/workflow-jedis/pom.xml @@ -11,10 +11,6 @@ http://maven.apache.org Module with WorkflowManager api implementation using Jedis - - UTF-8 - 3.7.1 - diff --git a/workflow-mongodb/pom.xml b/workflow-mongodb/pom.xml new file mode 100644 index 0000000..9ae7baa --- /dev/null +++ b/workflow-mongodb/pom.xml @@ -0,0 +1,34 @@ + + + workflow + io.github.pavansharma36 + 0.0.1-SNAPSHOT + + 4.0.0 + + workflow-mongodb + jar + + + + io.github.pavansharma36 + workflow-api + ${project.version} + + + + org.mongodb + mongodb-driver-sync + ${mongodb.driver.sync.version} + + + + io.github.pavansharma36 + workflow-api + ${project.version} + test-jar + test + + + diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java new file mode 100644 index 0000000..9ecae98 --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java new file mode 100644 index 0000000..de9122b --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java @@ -0,0 +1,166 @@ +package io.github.pavansharma36.workflow.mongodb.adapter; + +import com.mongodb.DBObjectCodecProvider; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.Updates; +import io.github.pavansharma36.workflow.api.WorkflowManager; +import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.adapter.base.BasePersistenceAdapter; +import io.github.pavansharma36.workflow.api.bean.id.ManagerId; +import io.github.pavansharma36.workflow.api.bean.id.RunId; +import io.github.pavansharma36.workflow.api.bean.id.TaskId; +import io.github.pavansharma36.workflow.api.executor.ExecutableTask; +import io.github.pavansharma36.workflow.api.executor.ExecutionResult; +import io.github.pavansharma36.workflow.api.model.ManagerInfo; +import io.github.pavansharma36.workflow.api.model.RunInfo; +import io.github.pavansharma36.workflow.api.model.TaskInfo; +import io.github.pavansharma36.workflow.api.serde.Serde; +import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +import io.github.pavansharma36.workflow.mongodb.helper.JacksonCodecProvider; +import io.github.pavansharma36.workflow.mongodb.helper.MongoDBQueryHelper; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import org.bson.BsonDocument; +import org.bson.BsonDocumentWriter; +import org.bson.BsonString; +import org.bson.Document; +import org.bson.codecs.BsonValueCodecProvider; +import org.bson.codecs.ValueCodecProvider; +import org.bson.codecs.configuration.CodecRegistries; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.conversions.Bson; + +public class MongoDBPersistenceAdapter extends BasePersistenceAdapter implements PersistenceAdapter { + + private static final CodecRegistry CODEC = new JacksonCodecProvider(CodecRegistries.fromProviders( + Arrays.asList(new ValueCodecProvider(), new BsonValueCodecProvider(), new DBObjectCodecProvider()))); + + private final String database; + private final MongoClient mongoClient; + + protected MongoDBPersistenceAdapter(String namespace, PollDelayGenerator pollDelayGenerator, + Serde serde, String database, MongoClient mongoClient) { + super(namespace, pollDelayGenerator, serde); + this.database = database; + this.mongoClient = mongoClient; + } + + @Override + public void start(WorkflowManager workflowManager) { + + } + + @Override + public void stop() { + + } + + @Override + public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) { + Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, + new BsonString(managerInfo.getManagerId().getId())); + BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); + CODEC.get(ManagerInfo.class).encode(writer, managerInfo, null); + Bson update = writer.getDocument(); + UpdateOptions options = new UpdateOptions().upsert(true); + return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) + .updateOne(filter, update, options).wasAcknowledged(); + } + + @Override + public List getAllManagerInfos() { + try (MongoCursor infos = collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) + .find(ManagerInfo.class).cursor()) { + List docs = new LinkedList<>(); + while (infos.hasNext()) { + docs.add(infos.next()); + } + return docs; + } + } + + @Override + public boolean removeManagerInfo(ManagerId id) { + Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, + new BsonString(id.getId())); + return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)).deleteOne(filter) + .wasAcknowledged(); + } + + @Override + public boolean updateQueuedTime(RunId runId, TaskId taskId) { + Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + Bson update = Updates.set(MongoDBQueryHelper.TaskInfo.QUEUED_TIME_KEY, System.currentTimeMillis()); + return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + .wasAcknowledged(); + } + + @Override + public boolean updateStartTime(RunId runId) { + return false; + } + + @Override + public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) { + return false; + } + + @Override + public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) { + return false; + } + + @Override + public Optional getTaskInfo(RunId runId, TaskId taskId) { + return Optional.empty(); + } + + @Override + public Optional getRunInfo(RunId runId) { + return Optional.empty(); + } + + @Override + public void createRunInfo(RunInfo runInfo) { + + } + + @Override + public boolean updateRunInfoEpoch(RunId runId) { + return false; + } + + @Override + public void createTaskInfos(RunId runId, List taskInfos) { + + } + + @Override + public boolean cleanup(RunId runId) { + return false; + } + + @Override + public List getStuckRunInfos(Duration maxDuration) { + return null; + } + + @Override + public PollDelayGenerator heartbeatDelayGenerator() { + return null; + } + + private MongoCollection collection(String collection) { + return mongoClient.getDatabase(database).getCollection(collection).withCodecRegistry( + new JacksonCodecProvider(mongoClient.getDatabase(database).getCodecRegistry())); + } + +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java new file mode 100644 index 0000000..cdf0394 --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java @@ -0,0 +1,51 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import org.bson.BsonReader; +import org.bson.BsonWriter; +import org.bson.RawBsonDocument; +import org.bson.codecs.Codec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.EncoderContext; +import org.bson.codecs.configuration.CodecRegistry; + +public class JacksonCodec implements Codec { + + private final ObjectMapper bsonObjectMapper; + private final Codec rawBsonDocumentCodec; + private final Class type; + + public JacksonCodec(ObjectMapper bsonObjectMapper, + CodecRegistry codecRegistry, + Class type) { + this.bsonObjectMapper = bsonObjectMapper; + this.rawBsonDocumentCodec = codecRegistry.get(RawBsonDocument.class); + this.type = type; + } + @Override + public T decode(BsonReader bsonReader, DecoderContext decoderContext) { + try { + RawBsonDocument document = rawBsonDocumentCodec.decode(bsonReader, decoderContext); + return bsonObjectMapper.readValue(document.getByteBuffer().array(), type); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void encode(BsonWriter bsonWriter, T t, EncoderContext encoderContext) { + try { + byte[] data = bsonObjectMapper.writeValueAsBytes(t); + rawBsonDocumentCodec.encode(bsonWriter, new RawBsonDocument(data), encoderContext); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public Class getEncoderClass() { + return type; + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java new file mode 100644 index 0000000..95f5d1e --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java @@ -0,0 +1,29 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.bson.codecs.Codec; +import org.bson.codecs.configuration.CodecRegistry; + +@RequiredArgsConstructor +public class JacksonCodecProvider implements CodecRegistry { + + private static final Map, Codec> CODECS = new HashMap<>(); + + private final CodecRegistry delegation; + + @Override + public Codec get(Class aClass, CodecRegistry codecRegistry) { + if (CODECS.containsKey(aClass)) { + return (Codec) CODECS.get(aClass); + } else { + return codecRegistry.get(aClass); + } + } + + @Override + public Codec get(Class aClass) { + return get(aClass, delegation); + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java new file mode 100644 index 0000000..89c6220 --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java @@ -0,0 +1,34 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class MongoDBQueryHelper { + + @NoArgsConstructor(access = AccessLevel.PRIVATE) + public static class ManagerInfo { + public static final String MANAGER_ID_KEY = "managerId"; + + public static String collectionName(String namespace) { + return namespace + "_manager_info"; + } + } + + @NoArgsConstructor(access = AccessLevel.PRIVATE) + public static class TaskInfo { + + public static final String QUEUED_TIME_KEY = "queuedTimeEpoch"; + public static final String RUN_ID_KEY = "runId"; + public static final String TASK_ID_KEY = "taskId"; + + public static String collectionName(String namespace) { + return namespace + "_task_info"; + } + } + + + +} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java new file mode 100644 index 0000000..98d2799 --- /dev/null +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java @@ -0,0 +1,38 @@ +package io.github.pavansharma36; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java new file mode 100644 index 0000000..846ce37 --- /dev/null +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java @@ -0,0 +1,32 @@ +package io.github.pavansharma36.workflow.mongodb; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import io.github.pavansharma36.workflow.api.NormalTest; +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; +import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class MongoDBNormalTest extends NormalTest { + +// @Rule +// public GenericContainer redis = new GenericContainer(DockerImageName.parse("mongo")) +// .withExposedPorts(27017); + + + @Override + protected WorkflowAdapter adapter() { + final String namespace = "test"; + MongoClient client = MongoClients.create(); + return new WorkflowAdapterBuilder<>() + .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .build(); + } +} From 0e5325149eee539c04c8a6b6507379fbe0ae94e4 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Sun, 26 Feb 2023 00:59:25 +0530 Subject: [PATCH 2/6] WIP some cleanup --- .../api/adapter/base/BaseAdapter.java | 7 +- .../adapter/base/BasePersistenceAdapter.java | 9 +- .../api/adapter/base/BaseQueueAdapter.java | 24 ----- .../api/adapter/base/BaseScheduleAdapter.java | 17 ---- .../BasePersistenceAdapterBuilder.java | 13 +++ .../builder/BaseScheduleAdapterBuilder.java | 8 +- .../builder/WorkflowAdapterBuilder.java | 43 ++++++--- .../workflow/api/bean/task/TaskType.java | 2 + .../workflow/api/serde/IdSerializer.java | 7 +- .../java/io/github/pavansharma36/App.java | 13 --- .../adapter/InmemoryPersistenceAdapter.java | 23 ++--- .../adapter/InmemoryQueueAdapter.java | 73 +++++++++++++++ .../adapter/InmemorySchedulerAdapter.java | 50 +++++++++++ .../InmemoryPersistenceAdapterBuilder.java | 13 +++ .../builder/InmemoryQueueAdapterBuilder.java | 13 +++ .../InmemoryScheduleAdapterBuilder.java | 12 +++ .../InmemoryWorkflowAdapterBuilder.java | 16 ++++ .../java/io/github/pavansharma36/AppTest.java | 38 -------- .../JedisPersistenceAdapterBuilder.java | 9 +- .../builder/JedisWorkflowAdapterBuilder.java | 3 +- workflow-mongodb/pom.xml | 6 ++ .../adapter/MongoDBPersistenceAdapter.java | 88 ++++++++++++++----- .../MongoDBPersistanceAdapterBuilder.java | 25 ++++++ .../workflow/mongodb/helper/JacksonCodec.java | 51 ----------- .../mongodb/helper/JacksonCodecProvider.java | 29 ------ .../mongodb/helper/MongoDBQueryHelper.java | 16 ++++ .../workflow/mongodb/helper/SerdeCodec.java | 45 ++++++++++ .../mongodb/helper/SerdeCodecProvider.java | 35 ++++++++ .../workflow/mongodb/MongoDBNormalTest.java | 11 ++- 29 files changed, 458 insertions(+), 241 deletions(-) delete mode 100644 workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java delete mode 100644 workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java create mode 100644 workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java delete mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/App.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java create mode 100644 workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java delete mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java delete mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java delete mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java index 307a379..53490fc 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java @@ -13,7 +13,12 @@ public abstract class BaseAdapter implements Adapter { protected final String namespace; + + /** + * delay generator can have use according to type of adapter. + * eg. poll delay for queue and scheduler. + * heartbeat for persistence adapter. + */ protected final PollDelayGenerator pollDelayGenerator; - protected final Serde serde; } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java index 30a5802..24babf0 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java @@ -18,9 +18,8 @@ public abstract class BasePersistenceAdapter extends BaseAdapter implements PersistenceAdapter { protected BasePersistenceAdapter(String namespace, - PollDelayGenerator pollDelayGenerator, - Serde serde) { - super(namespace, pollDelayGenerator, serde); + PollDelayGenerator heartbeatDelayGenerator) { + super(namespace, heartbeatDelayGenerator); } @Override @@ -37,4 +36,8 @@ public void maintenance(WorkflowAdapter adapter) { }); } + @Override + public PollDelayGenerator heartbeatDelayGenerator() { + return pollDelayGenerator; + } } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java deleted file mode 100644 index 1f4dcb5..0000000 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.github.pavansharma36.workflow.api.adapter.base; - -import io.github.pavansharma36.workflow.api.adapter.QueueAdapter; -import io.github.pavansharma36.workflow.api.serde.Deserializer; -import io.github.pavansharma36.workflow.api.serde.Serde; -import io.github.pavansharma36.workflow.api.serde.Serializer; -import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; - -/** - * Base class for all {@link QueueAdapter}. - */ -public abstract class BaseQueueAdapter extends BaseAdapter implements QueueAdapter { - - protected final Serializer serializer; - protected final Deserializer deserializer; - - protected BaseQueueAdapter(String namespace, - PollDelayGenerator pollDelayGenerator, - Serde serde) { - super(namespace, pollDelayGenerator, serde); - this.serializer = serde.serializer(); - this.deserializer = serde.deserializer(); - } -} diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java deleted file mode 100644 index c58531d..0000000 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.github.pavansharma36.workflow.api.adapter.base; - -import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter; -import io.github.pavansharma36.workflow.api.serde.Serde; -import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; - -/** - * Base class for all {@link ScheduleAdapter}. - */ -public abstract class BaseScheduleAdapter extends BaseAdapter implements ScheduleAdapter { - - protected BaseScheduleAdapter(String namespace, - PollDelayGenerator pollDelayGenerator, - Serde serde) { - super(namespace, pollDelayGenerator, serde); - } -} diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java new file mode 100644 index 0000000..cf98ef7 --- /dev/null +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36.workflow.api.adapter.builder; + +import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; + +public abstract class BasePersistenceAdapterBuilder> + extends BaseAdapterBuilder { + + public T withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) { + this.pollDelayGenerator = heartbeatDelayGenerator; + return (T) this; + } +} diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java index f9ded81..c7d8bf8 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java @@ -18,15 +18,15 @@ public abstract class BaseScheduleAdapterBuilder withMaintenanceDelayGenerator( + public S withMaintenanceDelayGenerator( @NonNull final PollDelayGenerator maintenanceDelayGenerator) { this.maintenanceDelayGenerator = maintenanceDelayGenerator; - return this; + return (S) this; } - public BaseScheduleAdapterBuilder maxRunDuration(@NonNull Duration maxRunDuration) { + public S maxRunDuration(@NonNull Duration maxRunDuration) { this.maxRunDuration = maxRunDuration; - return this; + return (S) this; } } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java index c3c51a6..3ed01d9 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java @@ -10,45 +10,66 @@ /** * Base class for all adapter builder. */ -public class WorkflowAdapterBuilder, - S1 extends BaseScheduleAdapterBuilder, - S2 extends BaseAdapterBuilder, - S3 extends BaseAdapterBuilder> { +public class WorkflowAdapterBuilder { - protected S1 scheduleAdapterBuilder; - protected S2 persistenceAdapterBuilder; - protected S3 queueAdapterBuilder; + protected BaseScheduleAdapterBuilder scheduleAdapterBuilder; + protected BasePersistenceAdapterBuilder persistenceAdapterBuilder; + protected BaseAdapterBuilder queueAdapterBuilder; - public WorkflowAdapterBuilder withQueuePollDelayGenerator( + public WorkflowAdapterBuilder withQueuePollDelayGenerator( final PollDelayGenerator pollDelayGenerator) { this.queueAdapterBuilder.withPollDelayGenerator(pollDelayGenerator); return this; } - public WorkflowAdapterBuilder withSchedulePollDelayGenerator( + public WorkflowAdapterBuilder withSchedulePollDelayGenerator( final PollDelayGenerator pollDelayGenerator) { this.scheduleAdapterBuilder.withPollDelayGenerator(pollDelayGenerator); return this; } - public WorkflowAdapterBuilder withMaintenancePollDelayGenerator( + public WorkflowAdapterBuilder withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) { + this.persistenceAdapterBuilder.withHeartbeatDelayGenerator(heartbeatDelayGenerator); + return this; + } + + public WorkflowAdapterBuilder withMaintenancePollDelayGenerator( final PollDelayGenerator pollDelayGenerator ) { this.scheduleAdapterBuilder.withMaintenanceDelayGenerator(pollDelayGenerator); return this; } - public WorkflowAdapterBuilder withMaxRunDuration(Duration duration) { + public WorkflowAdapterBuilder withMaxRunDuration(Duration duration) { this.scheduleAdapterBuilder.maxRunDuration(duration); return this; } + public WorkflowAdapterBuilder withScheduleAdapterBuilder(BaseScheduleAdapterBuilder scheduleAdapterBuilder) { + this.scheduleAdapterBuilder = scheduleAdapterBuilder; + return this; + } + + public WorkflowAdapterBuilder withPersistenceAdapterBuilder(BasePersistenceAdapterBuilder persistenceAdapterBuilder) { + this.persistenceAdapterBuilder = persistenceAdapterBuilder; + return this; + } + + public WorkflowAdapterBuilder withQueueAdapterBuilder(BaseAdapterBuilder queueAdapterBuilder) { + this.queueAdapterBuilder = queueAdapterBuilder; + return this; + } + /** * build {@link WorkflowAdapter}. * * @return - instance of workflowadaper. */ public WorkflowAdapter build() { + scheduleAdapterBuilder.validate(); + queueAdapterBuilder.validate(); + persistenceAdapterBuilder.validate(); + return new WorkflowAdapterImpl(scheduleAdapterBuilder.build(), queueAdapterBuilder.build(), persistenceAdapterBuilder.build()); diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java index 49f3062..46d192e 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java @@ -1,6 +1,7 @@ package io.github.pavansharma36.workflow.api.bean.task; import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; @@ -13,6 +14,7 @@ @ToString @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode public class TaskType { private int version = 1; private @NonNull String type; diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java index e135df6..b647d8f 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java @@ -1,16 +1,13 @@ package io.github.pavansharma36.workflow.api.serde; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import io.github.pavansharma36.workflow.api.bean.id.Id; import java.io.IOException; -public class IdSerializer extends StdSerializer { - protected IdSerializer(Class t) { - super(t); - } - +public class IdSerializer extends JsonSerializer { @Override public void serialize(Id id, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java deleted file mode 100644 index 9ecae98..0000000 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/App.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.github.pavansharma36; - -/** - * Hello world! - * - */ -public class App -{ - public static void main( String[] args ) - { - System.out.println( "Hello World!" ); - } -} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java index 0954274..5ff27ff 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java @@ -27,9 +27,8 @@ public class InmemoryPersistenceAdapter extends BasePersistenceAdapter { private Map runInfos = new HashMap<>(); private Map> taskInfos = new HashMap<>(); - protected InmemoryPersistenceAdapter(String namespace, PollDelayGenerator pollDelayGenerator, - Serde serde) { - super(namespace, pollDelayGenerator, serde); + public InmemoryPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator) { + super(namespace, heartbeatDelayGenerator); } @Override @@ -94,7 +93,15 @@ public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy @Override public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) { - return false; + taskInfos.computeIfPresent(executableTask.getRunId(), (i, tasks) -> { + tasks.computeIfPresent(executableTask.getTaskId(), (j, task) -> { + task.setCompletionTimeEpoch(System.currentTimeMillis()); + task.setResult(executionResult); + return task; + }); + return tasks; + }); + return true; } @Override @@ -135,11 +142,7 @@ public boolean cleanup(RunId runId) { @Override public List getStuckRunInfos(Duration maxDuration) { - return null; - } - - @Override - public PollDelayGenerator heartbeatDelayGenerator() { - return null; + // in memory dont stuck. + return Collections.emptyList(); } } diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java new file mode 100644 index 0000000..ea79e55 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java @@ -0,0 +1,73 @@ +package io.github.pavansharma36.workflow.inmemory.adapter; + +import io.github.pavansharma36.workflow.api.WorkflowManager; +import io.github.pavansharma36.workflow.api.adapter.QueueAdapter; +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.bean.id.RunId; +import io.github.pavansharma36.workflow.api.bean.task.TaskType; +import io.github.pavansharma36.workflow.api.executor.ExecutableTask; +import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class InmemoryQueueAdapter implements QueueAdapter { + + private final LinkedList updatedRunQueue = new LinkedList<>(); + private final Map> queue = new HashMap<>(); + + private final PollDelayGenerator pollDelayGenerator; + + @Override + public void start(WorkflowManager workflowManager) { + + } + + @Override + public void stop() { + + } + + @Override + public void maintenance(WorkflowAdapter workflowAdapter) { + + } + + @Override + public PollDelayGenerator pollDelayGenerator() { + return pollDelayGenerator; + } + + @Override + public void pushTask(ExecutableTask task) { + queue.computeIfAbsent(task.getTaskType(), i -> new LinkedList<>()).offer(task); + } + + @Override + public Optional pollTask(TaskType taskType) { + return Optional.ofNullable(queue.computeIfAbsent(taskType, i -> new LinkedList<>()).poll()); + } + + @Override + public boolean commitTaskProcessed(ExecutableTask task) { + return true; + } + + @Override + public void pushUpdatedRun(RunId runId) { + updatedRunQueue.offer(runId); + } + + @Override + public Optional pollUpdatedRun() { + return Optional.ofNullable(updatedRunQueue.poll()); + } + + @Override + public boolean commitUpdatedRunProcess(RunId runId) { + return true; + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java new file mode 100644 index 0000000..83e3627 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java @@ -0,0 +1,50 @@ +package io.github.pavansharma36.workflow.inmemory.adapter; + +import io.github.pavansharma36.workflow.api.WorkflowManager; +import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter; +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +import java.time.Duration; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class InmemorySchedulerAdapter implements ScheduleAdapter { + + private final PollDelayGenerator pollDelayGenerator; + private final PollDelayGenerator maintenanceDelayGenerator; + + @Override + public void start(WorkflowManager workflowManager) { + + } + + @Override + public void stop() { + + } + + @Override + public void maintenance(WorkflowAdapter workflowAdapter) { + + } + + @Override + public PollDelayGenerator pollDelayGenerator() { + return pollDelayGenerator; + } + + @Override + public PollDelayGenerator maintenanceDelayGenerator() { + return maintenanceDelayGenerator; + } + + @Override + public Duration maxRunDuration() { + return Duration.ofDays(7L); + } + + @Override + public boolean isScheduler() { + return true; + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java new file mode 100644 index 0000000..e13ed9b --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36.workflow.inmemory.builder; + +import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; +import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryPersistenceAdapter; + +public class InmemoryPersistenceAdapterBuilder + extends BasePersistenceAdapterBuilder { + @Override + public PersistenceAdapter build() { + return new InmemoryPersistenceAdapter(namespace, pollDelayGenerator); + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java new file mode 100644 index 0000000..4c79082 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java @@ -0,0 +1,13 @@ +package io.github.pavansharma36.workflow.inmemory.builder; + +import io.github.pavansharma36.workflow.api.adapter.QueueAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.BaseAdapterBuilder; +import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryQueueAdapter; + +public class InmemoryQueueAdapterBuilder + extends BaseAdapterBuilder { + @Override + public QueueAdapter build() { + return new InmemoryQueueAdapter(pollDelayGenerator); + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java new file mode 100644 index 0000000..5c5f89c --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java @@ -0,0 +1,12 @@ +package io.github.pavansharma36.workflow.inmemory.builder; + +import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.BaseScheduleAdapterBuilder; +import io.github.pavansharma36.workflow.inmemory.adapter.InmemorySchedulerAdapter; + +public class InmemoryScheduleAdapterBuilder extends BaseScheduleAdapterBuilder { + @Override + public ScheduleAdapter build() { + return new InmemorySchedulerAdapter(pollDelayGenerator, maintenanceDelayGenerator); + } +} diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java new file mode 100644 index 0000000..59b9303 --- /dev/null +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java @@ -0,0 +1,16 @@ +package io.github.pavansharma36.workflow.inmemory.builder; + +import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; + +public class InmemoryWorkflowAdapterBuilder + extends WorkflowAdapterBuilder { + + + public static InmemoryWorkflowAdapterBuilder builder() { + InmemoryWorkflowAdapterBuilder builder = new InmemoryWorkflowAdapterBuilder(); + builder.scheduleAdapterBuilder = new InmemoryScheduleAdapterBuilder(); + builder.queueAdapterBuilder = new InmemoryQueueAdapterBuilder(); + builder.persistenceAdapterBuilder = new InmemoryPersistenceAdapterBuilder(); + return builder; + } +} diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java deleted file mode 100644 index 98d2799..0000000 --- a/workflow-inmemory/src/test/java/io/github/pavansharma36/AppTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.github.pavansharma36; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Unit test for simple App. - */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } - - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - } -} diff --git a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java index 94688f2..262a282 100644 --- a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java +++ b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java @@ -2,6 +2,7 @@ import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; import io.github.pavansharma36.workflow.api.adapter.builder.BaseAdapterBuilder; +import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; import io.github.pavansharma36.workflow.api.util.WorkflowException; @@ -18,7 +19,7 @@ */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class JedisPersistenceAdapterBuilder - extends BaseAdapterBuilder { + extends BasePersistenceAdapterBuilder { private JedisPool jedis; @@ -31,12 +32,6 @@ public JedisPersistenceAdapterBuilder withJedisPool(final JedisPool pool) { return this; } - public JedisPersistenceAdapterBuilder heartbeatDelayGenerator( - @NonNull final PollDelayGenerator heartbeatDelayGenerator) { - this.pollDelayGenerator = heartbeatDelayGenerator; - return this; - } - /** * Build {@link io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter} * with given details. diff --git a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java index e559473..be3f8e7 100644 --- a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java +++ b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java @@ -12,8 +12,7 @@ * Builder class to build redis based {@link WorkflowAdapter}. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class JedisWorkflowAdapterBuilder extends WorkflowAdapterBuilder { +public class JedisWorkflowAdapterBuilder extends WorkflowAdapterBuilder { public static JedisWorkflowAdapterBuilder builder(final JedisPool jedisPool, final String namespace) { diff --git a/workflow-mongodb/pom.xml b/workflow-mongodb/pom.xml index 9ae7baa..bd80575 100644 --- a/workflow-mongodb/pom.xml +++ b/workflow-mongodb/pom.xml @@ -30,5 +30,11 @@ test-jar test + + + io.github.pavansharma36 + workflow-inmemory + 0.0.1-SNAPSHOT + diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java index de9122b..d4e284e 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java @@ -20,18 +20,21 @@ import io.github.pavansharma36.workflow.api.model.TaskInfo; import io.github.pavansharma36.workflow.api.serde.Serde; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; -import io.github.pavansharma36.workflow.mongodb.helper.JacksonCodecProvider; +import io.github.pavansharma36.workflow.mongodb.helper.SerdeCodecProvider; import io.github.pavansharma36.workflow.mongodb.helper.MongoDBQueryHelper; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import javax.print.Doc; import org.bson.BsonDocument; import org.bson.BsonDocumentWriter; import org.bson.BsonString; import org.bson.Document; import org.bson.codecs.BsonValueCodecProvider; +import org.bson.codecs.EncoderContext; import org.bson.codecs.ValueCodecProvider; import org.bson.codecs.configuration.CodecRegistries; import org.bson.codecs.configuration.CodecRegistry; @@ -39,17 +42,19 @@ public class MongoDBPersistenceAdapter extends BasePersistenceAdapter implements PersistenceAdapter { - private static final CodecRegistry CODEC = new JacksonCodecProvider(CodecRegistries.fromProviders( - Arrays.asList(new ValueCodecProvider(), new BsonValueCodecProvider(), new DBObjectCodecProvider()))); - private final String database; private final MongoClient mongoClient; - protected MongoDBPersistenceAdapter(String namespace, PollDelayGenerator pollDelayGenerator, - Serde serde, String database, MongoClient mongoClient) { - super(namespace, pollDelayGenerator, serde); + private final CodecRegistry codec; + + public MongoDBPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator, + String database, MongoClient mongoClient, Serde serde) { + super(namespace, heartbeatDelayGenerator); this.database = database; this.mongoClient = mongoClient; + this.codec = new SerdeCodecProvider(CodecRegistries.fromProviders( + Arrays.asList(new ValueCodecProvider(), new BsonValueCodecProvider(), new DBObjectCodecProvider())), + serde); } @Override @@ -67,7 +72,7 @@ public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) { Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, new BsonString(managerInfo.getManagerId().getId())); BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); - CODEC.get(ManagerInfo.class).encode(writer, managerInfo, null); + codec.get(ManagerInfo.class).encode(writer, managerInfo, EncoderContext.builder().build()); Bson update = writer.getDocument(); UpdateOptions options = new UpdateOptions().upsert(true); return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) @@ -105,62 +110,97 @@ public boolean updateQueuedTime(RunId runId, TaskId taskId) { @Override public boolean updateStartTime(RunId runId) { - return false; + Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + Bson update = Updates.set(MongoDBQueryHelper.RunInfo.START_TIME_KEY, System.currentTimeMillis()); + return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update) + .wasAcknowledged(); } @Override public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) { - return false; + Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.START_TIME_KEY, System.currentTimeMillis()), + Updates.set(MongoDBQueryHelper.TaskInfo.PROCESSED_BY_KEY, processedBy.getId())); + return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + .wasAcknowledged(); } @Override public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) { - return false; + Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, executableTask.getRunId().getId()), + Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, executableTask.getTaskId().getId())); + Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.COMPLETION_TIME_KEY, System.currentTimeMillis()), + Updates.set(MongoDBQueryHelper.TaskInfo.RESULT_KEY, executionResult)); + return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + .wasAcknowledged(); } @Override public Optional getTaskInfo(RunId runId, TaskId taskId) { - return Optional.empty(); + Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + TaskInfo taskInfo = collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)) + .find(filter, TaskInfo.class).first(); + return Optional.ofNullable(taskInfo); } @Override public Optional getRunInfo(RunId runId) { - return Optional.empty(); + Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + RunInfo runInfo = collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)) + .find(filter, RunInfo.class).first(); + return Optional.ofNullable(runInfo); } @Override public void createRunInfo(RunInfo runInfo) { - + BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); + codec.get(RunInfo.class).encode(writer, runInfo, null); + BsonDocument doc = writer.getDocument(); + collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)) + .insertOne(Document.parse(doc.toJson())); } @Override public boolean updateRunInfoEpoch(RunId runId) { - return false; + Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + Bson update = Updates.set(MongoDBQueryHelper.RunInfo.LAST_UPDATE_KEY, System.currentTimeMillis()); + return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update) + .wasAcknowledged(); } @Override public void createTaskInfos(RunId runId, List taskInfos) { - + List docs = new ArrayList<>(taskInfos.size()); + for (TaskInfo taskInfo : taskInfos) { + taskInfo.setRunId(runId); + BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); + codec.get(TaskInfo.class).encode(writer, taskInfo, null); + BsonDocument doc = writer.getDocument(); + docs.add(Document.parse(doc.toJson())); + } + collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)) + .insertMany(docs); } @Override public boolean cleanup(RunId runId) { - return false; - } + Bson taskFilter = Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()); + collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).deleteMany(taskFilter); - @Override - public List getStuckRunInfos(Duration maxDuration) { - return null; + Bson runFilter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).deleteMany(runFilter); + return true; } @Override - public PollDelayGenerator heartbeatDelayGenerator() { + public List getStuckRunInfos(Duration maxDuration) { return null; } private MongoCollection collection(String collection) { - return mongoClient.getDatabase(database).getCollection(collection).withCodecRegistry( - new JacksonCodecProvider(mongoClient.getDatabase(database).getCodecRegistry())); + return mongoClient.getDatabase(database).getCollection(collection).withCodecRegistry(codec); } } diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java new file mode 100644 index 0000000..c6c7f9b --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java @@ -0,0 +1,25 @@ +package io.github.pavansharma36.workflow.mongodb.adapter.builder; + +import com.mongodb.client.MongoClient; +import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; +import io.github.pavansharma36.workflow.mongodb.adapter.MongoDBPersistenceAdapter; +import lombok.AccessLevel; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class MongoDBPersistanceAdapterBuilder extends BasePersistenceAdapterBuilder { + + private final String database; + private final MongoClient mongoClient; + + public static MongoDBPersistanceAdapterBuilder builder(@NonNull String database, @NonNull MongoClient mongoClient) { + return new MongoDBPersistanceAdapterBuilder(database, mongoClient); + } + + @Override + public PersistenceAdapter build() { + return new MongoDBPersistenceAdapter(namespace, pollDelayGenerator, database, mongoClient, serde); + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java deleted file mode 100644 index cdf0394..0000000 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.github.pavansharma36.workflow.mongodb.helper; - -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; -import org.bson.BsonReader; -import org.bson.BsonWriter; -import org.bson.RawBsonDocument; -import org.bson.codecs.Codec; -import org.bson.codecs.DecoderContext; -import org.bson.codecs.EncoderContext; -import org.bson.codecs.configuration.CodecRegistry; - -public class JacksonCodec implements Codec { - - private final ObjectMapper bsonObjectMapper; - private final Codec rawBsonDocumentCodec; - private final Class type; - - public JacksonCodec(ObjectMapper bsonObjectMapper, - CodecRegistry codecRegistry, - Class type) { - this.bsonObjectMapper = bsonObjectMapper; - this.rawBsonDocumentCodec = codecRegistry.get(RawBsonDocument.class); - this.type = type; - } - @Override - public T decode(BsonReader bsonReader, DecoderContext decoderContext) { - try { - RawBsonDocument document = rawBsonDocumentCodec.decode(bsonReader, decoderContext); - return bsonObjectMapper.readValue(document.getByteBuffer().array(), type); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void encode(BsonWriter bsonWriter, T t, EncoderContext encoderContext) { - try { - byte[] data = bsonObjectMapper.writeValueAsBytes(t); - rawBsonDocumentCodec.encode(bsonWriter, new RawBsonDocument(data), encoderContext); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public Class getEncoderClass() { - return type; - } -} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java deleted file mode 100644 index 95f5d1e..0000000 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/JacksonCodecProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.github.pavansharma36.workflow.mongodb.helper; - -import java.util.HashMap; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.bson.codecs.Codec; -import org.bson.codecs.configuration.CodecRegistry; - -@RequiredArgsConstructor -public class JacksonCodecProvider implements CodecRegistry { - - private static final Map, Codec> CODECS = new HashMap<>(); - - private final CodecRegistry delegation; - - @Override - public Codec get(Class aClass, CodecRegistry codecRegistry) { - if (CODECS.containsKey(aClass)) { - return (Codec) CODECS.get(aClass); - } else { - return codecRegistry.get(aClass); - } - } - - @Override - public Codec get(Class aClass) { - return get(aClass, delegation); - } -} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java index 89c6220..167ef5f 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java @@ -23,12 +23,28 @@ public static class TaskInfo { public static final String QUEUED_TIME_KEY = "queuedTimeEpoch"; public static final String RUN_ID_KEY = "runId"; public static final String TASK_ID_KEY = "taskId"; + public static final String START_TIME_KEY = "startTimeEpoch"; + public static final String PROCESSED_BY_KEY = "processedBy"; + public static final String COMPLETION_TIME_KEY = "completionTimeEpoch"; + public static final String RESULT_KEY = "result"; public static String collectionName(String namespace) { return namespace + "_task_info"; } } + public static class RunInfo { + + public static final String RUN_ID_KEY = "runId"; + public static final String QUEUED_TIME_KEY = "queuedTime"; + public static final String START_TIME_KEY = "startTimeEpoch"; + public static final String LAST_UPDATE_KEY = "lastUpdateEpoch"; + + public static String collectionName(String namespace) { + return namespace + "_run_info"; + } + } + } diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java new file mode 100644 index 0000000..0083b6b --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java @@ -0,0 +1,45 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import io.github.pavansharma36.workflow.api.serde.Deserializer; +import io.github.pavansharma36.workflow.api.serde.Serde; +import io.github.pavansharma36.workflow.api.serde.Serializer; +import org.bson.BsonReader; +import org.bson.BsonWriter; +import org.bson.RawBsonDocument; +import org.bson.codecs.Codec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.EncoderContext; +import org.bson.codecs.configuration.CodecRegistry; + +public class SerdeCodec implements Codec { + + private final Serializer serializer; + private final Deserializer deserializer; + private final Codec rawBsonDocumentCodec; + private final Class type; + + public SerdeCodec(Serde serde, + CodecRegistry codecRegistry, + Class type) { + this.serializer = serde.serializer(); + this.deserializer = serde.deserializer(); + this.rawBsonDocumentCodec = codecRegistry.get(RawBsonDocument.class); + this.type = type; + } + @Override + public T decode(BsonReader bsonReader, DecoderContext decoderContext) { + RawBsonDocument document = rawBsonDocumentCodec.decode(bsonReader, decoderContext); + return deserializer.deserialize(document.getByteBuffer().array(), type); + } + + @Override + public void encode(BsonWriter bsonWriter, T t, EncoderContext encoderContext) { + byte[] data = serializer.serialize(t); + rawBsonDocumentCodec.encode(bsonWriter, new RawBsonDocument(data), encoderContext); + } + + @Override + public Class getEncoderClass() { + return type; + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java new file mode 100644 index 0000000..d7819bb --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java @@ -0,0 +1,35 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import io.github.pavansharma36.workflow.api.model.ManagerInfo; +import io.github.pavansharma36.workflow.api.serde.Serde; +import java.util.HashMap; +import java.util.Map; +import org.bson.codecs.Codec; +import org.bson.codecs.configuration.CodecRegistry; + +public class SerdeCodecProvider implements CodecRegistry { + + private final CodecRegistry delegation; + private final Map, Codec> codecs; + + public SerdeCodecProvider(CodecRegistry delegation, Serde serde) { + this.delegation = delegation; + this.codecs = new HashMap<>(); + codecs.put(ManagerInfo.class, new SerdeCodec<>(serde, delegation, + ManagerInfo.class)); + } + + @Override + public Codec get(Class aClass, CodecRegistry codecRegistry) { + if (codecs.containsKey(aClass)) { + return (Codec) codecs.get(aClass); + } else { + return codecRegistry.get(aClass); + } + } + + @Override + public Codec get(Class aClass) { + return get(aClass, delegation); + } +} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java index 846ce37..787c9a5 100644 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java @@ -6,6 +6,9 @@ import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; +import io.github.pavansharma36.workflow.inmemory.builder.InmemoryQueueAdapterBuilder; +import io.github.pavansharma36.workflow.inmemory.builder.InmemoryScheduleAdapterBuilder; +import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDBPersistanceAdapterBuilder; import java.time.Duration; import org.junit.After; import org.junit.Before; @@ -23,8 +26,12 @@ public class MongoDBNormalTest extends NormalTest { @Override protected WorkflowAdapter adapter() { final String namespace = "test"; - MongoClient client = MongoClients.create(); - return new WorkflowAdapterBuilder<>() + MongoClient client = MongoClients.create("mongodb://mongoadmin:secret@localhost:27017/test"); + return new WorkflowAdapterBuilder() + .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client) + .withNamespace(namespace)) + .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace)) + .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace)) .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) .build(); From cf6f8b4f43d5224f2a0800c58ac2a6aad7936842 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Sun, 26 Feb 2023 01:54:42 +0530 Subject: [PATCH 3/6] wip mongodb persistence impl --- .../workflow/api/bean/task/TaskType.java | 2 + .../api/executor/ExecutionResult.java | 24 +++++-- .../workflow/api/impl/QueueConsumerImpl.java | 4 +- .../workflow/api/schedule/Scheduler.java | 5 +- .../workflow/api/NormalTest.java | 22 +++--- .../workflow/api/TestTaskExecutor.java | 4 +- .../pavansharma36/workflow/examples/App.java | 6 +- .../adapter/MongoDBPersistenceAdapter.java | 42 +++++++----- .../workflow/mongodb/helper/IdCodecs.java | 67 +++++++++++++++++++ .../mongodb/helper/MongoDBQueryHelper.java | 2 + .../workflow/mongodb/MongoDBNormalTest.java | 2 +- 11 files changed, 132 insertions(+), 48 deletions(-) create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java index 46d192e..b002c71 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java @@ -5,12 +5,14 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; +import lombok.Setter; import lombok.ToString; /** * Task type to differentiate queue and executor for submitted tasks. */ @Getter +@Setter @ToString @NoArgsConstructor @AllArgsConstructor diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java index 340ff0f..4b31c1e 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java @@ -4,17 +4,29 @@ import java.util.Map; import lombok.Builder; import lombok.Getter; +import lombok.Setter; import lombok.extern.jackson.Jacksonized; /** * Task executor needs to return result. */ @Getter -@Builder -@Jacksonized +@Setter public class ExecutionResult { - private final TaskExecutionStatus status; - private final String message; - private final Map resultMeta; - private final TaskId decision; + private TaskExecutionStatus status; + private String message; + private Map resultMeta; + private TaskId decision; + + public ExecutionResult() { + + } + + public ExecutionResult(TaskExecutionStatus status, String message, Map resultMeta, + TaskId decision) { + this.status = status; + this.message = message; + this.resultMeta = resultMeta; + this.decision = decision; + } } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java index de37760..ceab70b 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java @@ -132,9 +132,7 @@ private void run(final WorkflowManager workflowManager) { break; } catch (final Throwable e) { log.error("Unhandled error in task execution {}", e.getMessage(), e); - executionResult = ExecutionResult.builder() - .status(TaskExecutionStatus.FAILED_STOP).message(e.getMessage()) - .build(); + executionResult = new ExecutionResult(TaskExecutionStatus.FAILED_STOP, e.getMessage(), null, null); force = true; } } while (retry++ < taskInfo.getRetryCount()); diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java index fc9e2e8..48d665d 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java @@ -229,7 +229,7 @@ private void updateRun(final WorkflowManager workflowManager, final RunId runId, } else { adapter.persistenceAdapter().completeTask( ExecutableTask.builder().runId(runId).taskId(tid).build(), - ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build()); + new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null)); taskInfo.setCompletionTimeEpoch(System.currentTimeMillis()); adapter.queueAdapter().pushUpdatedRun(runId); @@ -268,8 +268,7 @@ private void ignoreAllChildrenTasks(final WorkflowManager workflowManager, final final RunId runId = runInfo.getRunId(); log.info("Ignoring task {}", taskId); - ExecutionResult result = - ExecutionResult.builder().message(message).status(TaskExecutionStatus.IGNORED).build(); + ExecutionResult result = new ExecutionResult(TaskExecutionStatus.IGNORED, message, null, null); adapter.persistenceAdapter() .completeTask(ExecutableTask.builder().runId(runId).taskId(taskId).build(), result); diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java index 9688b50..a155cc5 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java @@ -39,7 +39,7 @@ public void testFailedStop() throws Exception { @Override public ExecutionResult execute(WorkflowManager manager, ExecutableTask task) { if (task.getTaskId().getId().equals("task3")) { - return ExecutionResult.builder().status(TaskExecutionStatus.FAILED_STOP).build(); + return new ExecutionResult(TaskExecutionStatus.FAILED_STOP, null, null, null); } return super.execute(manager, task); } @@ -102,7 +102,7 @@ public void testCanceling() throws Exception { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null); }; TaskType taskType = new TaskType(1, "test"); WorkflowManager workflowManager = builder() @@ -187,6 +187,9 @@ public void testMultiClientSimple() throws Exception { Assert.assertEquals(sets, expectedSets); taskExecutor.getChecker().assertNoDuplicates(); + } catch (Exception e) { + log.error("Error {}", e.getMessage(), e); + throw e; } finally { workflowManagers.forEach(this::closeWorkflow); } @@ -216,8 +219,7 @@ public void testTaskData() throws Exception { Map resultData = new HashMap<>(); resultData.put("one", "1"); resultData.put("two", "2"); - return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS) - .message("").resultMeta(resultData).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, "", resultData, null); }; TaskType taskType = new TaskType(1, "test"); WorkflowManager workflowManager = builder() @@ -266,8 +268,7 @@ public void testSubTask() throws Exception { } RunId subTaskRunId = task.getTaskId().equals(groupAParent.getId()) ? workflowManager.submit(groupBTask) : null; - return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS) - .message("test").resultMeta(new HashMap<>()).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, "test", new HashMap<>(), null); }; WorkflowManager workflowManager = builder() .addingTaskExecutor(taskType, 10, taskExecutor) @@ -334,22 +335,19 @@ public void testMultiTypes() throws Exception { BlockingQueue queue1 = new LinkedBlockingDeque<>(); TaskExecutor taskExecutor1 = (manager, task) -> { queue1.add(task.getTaskId()); - return ExecutionResult.builder() - .status(TaskExecutionStatus.SUCCESS).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null); }; BlockingQueue queue2 = new LinkedBlockingDeque<>(); TaskExecutor taskExecutor2 = (manager, task) -> { queue2.add(task.getTaskId()); - return ExecutionResult.builder() - .status(TaskExecutionStatus.SUCCESS).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null); }; BlockingQueue queue3 = new LinkedBlockingDeque<>(); TaskExecutor taskExecutor3 = (manager, task) -> { queue3.add(task.getTaskId()); - return ExecutionResult.builder() - .status(TaskExecutionStatus.SUCCESS).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null); }; WorkflowManager workflowManager = builder() diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java index 31f96a0..12d7952 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java @@ -52,9 +52,7 @@ public ExecutionResult execute(WorkflowManager manager, ExecutableTask task) { checker.decrement(); latch.countDown(); } - return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS) - .message("hey") - .resultMeta(new HashMap<>()).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, "hey", new HashMap<>(), null); } diff --git a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java index 6b60f95..2231b57 100644 --- a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java +++ b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java @@ -63,7 +63,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce final TaskExecutor te = (w, t) -> { log.info("Executing {}", t.getTaskType()); Utils.sleep(Duration.ofMillis(10)); - return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build(); + return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null); }; final JedisPool jedisPool = new JedisPool(); @@ -77,9 +77,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce final WorkflowManager workflowManager = WorkflowManagerBuilder.builder().withAdapter(adapter) .addingTaskExecutor(taskTypeA, 2, te).addingTaskExecutor(taskTypeB, 2, te) .addingTaskExecutor(taskTypeC, 2, te).addingTaskExecutor(decisionType, 1, - (manager, task) -> ExecutionResult.builder() - .status(TaskExecutionStatus.SUCCESS).decision(new TaskId("taske")) - .build()).build(); + (manager, task) -> new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, new TaskId("taske"))).build(); CountDownLatch countDownLatch = new CountDownLatch(SUBMIT_COUNT); workflowManager.workflowManagerListener().addListener(new WorkflowListener() { diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java index d4e284e..21b95f5 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java @@ -1,6 +1,10 @@ package io.github.pavansharma36.workflow.mongodb.adapter; -import com.mongodb.DBObjectCodecProvider; +import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry; +import static org.bson.codecs.configuration.CodecRegistries.fromCodecs; +import static org.bson.codecs.configuration.CodecRegistries.fromProviders; +import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; + import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -13,6 +17,8 @@ import io.github.pavansharma36.workflow.api.bean.id.ManagerId; import io.github.pavansharma36.workflow.api.bean.id.RunId; import io.github.pavansharma36.workflow.api.bean.id.TaskId; +import io.github.pavansharma36.workflow.api.bean.task.TaskType; +import io.github.pavansharma36.workflow.api.dag.RunnableTaskDag; import io.github.pavansharma36.workflow.api.executor.ExecutableTask; import io.github.pavansharma36.workflow.api.executor.ExecutionResult; import io.github.pavansharma36.workflow.api.model.ManagerInfo; @@ -20,24 +26,22 @@ import io.github.pavansharma36.workflow.api.model.TaskInfo; import io.github.pavansharma36.workflow.api.serde.Serde; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; -import io.github.pavansharma36.workflow.mongodb.helper.SerdeCodecProvider; +import io.github.pavansharma36.workflow.mongodb.helper.IdCodecs; import io.github.pavansharma36.workflow.mongodb.helper.MongoDBQueryHelper; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import javax.print.Doc; import org.bson.BsonDocument; import org.bson.BsonDocumentWriter; import org.bson.BsonString; import org.bson.Document; -import org.bson.codecs.BsonValueCodecProvider; import org.bson.codecs.EncoderContext; -import org.bson.codecs.ValueCodecProvider; -import org.bson.codecs.configuration.CodecRegistries; +import org.bson.codecs.configuration.CodecProvider; import org.bson.codecs.configuration.CodecRegistry; +import org.bson.codecs.pojo.PojoCodecProvider; import org.bson.conversions.Bson; public class MongoDBPersistenceAdapter extends BasePersistenceAdapter implements PersistenceAdapter { @@ -52,9 +56,14 @@ public MongoDBPersistenceAdapter(String namespace, PollDelayGenerator heartbeatD super(namespace, heartbeatDelayGenerator); this.database = database; this.mongoClient = mongoClient; - this.codec = new SerdeCodecProvider(CodecRegistries.fromProviders( - Arrays.asList(new ValueCodecProvider(), new BsonValueCodecProvider(), new DBObjectCodecProvider())), - serde); + + CodecProvider pojoCodecProvider = PojoCodecProvider.builder() + .register(ManagerInfo.class, RunInfo.class, TaskInfo.class, ExecutionResult.class, RunnableTaskDag.class, + TaskType.class).build(); + CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(), + fromProviders(pojoCodecProvider), fromCodecs(new IdCodecs.ManagerIdCodec(), new IdCodecs.RunIdCodec(), new IdCodecs.TaskIdCodec())); + + this.codec = pojoCodecRegistry; } @Override @@ -71,9 +80,9 @@ public void stop() { public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) { Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, new BsonString(managerInfo.getManagerId().getId())); - BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); - codec.get(ManagerInfo.class).encode(writer, managerInfo, EncoderContext.builder().build()); - Bson update = writer.getDocument(); + Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, managerInfo.getManagerId().getId()), + Updates.set(MongoDBQueryHelper.ManagerInfo.START_TIME_KEY, managerInfo.getStartTimeEpoch()), + Updates.set(MongoDBQueryHelper.ManagerInfo.HEARTBEAT_KEY, managerInfo.getHeartbeatEpoch())); UpdateOptions options = new UpdateOptions().upsert(true); return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) .updateOne(filter, update, options).wasAcknowledged(); @@ -156,7 +165,7 @@ public Optional getRunInfo(RunId runId) { @Override public void createRunInfo(RunInfo runInfo) { BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); - codec.get(RunInfo.class).encode(writer, runInfo, null); + codec.get(RunInfo.class).encode(writer, runInfo, EncoderContext.builder().build()); BsonDocument doc = writer.getDocument(); collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)) .insertOne(Document.parse(doc.toJson())); @@ -176,7 +185,7 @@ public void createTaskInfos(RunId runId, List taskInfos) { for (TaskInfo taskInfo : taskInfos) { taskInfo.setRunId(runId); BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); - codec.get(TaskInfo.class).encode(writer, taskInfo, null); + codec.get(TaskInfo.class).encode(writer, taskInfo, EncoderContext.builder().build()); BsonDocument doc = writer.getDocument(); docs.add(Document.parse(doc.toJson())); } @@ -196,7 +205,8 @@ public boolean cleanup(RunId runId) { @Override public List getStuckRunInfos(Duration maxDuration) { - return null; + // TODO + return Collections.emptyList(); } private MongoCollection collection(String collection) { diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java new file mode 100644 index 0000000..d1d76dc --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java @@ -0,0 +1,67 @@ +package io.github.pavansharma36.workflow.mongodb.helper; + +import io.github.pavansharma36.workflow.api.bean.id.ManagerId; +import io.github.pavansharma36.workflow.api.bean.id.RunId; +import io.github.pavansharma36.workflow.api.bean.id.TaskId; +import org.bson.BsonReader; +import org.bson.BsonWriter; +import org.bson.codecs.Codec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.EncoderContext; + +public class IdCodecs { + + public static class ManagerIdCodec implements Codec { + @Override + public ManagerId decode(BsonReader bsonReader, DecoderContext decoderContext) { + return new ManagerId(bsonReader.readString()); + } + + @Override + public void encode(BsonWriter bsonWriter, ManagerId managerId, EncoderContext encoderContext) { + bsonWriter.writeString(managerId.getId()); + } + + @Override + public Class getEncoderClass() { + return ManagerId.class; + } + } + + public static class RunIdCodec implements Codec { + + @Override + public RunId decode(BsonReader bsonReader, DecoderContext decoderContext) { + return new RunId(bsonReader.readString()); + } + + @Override + public void encode(BsonWriter bsonWriter, RunId runId, EncoderContext encoderContext) { + bsonWriter.writeString(runId.getId()); + } + + @Override + public Class getEncoderClass() { + return RunId.class; + } + } + + public static class TaskIdCodec implements Codec { + + @Override + public TaskId decode(BsonReader bsonReader, DecoderContext decoderContext) { + return new TaskId(bsonReader.readString()); + } + + @Override + public void encode(BsonWriter bsonWriter, TaskId taskId, EncoderContext encoderContext) { + bsonWriter.writeString(taskId.getId()); + } + + @Override + public Class getEncoderClass() { + return TaskId.class; + } + } + +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java index 167ef5f..9a9f09d 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java @@ -11,6 +11,8 @@ public class MongoDBQueryHelper { @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class ManagerInfo { public static final String MANAGER_ID_KEY = "managerId"; + public static final String START_TIME_KEY = "startTimeEpoch"; + public static final String HEARTBEAT_KEY = "heartbeatEpoch"; public static String collectionName(String namespace) { return namespace + "_manager_info"; diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java index 787c9a5..0470709 100644 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java @@ -26,7 +26,7 @@ public class MongoDBNormalTest extends NormalTest { @Override protected WorkflowAdapter adapter() { final String namespace = "test"; - MongoClient client = MongoClients.create("mongodb://mongoadmin:secret@localhost:27017/test"); + MongoClient client = MongoClients.create("mongodb://mongoadmin:secret@localhost:27017/"); return new WorkflowAdapterBuilder() .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client) .withNamespace(namespace)) From dd587b8b3f7b45d9865a1477026e7a80e4145b7e Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Thu, 7 Dec 2023 01:21:45 +0530 Subject: [PATCH 4/6] Major Test cases cleanu --- pom.xml | 13 ++--- workflow-api/pom.xml | 2 - .../api/impl/WorkflowManagerImpl.java | 2 +- .../workflow/api/schedule/Scheduler.java | 23 +++++---- .../pavansharma36/workflow/api/BaseTest.java | 13 ++++- .../workflow/api/NormalTest.java | 17 ++++--- .../workflow/api/PerformanceTest.java | 1 + .../workflow/api/WorkflowListenerTest.java | 1 + .../{ => helper}/ConcurrentTaskChecker.java | 16 +++---- .../api/{ => helper}/TestTaskExecutor.java | 4 +- .../workflow/api/junit/WorkflowTestRule.java | 37 +++++++++++++++ .../pavansharma36/workflow/examples/App.java | 1 + workflow-inmemory/pom.xml | 8 ++++ .../adapter/InmemoryPersistenceAdapter.java | 4 +- .../InmemoryPersistenceAdapterBuilder.java | 11 +++++ .../builder/InmemoryQueueAdapterBuilder.java | 10 ++++ .../InmemoryScheduleAdapterBuilder.java | 11 +++++ .../workflow/inmemory/InmemoryNormalTest.java | 12 +++++ .../inmemory/InmemoryPerformanceTest.java | 12 +++++ .../InmemoryWorkflowListenerTest.java | 12 +++++ .../workflow/inmemory/rule/InmemoryRule.java | 28 +++++++++++ workflow-jedis/pom.xml | 1 - .../workflow/jedis/JedisNormalTest.java | 40 +++------------- .../workflow/jedis/JedisPerformanceTest.java | 40 ++-------------- .../jedis/JedisWorkflowListenerTest.java | 38 ++------------- .../workflow/jedis/rule/JedisRule.java | 40 ++++++++++++++++ workflow-mongodb/pom.xml | 10 +++- .../java/io/github/pavansharma36/AppTest.java | 38 --------------- .../workflow/mongodb/MongoDBNormalTest.java | 33 ++----------- .../mongodb/MongoDBPerformanceTest.java | 11 +++++ .../mongodb/MongoDBWorkflowListenerTest.java | 12 +++++ .../workflow/mongodb/rule/MongoDBRule.java | 47 +++++++++++++++++++ 32 files changed, 335 insertions(+), 213 deletions(-) rename workflow-api/src/test/java/io/github/pavansharma36/workflow/api/{ => helper}/ConcurrentTaskChecker.java (72%) rename workflow-api/src/test/java/io/github/pavansharma36/workflow/api/{ => helper}/TestTaskExecutor.java (90%) create mode 100644 workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java create mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java create mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java create mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java create mode 100644 workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java create mode 100644 workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java delete mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java create mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java create mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java create mode 100644 workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java diff --git a/pom.xml b/pom.xml index 7f42b92..3b56e89 100644 --- a/pom.xml +++ b/pom.xml @@ -9,19 +9,22 @@ http://maven.apache.org Workflow - A Distributed Dag Processing Library + + org.springframework.boot + spring-boot-dependencies + 2.7.18 + + UTF-8 1.8 ${java.version} ${java.version} - 2.13.4 1.3.1 - 1.18.24 - 2.0.1 4.13.2 - 1.17.3 + 1.19.3 3.12.1 @@ -37,8 +40,6 @@ 2.0.0-M3 - 3.7.1 - 4.9.0 diff --git a/workflow-api/pom.xml b/workflow-api/pom.xml index b1057b9..b54ba91 100644 --- a/workflow-api/pom.xml +++ b/workflow-api/pom.xml @@ -14,12 +14,10 @@ com.fasterxml.jackson.core jackson-annotations - ${jackson.version} com.fasterxml.jackson.core jackson-databind - ${jackson.version} org.jgrapht diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java index 5be7cea..7cf683a 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java @@ -81,7 +81,7 @@ public void close() throws IOException { log.info("Successfully stopped scheduled executor service"); } else { List list = scheduledExecutorService().shutdownNow(); - log.info("Force shutdown scheduled executor service, dropped {} tasks", list.size()); + log.warn("Force shutdown scheduled executor service, dropped {} tasks", list.size()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java index 48d665d..f6308a8 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java @@ -71,11 +71,16 @@ public void stop() { } } + private boolean isNotStopped() { + State s = state.get(); + return s != State.STOPPING && s != State.STOPPED; + } + private void startMaintenanceLoop(final WorkflowManager workflowManager, final ScheduledExecutorService scheduledExecutorService) { boolean result = false; try { - if (adapter.scheduleAdapter().isScheduler()) { + if (adapter.scheduleAdapter().isScheduler() && isNotStopped()) { log.info("Clearing all stuck workflows"); Duration maxRunDuration = adapter.scheduleAdapter().maxRunDuration(); List stuckRuns = adapter.persistenceAdapter().getStuckRunInfos(maxRunDuration); @@ -88,7 +93,7 @@ private void startMaintenanceLoop(final WorkflowManager workflowManager, adapter.maintenance(); } } finally { - if (state.get() != State.STOPPED) { + if (isNotStopped()) { final Duration duration = adapter.scheduleAdapter().maintenanceDelayGenerator().delay(result); scheduledExecutorService.schedule(() -> startMaintenanceLoop(workflowManager, @@ -101,10 +106,12 @@ private void startHeartbeatLoop(final WorkflowManager workflowManager, final ScheduledExecutorService scheduledExecutorService) { boolean result = false; try { - ManagerInfo managerInfo = workflowManager.info(); - log.info("Updating heartbeat {}", managerInfo.getManagerId()); - managerInfo.setHeartbeatEpoch(System.currentTimeMillis()); - result = adapter.persistenceAdapter().createOrUpdateManagerInfo(managerInfo); + if (isNotStopped()) { + ManagerInfo managerInfo = workflowManager.info(); + log.info("Updating heartbeat {}", managerInfo.getManagerId()); + managerInfo.setHeartbeatEpoch(System.currentTimeMillis()); + result = adapter.persistenceAdapter().createOrUpdateManagerInfo(managerInfo); + } } finally { if (state.get() != State.STOPPED) { final Duration duration = adapter.persistenceAdapter() @@ -119,7 +126,7 @@ private void startHandleUpdatedRunLoop(final WorkflowManager workflowManager, final ScheduledExecutorService scheduledExecutorService) { boolean result = false; try { - if (adapter.scheduleAdapter().isScheduler()) { + if (adapter.scheduleAdapter().isScheduler() && isNotStopped()) { try { result = handleRun(workflowManager); } catch (final Exception e) { @@ -129,7 +136,7 @@ private void startHandleUpdatedRunLoop(final WorkflowManager workflowManager, log.debug("Not scheduler"); } } finally { - if (state.get() != State.STOPPED) { + if (isNotStopped()) { final Duration duration = adapter.scheduleAdapter().pollDelayGenerator().delay(result); scheduledExecutorService.schedule(() -> startHandleUpdatedRunLoop(workflowManager, scheduledExecutorService), diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java index fdbaed6..c47eea9 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java @@ -3,12 +3,13 @@ import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; import io.github.pavansharma36.workflow.api.bean.task.Task; import io.github.pavansharma36.workflow.api.impl.WorkflowManagerBuilder; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; import io.github.pavansharma36.workflow.api.serde.JacksonTaskLoader; import io.github.pavansharma36.workflow.api.util.WorkflowException; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; -import lombok.extern.slf4j.Slf4j; +import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +17,12 @@ public abstract class BaseTest { protected final Logger log = LoggerFactory.getLogger(getClass()); + @Rule + public final WorkflowTestRule rule = rule(); + + + protected abstract WorkflowTestRule rule(); + protected WorkflowManagerBuilder builder() { log.warn("Building workflow manager"); return WorkflowManagerBuilder.builder() @@ -40,6 +47,8 @@ protected void closeWorkflow(WorkflowManager workflowManager) { } } - protected abstract WorkflowAdapter adapter(); + protected WorkflowAdapter adapter() { + return rule.adapter(); + } } diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java index a155cc5..4b6e3e5 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java @@ -12,6 +12,7 @@ import io.github.pavansharma36.workflow.api.executor.ExecutionResult; import io.github.pavansharma36.workflow.api.executor.TaskExecutionStatus; import io.github.pavansharma36.workflow.api.executor.TaskExecutor; +import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor; import io.github.pavansharma36.workflow.api.model.RunInfo; import java.util.ArrayList; import java.util.Arrays; @@ -200,13 +201,17 @@ public void testNoData() throws Exception { WorkflowManager workflowManager = builder() .addingTaskExecutor(new TaskType(1, "test"), 10, new TestTaskExecutor(1)) .build(); - workflowManager.start(); + try { + workflowManager.start(); - Thread.sleep(1000L); + Thread.sleep(1000L); - Optional taskData = - workflowManager.getTaskExecutionResult(new RunId(), new TaskId()); - Assert.assertFalse(taskData.isPresent()); + Optional taskData = + workflowManager.getTaskExecutionResult(new RunId(), new TaskId()); + Assert.assertFalse(taskData.isPresent()); + } finally { + closeWorkflow(workflowManager); + } } @Test @@ -387,6 +392,4 @@ public void testMultiTypes() throws Exception { } } - protected abstract WorkflowAdapter adapter(); - } diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java index bcbdb2a..0f0c911 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java @@ -6,6 +6,7 @@ import io.github.pavansharma36.workflow.api.bean.id.TaskId; import io.github.pavansharma36.workflow.api.bean.task.Task; import io.github.pavansharma36.workflow.api.bean.task.TaskType; +import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor; import io.github.pavansharma36.workflow.api.util.RoundRobinIterator; import java.util.LinkedList; import java.util.List; diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java index 064b6c8..14c73f0 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java @@ -6,6 +6,7 @@ import io.github.pavansharma36.workflow.api.bean.task.Task; import io.github.pavansharma36.workflow.api.bean.task.TaskType; import io.github.pavansharma36.workflow.api.bean.task.impl.SimpleTask; +import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor; import java.util.HashSet; import java.util.Set; import org.junit.Assert; diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java similarity index 72% rename from workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java rename to workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java index c99a396..99f5dbf 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java @@ -1,4 +1,4 @@ -package io.github.pavansharma36.workflow.api; +package io.github.pavansharma36.workflow.api.helper; import io.github.pavansharma36.workflow.api.bean.id.RunId; import io.github.pavansharma36.workflow.api.bean.id.TaskId; @@ -9,26 +9,26 @@ import org.junit.Assert; import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair; -class ConcurrentTaskChecker { +public class ConcurrentTaskChecker { private final Set currentSet = new HashSet<>(); private final List> all = new ArrayList<>(); private int count = 0; private final List> sets = new ArrayList<>(); - synchronized void reset() { + public synchronized void reset() { currentSet.clear(); all.clear(); sets.clear(); count = 0; } - synchronized void add(RunId runId, TaskId taskId) { + public synchronized void add(RunId runId, TaskId taskId) { all.add(Pair.of(runId, taskId)); currentSet.add(taskId); ++count; } - synchronized void decrement() { + public synchronized void decrement() { if (--count == 0) { HashSet copy = new HashSet<>(currentSet); currentSet.clear(); @@ -37,15 +37,15 @@ synchronized void decrement() { } } - synchronized List> getSets() { + public synchronized List> getSets() { return new ArrayList<>(sets); } - synchronized List> getAll() { + public synchronized List> getAll() { return new ArrayList<>(all); } - synchronized void assertNoDuplicates() { + public synchronized void assertNoDuplicates() { Assert.assertEquals(all.size(), new HashSet<>(all).size()); // no dups } } diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java similarity index 90% rename from workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java rename to workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java index 12d7952..e56bfea 100644 --- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java @@ -1,9 +1,11 @@ -package io.github.pavansharma36.workflow.api; +package io.github.pavansharma36.workflow.api.helper; +import io.github.pavansharma36.workflow.api.WorkflowManager; import io.github.pavansharma36.workflow.api.executor.ExecutableTask; import io.github.pavansharma36.workflow.api.executor.ExecutionResult; import io.github.pavansharma36.workflow.api.executor.TaskExecutionStatus; import io.github.pavansharma36.workflow.api.executor.TaskExecutor; +import io.github.pavansharma36.workflow.api.helper.ConcurrentTaskChecker; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import lombok.extern.slf4j.Slf4j; diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java new file mode 100644 index 0000000..301eda7 --- /dev/null +++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java @@ -0,0 +1,37 @@ +package io.github.pavansharma36.workflow.api.junit; + +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import java.util.LinkedList; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.MultipleFailureException; +import org.junit.runners.model.Statement; + +public abstract class WorkflowTestRule implements TestRule { + + public abstract WorkflowAdapter adapter(); + + @Override + public Statement apply(Statement statement, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + List errors = new LinkedList<>(); + try { + pre(); + statement.evaluate(); + } catch (Throwable e) { + errors.add(e); + } finally { + post(); + } + MultipleFailureException.assertEmpty(errors); + } + }; + } + + protected abstract void pre(); + + protected abstract void post(); +} diff --git a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java index 2231b57..b488588 100644 --- a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java +++ b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java @@ -118,5 +118,6 @@ public void onTaskEvent(TaskEvent event) { System.currentTimeMillis() - startTimeMillis); workflowManager.close(); + jedisPool.close(); } } diff --git a/workflow-inmemory/pom.xml b/workflow-inmemory/pom.xml index 2419afc..49a41eb 100644 --- a/workflow-inmemory/pom.xml +++ b/workflow-inmemory/pom.xml @@ -16,5 +16,13 @@ workflow-api ${project.version} + + + io.github.pavansharma36 + workflow-api + ${project.version} + test + test-jar + diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java index 5ff27ff..70573e3 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java @@ -135,9 +135,7 @@ public void createTaskInfos(RunId runId, List taskInfos) { @Override public boolean cleanup(RunId runId) { - taskInfos.remove(runId); - runInfos.remove(runId); - return true; + return taskInfos.remove(runId) != null && runInfos.remove(runId) != null; } @Override diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java index e13ed9b..b5beccd 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java @@ -2,12 +2,23 @@ import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; +import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryPersistenceAdapter; public class InmemoryPersistenceAdapterBuilder extends BasePersistenceAdapterBuilder { + + public InmemoryPersistenceAdapterBuilder() { + namespace = "NA"; + } @Override public PersistenceAdapter build() { return new InmemoryPersistenceAdapter(namespace, pollDelayGenerator); } + + @Override + public InmemoryPersistenceAdapterBuilder withNamespace(String namespace) { + throw new WorkflowException("Namespace is not supported for inmemory"); + } + } diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java index 4c79082..68a3f7e 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java @@ -2,12 +2,22 @@ import io.github.pavansharma36.workflow.api.adapter.QueueAdapter; import io.github.pavansharma36.workflow.api.adapter.builder.BaseAdapterBuilder; +import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryQueueAdapter; public class InmemoryQueueAdapterBuilder extends BaseAdapterBuilder { + + public InmemoryQueueAdapterBuilder() { + namespace = "NA"; + } @Override public QueueAdapter build() { return new InmemoryQueueAdapter(pollDelayGenerator); } + + @Override + public InmemoryQueueAdapterBuilder withNamespace(String namespace) { + throw new WorkflowException("Namespace is not supported for inmemory"); + } } diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java index 5c5f89c..b110c69 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java @@ -2,11 +2,22 @@ import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter; import io.github.pavansharma36.workflow.api.adapter.builder.BaseScheduleAdapterBuilder; +import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemorySchedulerAdapter; public class InmemoryScheduleAdapterBuilder extends BaseScheduleAdapterBuilder { + + public InmemoryScheduleAdapterBuilder() { + namespace = "NA"; + } + @Override public ScheduleAdapter build() { return new InmemorySchedulerAdapter(pollDelayGenerator, maintenanceDelayGenerator); } + + @Override + public InmemoryScheduleAdapterBuilder withNamespace(String namespace) { + throw new WorkflowException("Namespace is not supported for inmemory"); + } } diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java new file mode 100644 index 0000000..1ef98d0 --- /dev/null +++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java @@ -0,0 +1,12 @@ +package io.github.pavansharma36.workflow.inmemory; + +import io.github.pavansharma36.workflow.api.NormalTest; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule; + +public class InmemoryNormalTest extends NormalTest { + @Override + protected WorkflowTestRule rule() { + return new InmemoryRule(); + } +} diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java new file mode 100644 index 0000000..3a1cf25 --- /dev/null +++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java @@ -0,0 +1,12 @@ +package io.github.pavansharma36.workflow.inmemory; + +import io.github.pavansharma36.workflow.api.PerformanceTest; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule; + +public class InmemoryPerformanceTest extends PerformanceTest { + @Override + protected WorkflowTestRule rule() { + return new InmemoryRule(); + } +} diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java new file mode 100644 index 0000000..d076089 --- /dev/null +++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java @@ -0,0 +1,12 @@ +package io.github.pavansharma36.workflow.inmemory; + +import io.github.pavansharma36.workflow.api.WorkflowListenerTest; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule; + +public class InmemoryWorkflowListenerTest extends WorkflowListenerTest { + @Override + protected WorkflowTestRule rule() { + return new InmemoryRule(); + } +} diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java new file mode 100644 index 0000000..97852b6 --- /dev/null +++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java @@ -0,0 +1,28 @@ +package io.github.pavansharma36.workflow.inmemory.rule; + +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; +import io.github.pavansharma36.workflow.inmemory.builder.InmemoryWorkflowAdapterBuilder; +import java.time.Duration; + +public class InmemoryRule extends WorkflowTestRule { + @Override + public WorkflowAdapter adapter() { + return InmemoryWorkflowAdapterBuilder.builder() + .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .build(); + } + + + @Override + protected void pre() { + // Nothing to do + } + + @Override + protected void post() { + // Nothing to do + } +} diff --git a/workflow-jedis/pom.xml b/workflow-jedis/pom.xml index 8b85f09..4f7ccbd 100644 --- a/workflow-jedis/pom.xml +++ b/workflow-jedis/pom.xml @@ -22,7 +22,6 @@ redis.clients jedis - ${jedis.version} diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java index 5fc7d98..0a0f146 100644 --- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java +++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java @@ -1,42 +1,14 @@ package io.github.pavansharma36.workflow.jedis; import io.github.pavansharma36.workflow.api.NormalTest; -import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; -import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; -import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder; -import java.time.Duration; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; -import redis.clients.jedis.JedisPool; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.jedis.rule.JedisRule; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class JedisNormalTest extends NormalTest { - - @Rule - public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis")) - .withExposedPorts(6379); - - private JedisPool jedisPool = null; - - @Before - public void init() { - jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); - } - - @After - public void tearDown() { - jedisPool.close(); - } - @Override - protected WorkflowAdapter adapter() { - final String namespace = "test"; - //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); - return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace) - .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .build(); + protected WorkflowTestRule rule() { + return new JedisRule(); } } diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java index 93d98a5..81e74ae 100644 --- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java +++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java @@ -1,45 +1,13 @@ package io.github.pavansharma36.workflow.jedis; import io.github.pavansharma36.workflow.api.PerformanceTest; -import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; -import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; -import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder; -import java.time.Duration; -import lombok.extern.slf4j.Slf4j; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; -import redis.clients.jedis.JedisPool; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.jedis.rule.JedisRule; -@Slf4j public class JedisPerformanceTest extends PerformanceTest { - @Rule - public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis")) - .withExposedPorts(6379); - - private JedisPool jedisPool = null; - - @Before - public void init() { - log.warn("Initializing jedis pool"); - jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); - } - - @After - public void tearDown() { - log.warn("Closing jedis pool"); - jedisPool.close(); - } @Override - protected WorkflowAdapter adapter() { - final String namespace = "test"; - //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); - return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace) - .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(10L))) - .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(10L))) - .build(); + protected WorkflowTestRule rule() { + return new JedisRule(); } } diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java index e12d686..962c4bb 100644 --- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java +++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java @@ -1,42 +1,12 @@ package io.github.pavansharma36.workflow.jedis; import io.github.pavansharma36.workflow.api.WorkflowListenerTest; -import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; -import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; -import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder; -import java.time.Duration; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; -import redis.clients.jedis.JedisPool; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.jedis.rule.JedisRule; public class JedisWorkflowListenerTest extends WorkflowListenerTest { - - @Rule - public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis")) - .withExposedPorts(6379); - - private JedisPool jedisPool = null; - - @Before - public void init() { - jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); - } - - @After - public void tearDown() { - jedisPool.close(); - } - @Override - protected WorkflowAdapter adapter() { - final String namespace = "test"; - return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace) - .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .build(); + protected WorkflowTestRule rule() { + return new JedisRule(); } - } diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java new file mode 100644 index 0000000..89387ff --- /dev/null +++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java @@ -0,0 +1,40 @@ +package io.github.pavansharma36.workflow.jedis.rule; + +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; +import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder; +import java.time.Duration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.JedisPool; + +public class JedisRule extends WorkflowTestRule { + + private GenericContainer redis = new GenericContainer(DockerImageName.parse("redis")) + .withExposedPorts(6379); + private JedisPool jedisPool = null; + + @Override + public WorkflowAdapter adapter() { + final String namespace = "test"; + //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); + return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace) + .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .build(); + } + + @Override + protected void pre() { + redis.start(); + jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort()); + } + + @Override + protected void post() { + jedisPool.close(); + redis.stop(); + } + +} diff --git a/workflow-mongodb/pom.xml b/workflow-mongodb/pom.xml index bd80575..82f7651 100644 --- a/workflow-mongodb/pom.xml +++ b/workflow-mongodb/pom.xml @@ -20,7 +20,6 @@ org.mongodb mongodb-driver-sync - ${mongodb.driver.sync.version} @@ -34,7 +33,14 @@ io.github.pavansharma36 workflow-inmemory - 0.0.1-SNAPSHOT + ${project.version} + test + + + org.testcontainers + mongodb + ${test.containers.version} + test diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java deleted file mode 100644 index 98d2799..0000000 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/AppTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.github.pavansharma36; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Unit test for simple App. - */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } - - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - } -} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java index 0470709..0e1a02f 100644 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java @@ -1,39 +1,12 @@ package io.github.pavansharma36.workflow.mongodb; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; import io.github.pavansharma36.workflow.api.NormalTest; -import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; -import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; -import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; -import io.github.pavansharma36.workflow.inmemory.builder.InmemoryQueueAdapterBuilder; -import io.github.pavansharma36.workflow.inmemory.builder.InmemoryScheduleAdapterBuilder; -import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDBPersistanceAdapterBuilder; -import java.time.Duration; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; +import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule; public class MongoDBNormalTest extends NormalTest { -// @Rule -// public GenericContainer redis = new GenericContainer(DockerImageName.parse("mongo")) -// .withExposedPorts(27017); - - @Override - protected WorkflowAdapter adapter() { - final String namespace = "test"; - MongoClient client = MongoClients.create("mongodb://mongoadmin:secret@localhost:27017/"); - return new WorkflowAdapterBuilder() - .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client) - .withNamespace(namespace)) - .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace)) - .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace)) - .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) - .build(); + protected MongoDBRule rule() { + return new MongoDBRule(); } } diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java new file mode 100644 index 0000000..9c933e3 --- /dev/null +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java @@ -0,0 +1,11 @@ +package io.github.pavansharma36.workflow.mongodb; + +import io.github.pavansharma36.workflow.api.PerformanceTest; +import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule; + +public class MongoDBPerformanceTest extends PerformanceTest { + @Override + protected MongoDBRule rule() { + return new MongoDBRule(); + } +} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java new file mode 100644 index 0000000..06b632d --- /dev/null +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java @@ -0,0 +1,12 @@ +package io.github.pavansharma36.workflow.mongodb; + +import io.github.pavansharma36.workflow.api.WorkflowListenerTest; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule; + +public class MongoDBWorkflowListenerTest extends WorkflowListenerTest { + @Override + protected WorkflowTestRule rule() { + return new MongoDBRule(); + } +} diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java new file mode 100644 index 0000000..b15ff45 --- /dev/null +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java @@ -0,0 +1,47 @@ +package io.github.pavansharma36.workflow.mongodb.rule; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; +import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule; +import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; +import io.github.pavansharma36.workflow.inmemory.builder.InmemoryQueueAdapterBuilder; +import io.github.pavansharma36.workflow.inmemory.builder.InmemoryScheduleAdapterBuilder; +import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDBPersistanceAdapterBuilder; +import java.time.Duration; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.utility.DockerImageName; + +public class MongoDBRule extends WorkflowTestRule { + + public final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10")); + + private MongoClient client = null; + + @Override + protected void pre() { + mongoDBContainer.start(); + client = MongoClients.create(mongoDBContainer.getConnectionString()); + } + + @Override + protected void post() { + client.close(); + mongoDBContainer.stop(); + } + + + @Override + public WorkflowAdapter adapter() { + final String namespace = "test"; + return new WorkflowAdapterBuilder() + .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client) + .withNamespace(namespace)) + .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace)) + .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace)) + .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) + .build(); + } +} From f6b8d284ff514090f3c27516439d8be492a7b485 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Thu, 7 Dec 2023 01:34:07 +0530 Subject: [PATCH 5/6] cleaned up actions --- .github/workflows/maven.yml | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 5b63839..5691dea 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -5,9 +5,9 @@ name: Java CI with Maven on: push: - branches: [ main ] + branches: [ main, release ] pull_request: - branches: [ main ] + branches: [ main, release ] jobs: build: @@ -22,11 +22,6 @@ jobs: java-version: '8' distribution: 'adopt' cache: maven - server-id: ossrh - server-username: MAVEN_USERNAME - server-password: MAVEN_PASSWORD - gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} - gpg-passphrase: MAVEN_GPG_PASSPHRASE - name: Build with Maven run: mvn -B install -DskipTests --file pom.xml - name: Checkstyle with Maven @@ -35,10 +30,30 @@ jobs: run: mvn spotbugs:check --file pom.xml - name: Unit Test with Maven run: mvn test --file pom.xml + + deploy: + if: github.ref == 'refs/heads/main' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 8 + uses: actions/setup-java@v2 + with: + java-version: '8' + distribution: 'adopt' + cache: maven + server-id: ossrh + server-username: MAVEN_USERNAME + server-password: MAVEN_PASSWORD + gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} + gpg-passphrase: MAVEN_GPG_PASSPHRASE + - name: Deploy with Maven run: mvn -B clean deploy -DskipTests -Pci-cd + env: MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} + From febaad0b775f4d196979cad05e03e23e284f184f Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Thu, 7 Dec 2023 01:38:11 +0530 Subject: [PATCH 6/6] deploy on build completion --- .github/workflows/maven.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 5691dea..0bf3c1d 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -33,6 +33,8 @@ jobs: deploy: if: github.ref == 'refs/heads/main' + needs: + - build runs-on: ubuntu-latest steps: - uses: actions/checkout@v2