diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 5b63839..0bf3c1d 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -5,9 +5,9 @@ name: Java CI with Maven
on:
push:
- branches: [ main ]
+ branches: [ main, release ]
pull_request:
- branches: [ main ]
+ branches: [ main, release ]
jobs:
build:
@@ -22,11 +22,6 @@ jobs:
java-version: '8'
distribution: 'adopt'
cache: maven
- server-id: ossrh
- server-username: MAVEN_USERNAME
- server-password: MAVEN_PASSWORD
- gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }}
- gpg-passphrase: MAVEN_GPG_PASSPHRASE
- name: Build with Maven
run: mvn -B install -DskipTests --file pom.xml
- name: Checkstyle with Maven
@@ -35,10 +30,32 @@ jobs:
run: mvn spotbugs:check --file pom.xml
- name: Unit Test with Maven
run: mvn test --file pom.xml
+
+ deploy:
+ if: github.ref == 'refs/heads/main'
+ needs:
+ - build
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 8
+ uses: actions/setup-java@v2
+ with:
+ java-version: '8'
+ distribution: 'adopt'
+ cache: maven
+ server-id: ossrh
+ server-username: MAVEN_USERNAME
+ server-password: MAVEN_PASSWORD
+ gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }}
+ gpg-passphrase: MAVEN_GPG_PASSPHRASE
+
- name: Deploy with Maven
run: mvn -B clean deploy -DskipTests -Pci-cd
+
env:
MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }}
MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }}
+
diff --git a/pom.xml b/pom.xml
index dd79b19..3b56e89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,19 +9,22 @@
http://maven.apache.org
Workflow - A Distributed Dag Processing Library
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.7.18
+
+
UTF-8
1.8
${java.version}
${java.version}
- 2.13.4
1.3.1
- 1.18.24
- 2.0.1
4.13.2
- 1.17.3
+ 1.19.3
3.12.1
@@ -35,6 +38,8 @@
4.7.0
9.3
2.0.0-M3
+
+
@@ -255,5 +260,8 @@
workflow-api
workflow-jedis
workflow-examples
+ workflow-jdbc
+ workflow-mongodb
+ workflow-inmemory
diff --git a/workflow-api/pom.xml b/workflow-api/pom.xml
index b1057b9..b54ba91 100644
--- a/workflow-api/pom.xml
+++ b/workflow-api/pom.xml
@@ -14,12 +14,10 @@
com.fasterxml.jackson.core
jackson-annotations
- ${jackson.version}
com.fasterxml.jackson.core
jackson-databind
- ${jackson.version}
org.jgrapht
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java
index 307a379..53490fc 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseAdapter.java
@@ -13,7 +13,12 @@
public abstract class BaseAdapter implements Adapter {
protected final String namespace;
+
+ /**
+ * delay generator can have use according to type of adapter.
+ * eg. poll delay for queue and scheduler.
+ * heartbeat for persistence adapter.
+ */
protected final PollDelayGenerator pollDelayGenerator;
- protected final Serde serde;
}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java
index ef863fd..24babf0 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BasePersistenceAdapter.java
@@ -1,24 +1,43 @@
package io.github.pavansharma36.workflow.api.adapter.base;
import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.model.ManagerInfo;
import io.github.pavansharma36.workflow.api.serde.Deserializer;
import io.github.pavansharma36.workflow.api.serde.Serde;
import io.github.pavansharma36.workflow.api.serde.Serializer;
import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+import java.util.Date;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
/**
* Base class for all {@link PersistenceAdapter}.
*/
+@Slf4j
public abstract class BasePersistenceAdapter extends BaseAdapter implements PersistenceAdapter {
- protected final Serializer serializer;
- protected final Deserializer deserializer;
-
protected BasePersistenceAdapter(String namespace,
- PollDelayGenerator pollDelayGenerator,
- Serde serde) {
- super(namespace, pollDelayGenerator, serde);
- this.serializer = serde.serializer();
- this.deserializer = serde.deserializer();
+ PollDelayGenerator heartbeatDelayGenerator) {
+ super(namespace, heartbeatDelayGenerator);
+ }
+
+ @Override
+ public void maintenance(WorkflowAdapter adapter) {
+ List managerInfos = getAllManagerInfos();
+ long minHeartbeatTimestamp = System.currentTimeMillis()
+ - (heartbeatDelayGenerator().delay(false).toMillis() * 15);
+ managerInfos.forEach(m -> {
+ if (m.getHeartbeatEpoch() < minHeartbeatTimestamp) {
+ log.info("WorkflowManager's heartbeat is not updated since {}, purging",
+ new Date(m.getHeartbeatEpoch()));
+ removeManagerInfo(m.getManagerId());
+ }
+ });
+ }
+
+ @Override
+ public PollDelayGenerator heartbeatDelayGenerator() {
+ return pollDelayGenerator;
}
}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java
deleted file mode 100644
index 1f4dcb5..0000000
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseQueueAdapter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package io.github.pavansharma36.workflow.api.adapter.base;
-
-import io.github.pavansharma36.workflow.api.adapter.QueueAdapter;
-import io.github.pavansharma36.workflow.api.serde.Deserializer;
-import io.github.pavansharma36.workflow.api.serde.Serde;
-import io.github.pavansharma36.workflow.api.serde.Serializer;
-import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
-
-/**
- * Base class for all {@link QueueAdapter}.
- */
-public abstract class BaseQueueAdapter extends BaseAdapter implements QueueAdapter {
-
- protected final Serializer serializer;
- protected final Deserializer deserializer;
-
- protected BaseQueueAdapter(String namespace,
- PollDelayGenerator pollDelayGenerator,
- Serde serde) {
- super(namespace, pollDelayGenerator, serde);
- this.serializer = serde.serializer();
- this.deserializer = serde.deserializer();
- }
-}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java
deleted file mode 100644
index c58531d..0000000
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/base/BaseScheduleAdapter.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package io.github.pavansharma36.workflow.api.adapter.base;
-
-import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter;
-import io.github.pavansharma36.workflow.api.serde.Serde;
-import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
-
-/**
- * Base class for all {@link ScheduleAdapter}.
- */
-public abstract class BaseScheduleAdapter extends BaseAdapter implements ScheduleAdapter {
-
- protected BaseScheduleAdapter(String namespace,
- PollDelayGenerator pollDelayGenerator,
- Serde serde) {
- super(namespace, pollDelayGenerator, serde);
- }
-}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java
new file mode 100644
index 0000000..cf98ef7
--- /dev/null
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java
@@ -0,0 +1,13 @@
+package io.github.pavansharma36.workflow.api.adapter.builder;
+
+import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
+import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+
+public abstract class BasePersistenceAdapterBuilder>
+ extends BaseAdapterBuilder {
+
+ public T withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) {
+ this.pollDelayGenerator = heartbeatDelayGenerator;
+ return (T) this;
+ }
+}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java
index f9ded81..c7d8bf8 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BaseScheduleAdapterBuilder.java
@@ -18,15 +18,15 @@ public abstract class BaseScheduleAdapterBuilder withMaintenanceDelayGenerator(
+ public S withMaintenanceDelayGenerator(
@NonNull final PollDelayGenerator maintenanceDelayGenerator) {
this.maintenanceDelayGenerator = maintenanceDelayGenerator;
- return this;
+ return (S) this;
}
- public BaseScheduleAdapterBuilder maxRunDuration(@NonNull Duration maxRunDuration) {
+ public S maxRunDuration(@NonNull Duration maxRunDuration) {
this.maxRunDuration = maxRunDuration;
- return this;
+ return (S) this;
}
}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java
index c3c51a6..3ed01d9 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java
@@ -10,45 +10,66 @@
/**
* Base class for all adapter builder.
*/
-public class WorkflowAdapterBuilder,
- S1 extends BaseScheduleAdapterBuilder,
- S2 extends BaseAdapterBuilder,
- S3 extends BaseAdapterBuilder> {
+public class WorkflowAdapterBuilder {
- protected S1 scheduleAdapterBuilder;
- protected S2 persistenceAdapterBuilder;
- protected S3 queueAdapterBuilder;
+ protected BaseScheduleAdapterBuilder> scheduleAdapterBuilder;
+ protected BasePersistenceAdapterBuilder> persistenceAdapterBuilder;
+ protected BaseAdapterBuilder, ? extends QueueAdapter> queueAdapterBuilder;
- public WorkflowAdapterBuilder withQueuePollDelayGenerator(
+ public WorkflowAdapterBuilder withQueuePollDelayGenerator(
final PollDelayGenerator pollDelayGenerator) {
this.queueAdapterBuilder.withPollDelayGenerator(pollDelayGenerator);
return this;
}
- public WorkflowAdapterBuilder withSchedulePollDelayGenerator(
+ public WorkflowAdapterBuilder withSchedulePollDelayGenerator(
final PollDelayGenerator pollDelayGenerator) {
this.scheduleAdapterBuilder.withPollDelayGenerator(pollDelayGenerator);
return this;
}
- public WorkflowAdapterBuilder withMaintenancePollDelayGenerator(
+ public WorkflowAdapterBuilder withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) {
+ this.persistenceAdapterBuilder.withHeartbeatDelayGenerator(heartbeatDelayGenerator);
+ return this;
+ }
+
+ public WorkflowAdapterBuilder withMaintenancePollDelayGenerator(
final PollDelayGenerator pollDelayGenerator
) {
this.scheduleAdapterBuilder.withMaintenanceDelayGenerator(pollDelayGenerator);
return this;
}
- public WorkflowAdapterBuilder withMaxRunDuration(Duration duration) {
+ public WorkflowAdapterBuilder withMaxRunDuration(Duration duration) {
this.scheduleAdapterBuilder.maxRunDuration(duration);
return this;
}
+ public WorkflowAdapterBuilder withScheduleAdapterBuilder(BaseScheduleAdapterBuilder> scheduleAdapterBuilder) {
+ this.scheduleAdapterBuilder = scheduleAdapterBuilder;
+ return this;
+ }
+
+ public WorkflowAdapterBuilder withPersistenceAdapterBuilder(BasePersistenceAdapterBuilder> persistenceAdapterBuilder) {
+ this.persistenceAdapterBuilder = persistenceAdapterBuilder;
+ return this;
+ }
+
+ public WorkflowAdapterBuilder withQueueAdapterBuilder(BaseAdapterBuilder, ? extends QueueAdapter> queueAdapterBuilder) {
+ this.queueAdapterBuilder = queueAdapterBuilder;
+ return this;
+ }
+
/**
* build {@link WorkflowAdapter}.
*
* @return - instance of workflowadaper.
*/
public WorkflowAdapter build() {
+ scheduleAdapterBuilder.validate();
+ queueAdapterBuilder.validate();
+ persistenceAdapterBuilder.validate();
+
return new WorkflowAdapterImpl(scheduleAdapterBuilder.build(),
queueAdapterBuilder.build(),
persistenceAdapterBuilder.build());
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java
index aaa1e72..c4bdc3a 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/Id.java
@@ -1,5 +1,7 @@
package io.github.pavansharma36.workflow.api.bean.id;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.github.pavansharma36.workflow.api.serde.IdSerializer;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -12,6 +14,7 @@
@ToString
@EqualsAndHashCode
@AllArgsConstructor
+@JsonSerialize(using = IdSerializer.class)
public class Id {
/**
* string representation of this id.
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java
index 3ae0ba3..ab2c088 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/id/ManagerId.java
@@ -9,4 +9,8 @@ public class ManagerId extends Id {
public ManagerId() {
super(Utils.random());
}
+
+ public ManagerId(String id) {
+ super(id);
+ }
}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java
index 49f3062..b002c71 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/bean/task/TaskType.java
@@ -1,18 +1,22 @@
package io.github.pavansharma36.workflow.api.bean.task;
import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
+import lombok.Setter;
import lombok.ToString;
/**
* Task type to differentiate queue and executor for submitted tasks.
*/
@Getter
+@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
+@EqualsAndHashCode
public class TaskType {
private int version = 1;
private @NonNull String type;
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java
index 340ff0f..4b31c1e 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java
@@ -4,17 +4,29 @@
import java.util.Map;
import lombok.Builder;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.jackson.Jacksonized;
/**
* Task executor needs to return result.
*/
@Getter
-@Builder
-@Jacksonized
+@Setter
public class ExecutionResult {
- private final TaskExecutionStatus status;
- private final String message;
- private final Map resultMeta;
- private final TaskId decision;
+ private TaskExecutionStatus status;
+ private String message;
+ private Map resultMeta;
+ private TaskId decision;
+
+ public ExecutionResult() {
+
+ }
+
+ public ExecutionResult(TaskExecutionStatus status, String message, Map resultMeta,
+ TaskId decision) {
+ this.status = status;
+ this.message = message;
+ this.resultMeta = resultMeta;
+ this.decision = decision;
+ }
}
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java
index de37760..ceab70b 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java
@@ -132,9 +132,7 @@ private void run(final WorkflowManager workflowManager) {
break;
} catch (final Throwable e) {
log.error("Unhandled error in task execution {}", e.getMessage(), e);
- executionResult = ExecutionResult.builder()
- .status(TaskExecutionStatus.FAILED_STOP).message(e.getMessage())
- .build();
+ executionResult = new ExecutionResult(TaskExecutionStatus.FAILED_STOP, e.getMessage(), null, null);
force = true;
}
} while (retry++ < taskInfo.getRetryCount());
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java
index 5be7cea..7cf683a 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/WorkflowManagerImpl.java
@@ -81,7 +81,7 @@ public void close() throws IOException {
log.info("Successfully stopped scheduled executor service");
} else {
List list = scheduledExecutorService().shutdownNow();
- log.info("Force shutdown scheduled executor service, dropped {} tasks", list.size());
+ log.warn("Force shutdown scheduled executor service, dropped {} tasks", list.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java
index fc9e2e8..f6308a8 100644
--- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/schedule/Scheduler.java
@@ -71,11 +71,16 @@ public void stop() {
}
}
+ private boolean isNotStopped() {
+ State s = state.get();
+ return s != State.STOPPING && s != State.STOPPED;
+ }
+
private void startMaintenanceLoop(final WorkflowManager workflowManager,
final ScheduledExecutorService scheduledExecutorService) {
boolean result = false;
try {
- if (adapter.scheduleAdapter().isScheduler()) {
+ if (adapter.scheduleAdapter().isScheduler() && isNotStopped()) {
log.info("Clearing all stuck workflows");
Duration maxRunDuration = adapter.scheduleAdapter().maxRunDuration();
List stuckRuns = adapter.persistenceAdapter().getStuckRunInfos(maxRunDuration);
@@ -88,7 +93,7 @@ private void startMaintenanceLoop(final WorkflowManager workflowManager,
adapter.maintenance();
}
} finally {
- if (state.get() != State.STOPPED) {
+ if (isNotStopped()) {
final Duration duration =
adapter.scheduleAdapter().maintenanceDelayGenerator().delay(result);
scheduledExecutorService.schedule(() -> startMaintenanceLoop(workflowManager,
@@ -101,10 +106,12 @@ private void startHeartbeatLoop(final WorkflowManager workflowManager,
final ScheduledExecutorService scheduledExecutorService) {
boolean result = false;
try {
- ManagerInfo managerInfo = workflowManager.info();
- log.info("Updating heartbeat {}", managerInfo.getManagerId());
- managerInfo.setHeartbeatEpoch(System.currentTimeMillis());
- result = adapter.persistenceAdapter().createOrUpdateManagerInfo(managerInfo);
+ if (isNotStopped()) {
+ ManagerInfo managerInfo = workflowManager.info();
+ log.info("Updating heartbeat {}", managerInfo.getManagerId());
+ managerInfo.setHeartbeatEpoch(System.currentTimeMillis());
+ result = adapter.persistenceAdapter().createOrUpdateManagerInfo(managerInfo);
+ }
} finally {
if (state.get() != State.STOPPED) {
final Duration duration = adapter.persistenceAdapter()
@@ -119,7 +126,7 @@ private void startHandleUpdatedRunLoop(final WorkflowManager workflowManager,
final ScheduledExecutorService scheduledExecutorService) {
boolean result = false;
try {
- if (adapter.scheduleAdapter().isScheduler()) {
+ if (adapter.scheduleAdapter().isScheduler() && isNotStopped()) {
try {
result = handleRun(workflowManager);
} catch (final Exception e) {
@@ -129,7 +136,7 @@ private void startHandleUpdatedRunLoop(final WorkflowManager workflowManager,
log.debug("Not scheduler");
}
} finally {
- if (state.get() != State.STOPPED) {
+ if (isNotStopped()) {
final Duration duration = adapter.scheduleAdapter().pollDelayGenerator().delay(result);
scheduledExecutorService.schedule(() ->
startHandleUpdatedRunLoop(workflowManager, scheduledExecutorService),
@@ -229,7 +236,7 @@ private void updateRun(final WorkflowManager workflowManager, final RunId runId,
} else {
adapter.persistenceAdapter().completeTask(
ExecutableTask.builder().runId(runId).taskId(tid).build(),
- ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build());
+ new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null));
taskInfo.setCompletionTimeEpoch(System.currentTimeMillis());
adapter.queueAdapter().pushUpdatedRun(runId);
@@ -268,8 +275,7 @@ private void ignoreAllChildrenTasks(final WorkflowManager workflowManager, final
final RunId runId = runInfo.getRunId();
log.info("Ignoring task {}", taskId);
- ExecutionResult result =
- ExecutionResult.builder().message(message).status(TaskExecutionStatus.IGNORED).build();
+ ExecutionResult result = new ExecutionResult(TaskExecutionStatus.IGNORED, message, null, null);
adapter.persistenceAdapter()
.completeTask(ExecutableTask.builder().runId(runId).taskId(taskId).build(), result);
diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java
new file mode 100644
index 0000000..b647d8f
--- /dev/null
+++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java
@@ -0,0 +1,16 @@
+package io.github.pavansharma36.workflow.api.serde;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import io.github.pavansharma36.workflow.api.bean.id.Id;
+import java.io.IOException;
+
+public class IdSerializer extends JsonSerializer {
+ @Override
+ public void serialize(Id id, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+ throws IOException {
+ jsonGenerator.writeString(id.getId());
+ }
+}
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java
index fdbaed6..c47eea9 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/BaseTest.java
@@ -3,12 +3,13 @@
import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
import io.github.pavansharma36.workflow.api.bean.task.Task;
import io.github.pavansharma36.workflow.api.impl.WorkflowManagerBuilder;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
import io.github.pavansharma36.workflow.api.serde.JacksonTaskLoader;
import io.github.pavansharma36.workflow.api.util.WorkflowException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
-import lombok.extern.slf4j.Slf4j;
+import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,6 +17,12 @@ public abstract class BaseTest {
protected final Logger log = LoggerFactory.getLogger(getClass());
+ @Rule
+ public final WorkflowTestRule rule = rule();
+
+
+ protected abstract WorkflowTestRule rule();
+
protected WorkflowManagerBuilder builder() {
log.warn("Building workflow manager");
return WorkflowManagerBuilder.builder()
@@ -40,6 +47,8 @@ protected void closeWorkflow(WorkflowManager workflowManager) {
}
}
- protected abstract WorkflowAdapter adapter();
+ protected WorkflowAdapter adapter() {
+ return rule.adapter();
+ }
}
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java
index 9688b50..4b6e3e5 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/NormalTest.java
@@ -12,6 +12,7 @@
import io.github.pavansharma36.workflow.api.executor.ExecutionResult;
import io.github.pavansharma36.workflow.api.executor.TaskExecutionStatus;
import io.github.pavansharma36.workflow.api.executor.TaskExecutor;
+import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor;
import io.github.pavansharma36.workflow.api.model.RunInfo;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,7 +40,7 @@ public void testFailedStop() throws Exception {
@Override
public ExecutionResult execute(WorkflowManager manager, ExecutableTask task) {
if (task.getTaskId().getId().equals("task3")) {
- return ExecutionResult.builder().status(TaskExecutionStatus.FAILED_STOP).build();
+ return new ExecutionResult(TaskExecutionStatus.FAILED_STOP, null, null, null);
}
return super.execute(manager, task);
}
@@ -102,7 +103,7 @@ public void testCanceling() throws Exception {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null);
};
TaskType taskType = new TaskType(1, "test");
WorkflowManager workflowManager = builder()
@@ -187,6 +188,9 @@ public void testMultiClientSimple() throws Exception {
Assert.assertEquals(sets, expectedSets);
taskExecutor.getChecker().assertNoDuplicates();
+ } catch (Exception e) {
+ log.error("Error {}", e.getMessage(), e);
+ throw e;
} finally {
workflowManagers.forEach(this::closeWorkflow);
}
@@ -197,13 +201,17 @@ public void testNoData() throws Exception {
WorkflowManager workflowManager = builder()
.addingTaskExecutor(new TaskType(1, "test"), 10, new TestTaskExecutor(1))
.build();
- workflowManager.start();
+ try {
+ workflowManager.start();
- Thread.sleep(1000L);
+ Thread.sleep(1000L);
- Optional taskData =
- workflowManager.getTaskExecutionResult(new RunId(), new TaskId());
- Assert.assertFalse(taskData.isPresent());
+ Optional taskData =
+ workflowManager.getTaskExecutionResult(new RunId(), new TaskId());
+ Assert.assertFalse(taskData.isPresent());
+ } finally {
+ closeWorkflow(workflowManager);
+ }
}
@Test
@@ -216,8 +224,7 @@ public void testTaskData() throws Exception {
Map resultData = new HashMap<>();
resultData.put("one", "1");
resultData.put("two", "2");
- return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS)
- .message("").resultMeta(resultData).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, "", resultData, null);
};
TaskType taskType = new TaskType(1, "test");
WorkflowManager workflowManager = builder()
@@ -266,8 +273,7 @@ public void testSubTask() throws Exception {
}
RunId subTaskRunId = task.getTaskId().equals(groupAParent.getId())
? workflowManager.submit(groupBTask) : null;
- return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS)
- .message("test").resultMeta(new HashMap<>()).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, "test", new HashMap<>(), null);
};
WorkflowManager workflowManager = builder()
.addingTaskExecutor(taskType, 10, taskExecutor)
@@ -334,22 +340,19 @@ public void testMultiTypes() throws Exception {
BlockingQueue queue1 = new LinkedBlockingDeque<>();
TaskExecutor taskExecutor1 = (manager, task) -> {
queue1.add(task.getTaskId());
- return ExecutionResult.builder()
- .status(TaskExecutionStatus.SUCCESS).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null);
};
BlockingQueue queue2 = new LinkedBlockingDeque<>();
TaskExecutor taskExecutor2 = (manager, task) -> {
queue2.add(task.getTaskId());
- return ExecutionResult.builder()
- .status(TaskExecutionStatus.SUCCESS).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null);
};
BlockingQueue queue3 = new LinkedBlockingDeque<>();
TaskExecutor taskExecutor3 = (manager, task) -> {
queue3.add(task.getTaskId());
- return ExecutionResult.builder()
- .status(TaskExecutionStatus.SUCCESS).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null);
};
WorkflowManager workflowManager = builder()
@@ -389,6 +392,4 @@ public void testMultiTypes() throws Exception {
}
}
- protected abstract WorkflowAdapter adapter();
-
}
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java
index bcbdb2a..0f0c911 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/PerformanceTest.java
@@ -6,6 +6,7 @@
import io.github.pavansharma36.workflow.api.bean.id.TaskId;
import io.github.pavansharma36.workflow.api.bean.task.Task;
import io.github.pavansharma36.workflow.api.bean.task.TaskType;
+import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor;
import io.github.pavansharma36.workflow.api.util.RoundRobinIterator;
import java.util.LinkedList;
import java.util.List;
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java
index 064b6c8..14c73f0 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/WorkflowListenerTest.java
@@ -6,6 +6,7 @@
import io.github.pavansharma36.workflow.api.bean.task.Task;
import io.github.pavansharma36.workflow.api.bean.task.TaskType;
import io.github.pavansharma36.workflow.api.bean.task.impl.SimpleTask;
+import io.github.pavansharma36.workflow.api.helper.TestTaskExecutor;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java
similarity index 72%
rename from workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java
rename to workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java
index c99a396..99f5dbf 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/ConcurrentTaskChecker.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/ConcurrentTaskChecker.java
@@ -1,4 +1,4 @@
-package io.github.pavansharma36.workflow.api;
+package io.github.pavansharma36.workflow.api.helper;
import io.github.pavansharma36.workflow.api.bean.id.RunId;
import io.github.pavansharma36.workflow.api.bean.id.TaskId;
@@ -9,26 +9,26 @@
import org.junit.Assert;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
-class ConcurrentTaskChecker {
+public class ConcurrentTaskChecker {
private final Set currentSet = new HashSet<>();
private final List> all = new ArrayList<>();
private int count = 0;
private final List> sets = new ArrayList<>();
- synchronized void reset() {
+ public synchronized void reset() {
currentSet.clear();
all.clear();
sets.clear();
count = 0;
}
- synchronized void add(RunId runId, TaskId taskId) {
+ public synchronized void add(RunId runId, TaskId taskId) {
all.add(Pair.of(runId, taskId));
currentSet.add(taskId);
++count;
}
- synchronized void decrement() {
+ public synchronized void decrement() {
if (--count == 0) {
HashSet copy = new HashSet<>(currentSet);
currentSet.clear();
@@ -37,15 +37,15 @@ synchronized void decrement() {
}
}
- synchronized List> getSets() {
+ public synchronized List> getSets() {
return new ArrayList<>(sets);
}
- synchronized List> getAll() {
+ public synchronized List> getAll() {
return new ArrayList<>(all);
}
- synchronized void assertNoDuplicates() {
+ public synchronized void assertNoDuplicates() {
Assert.assertEquals(all.size(), new HashSet<>(all).size()); // no dups
}
}
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java
similarity index 86%
rename from workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java
rename to workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java
index 31f96a0..e56bfea 100644
--- a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/TestTaskExecutor.java
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/helper/TestTaskExecutor.java
@@ -1,9 +1,11 @@
-package io.github.pavansharma36.workflow.api;
+package io.github.pavansharma36.workflow.api.helper;
+import io.github.pavansharma36.workflow.api.WorkflowManager;
import io.github.pavansharma36.workflow.api.executor.ExecutableTask;
import io.github.pavansharma36.workflow.api.executor.ExecutionResult;
import io.github.pavansharma36.workflow.api.executor.TaskExecutionStatus;
import io.github.pavansharma36.workflow.api.executor.TaskExecutor;
+import io.github.pavansharma36.workflow.api.helper.ConcurrentTaskChecker;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
@@ -52,9 +54,7 @@ public ExecutionResult execute(WorkflowManager manager, ExecutableTask task) {
checker.decrement();
latch.countDown();
}
- return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS)
- .message("hey")
- .resultMeta(new HashMap<>()).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, "hey", new HashMap<>(), null);
}
diff --git a/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java
new file mode 100644
index 0000000..301eda7
--- /dev/null
+++ b/workflow-api/src/test/java/io/github/pavansharma36/workflow/api/junit/WorkflowTestRule.java
@@ -0,0 +1,37 @@
+package io.github.pavansharma36.workflow.api.junit;
+
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import java.util.LinkedList;
+import java.util.List;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.MultipleFailureException;
+import org.junit.runners.model.Statement;
+
+public abstract class WorkflowTestRule implements TestRule {
+
+ public abstract WorkflowAdapter adapter();
+
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ List errors = new LinkedList<>();
+ try {
+ pre();
+ statement.evaluate();
+ } catch (Throwable e) {
+ errors.add(e);
+ } finally {
+ post();
+ }
+ MultipleFailureException.assertEmpty(errors);
+ }
+ };
+ }
+
+ protected abstract void pre();
+
+ protected abstract void post();
+}
diff --git a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java
index 6b60f95..b488588 100644
--- a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java
+++ b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java
@@ -63,7 +63,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce
final TaskExecutor te = (w, t) -> {
log.info("Executing {}", t.getTaskType());
Utils.sleep(Duration.ofMillis(10));
- return ExecutionResult.builder().status(TaskExecutionStatus.SUCCESS).build();
+ return new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, null);
};
final JedisPool jedisPool = new JedisPool();
@@ -77,9 +77,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce
final WorkflowManager workflowManager = WorkflowManagerBuilder.builder().withAdapter(adapter)
.addingTaskExecutor(taskTypeA, 2, te).addingTaskExecutor(taskTypeB, 2, te)
.addingTaskExecutor(taskTypeC, 2, te).addingTaskExecutor(decisionType, 1,
- (manager, task) -> ExecutionResult.builder()
- .status(TaskExecutionStatus.SUCCESS).decision(new TaskId("taske"))
- .build()).build();
+ (manager, task) -> new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, new TaskId("taske"))).build();
CountDownLatch countDownLatch = new CountDownLatch(SUBMIT_COUNT);
workflowManager.workflowManagerListener().addListener(new WorkflowListener() {
@@ -120,5 +118,6 @@ public void onTaskEvent(TaskEvent event) {
System.currentTimeMillis() - startTimeMillis);
workflowManager.close();
+ jedisPool.close();
}
}
diff --git a/workflow-inmemory/pom.xml b/workflow-inmemory/pom.xml
new file mode 100644
index 0000000..49a41eb
--- /dev/null
+++ b/workflow-inmemory/pom.xml
@@ -0,0 +1,28 @@
+
+
+ workflow
+ io.github.pavansharma36
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ workflow-inmemory
+ jar
+
+
+
+ io.github.pavansharma36
+ workflow-api
+ ${project.version}
+
+
+
+ io.github.pavansharma36
+ workflow-api
+ ${project.version}
+ test
+ test-jar
+
+
+
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java
new file mode 100644
index 0000000..70573e3
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java
@@ -0,0 +1,146 @@
+package io.github.pavansharma36.workflow.inmemory.adapter;
+
+import io.github.pavansharma36.workflow.api.WorkflowManager;
+import io.github.pavansharma36.workflow.api.adapter.base.BasePersistenceAdapter;
+import io.github.pavansharma36.workflow.api.bean.id.ManagerId;
+import io.github.pavansharma36.workflow.api.bean.id.RunId;
+import io.github.pavansharma36.workflow.api.bean.id.TaskId;
+import io.github.pavansharma36.workflow.api.executor.ExecutableTask;
+import io.github.pavansharma36.workflow.api.executor.ExecutionResult;
+import io.github.pavansharma36.workflow.api.model.ManagerInfo;
+import io.github.pavansharma36.workflow.api.model.RunInfo;
+import io.github.pavansharma36.workflow.api.model.TaskInfo;
+import io.github.pavansharma36.workflow.api.serde.Serde;
+import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class InmemoryPersistenceAdapter extends BasePersistenceAdapter {
+
+ private Map managerInfos = new HashMap<>();
+ private Map runInfos = new HashMap<>();
+ private Map> taskInfos = new HashMap<>();
+
+ public InmemoryPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator) {
+ super(namespace, heartbeatDelayGenerator);
+ }
+
+ @Override
+ public void start(WorkflowManager workflowManager) {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) {
+ managerInfos.computeIfAbsent(managerInfo.getManagerId(), managerId -> managerInfo);
+ return true;
+ }
+
+ @Override
+ public List getAllManagerInfos() {
+ return new ArrayList<>(managerInfos.values());
+ }
+
+ @Override
+ public boolean removeManagerInfo(ManagerId id) {
+ return managerInfos.remove(id) != null;
+ }
+
+ @Override
+ public boolean updateQueuedTime(RunId runId, TaskId taskId) {
+ taskInfos.computeIfPresent(runId, (i, taskInfoMap) -> {
+ taskInfoMap.computeIfPresent(taskId, (j, task) -> {
+ task.setQueuedTimeEpoch(System.currentTimeMillis());
+ return task;
+ });
+ return taskInfoMap;
+ });
+ return true;
+ }
+
+ @Override
+ public boolean updateStartTime(RunId runId) {
+ runInfos.computeIfPresent(runId, (i, run) -> {
+ run.setStartTimeEpoch(System.currentTimeMillis());
+ return run;
+ });
+ return true;
+ }
+
+ @Override
+ public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) {
+ taskInfos.computeIfPresent(runId, (i, taskInfoMap) -> {
+ taskInfoMap.computeIfPresent(taskId, (j, task) -> {
+ task.setStartTimeEpoch(System.currentTimeMillis());
+ task.setProcessedBy(processedBy);
+ return task;
+ });
+ return taskInfoMap;
+ });
+ return true;
+ }
+
+ @Override
+ public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) {
+ taskInfos.computeIfPresent(executableTask.getRunId(), (i, tasks) -> {
+ tasks.computeIfPresent(executableTask.getTaskId(), (j, task) -> {
+ task.setCompletionTimeEpoch(System.currentTimeMillis());
+ task.setResult(executionResult);
+ return task;
+ });
+ return tasks;
+ });
+ return true;
+ }
+
+ @Override
+ public Optional getTaskInfo(RunId runId, TaskId taskId) {
+ return Optional.ofNullable(taskInfos.getOrDefault(runId, Collections.emptyMap()).get(taskId));
+ }
+
+ @Override
+ public Optional getRunInfo(RunId runId) {
+ return Optional.ofNullable(runInfos.get(runId));
+ }
+
+ @Override
+ public void createRunInfo(RunInfo runInfo) {
+ runInfos.put(runInfo.getRunId(), runInfo);
+ }
+
+ @Override
+ public boolean updateRunInfoEpoch(RunId runId) {
+ runInfos.computeIfPresent(runId, (i, run) -> {
+ run.setLastUpdateEpoch(System.currentTimeMillis());
+ return run;
+ });
+ return true;
+ }
+
+ @Override
+ public void createTaskInfos(RunId runId, List taskInfos) {
+ this.taskInfos.put(runId, taskInfos.stream().collect(Collectors.toMap(TaskInfo::getTaskId, t -> t)));
+ }
+
+ @Override
+ public boolean cleanup(RunId runId) {
+ return taskInfos.remove(runId) != null && runInfos.remove(runId) != null;
+ }
+
+ @Override
+ public List getStuckRunInfos(Duration maxDuration) {
+ // in memory dont stuck.
+ return Collections.emptyList();
+ }
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java
new file mode 100644
index 0000000..ea79e55
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java
@@ -0,0 +1,73 @@
+package io.github.pavansharma36.workflow.inmemory.adapter;
+
+import io.github.pavansharma36.workflow.api.WorkflowManager;
+import io.github.pavansharma36.workflow.api.adapter.QueueAdapter;
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.bean.id.RunId;
+import io.github.pavansharma36.workflow.api.bean.task.TaskType;
+import io.github.pavansharma36.workflow.api.executor.ExecutableTask;
+import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class InmemoryQueueAdapter implements QueueAdapter {
+
+ private final LinkedList updatedRunQueue = new LinkedList<>();
+ private final Map> queue = new HashMap<>();
+
+ private final PollDelayGenerator pollDelayGenerator;
+
+ @Override
+ public void start(WorkflowManager workflowManager) {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void maintenance(WorkflowAdapter workflowAdapter) {
+
+ }
+
+ @Override
+ public PollDelayGenerator pollDelayGenerator() {
+ return pollDelayGenerator;
+ }
+
+ @Override
+ public void pushTask(ExecutableTask task) {
+ queue.computeIfAbsent(task.getTaskType(), i -> new LinkedList<>()).offer(task);
+ }
+
+ @Override
+ public Optional pollTask(TaskType taskType) {
+ return Optional.ofNullable(queue.computeIfAbsent(taskType, i -> new LinkedList<>()).poll());
+ }
+
+ @Override
+ public boolean commitTaskProcessed(ExecutableTask task) {
+ return true;
+ }
+
+ @Override
+ public void pushUpdatedRun(RunId runId) {
+ updatedRunQueue.offer(runId);
+ }
+
+ @Override
+ public Optional pollUpdatedRun() {
+ return Optional.ofNullable(updatedRunQueue.poll());
+ }
+
+ @Override
+ public boolean commitUpdatedRunProcess(RunId runId) {
+ return true;
+ }
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java
new file mode 100644
index 0000000..83e3627
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java
@@ -0,0 +1,50 @@
+package io.github.pavansharma36.workflow.inmemory.adapter;
+
+import io.github.pavansharma36.workflow.api.WorkflowManager;
+import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter;
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+import java.time.Duration;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class InmemorySchedulerAdapter implements ScheduleAdapter {
+
+ private final PollDelayGenerator pollDelayGenerator;
+ private final PollDelayGenerator maintenanceDelayGenerator;
+
+ @Override
+ public void start(WorkflowManager workflowManager) {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void maintenance(WorkflowAdapter workflowAdapter) {
+
+ }
+
+ @Override
+ public PollDelayGenerator pollDelayGenerator() {
+ return pollDelayGenerator;
+ }
+
+ @Override
+ public PollDelayGenerator maintenanceDelayGenerator() {
+ return maintenanceDelayGenerator;
+ }
+
+ @Override
+ public Duration maxRunDuration() {
+ return Duration.ofDays(7L);
+ }
+
+ @Override
+ public boolean isScheduler() {
+ return true;
+ }
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java
new file mode 100644
index 0000000..b5beccd
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java
@@ -0,0 +1,24 @@
+package io.github.pavansharma36.workflow.inmemory.builder;
+
+import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
+import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder;
+import io.github.pavansharma36.workflow.api.util.WorkflowException;
+import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryPersistenceAdapter;
+
+public class InmemoryPersistenceAdapterBuilder
+ extends BasePersistenceAdapterBuilder {
+
+ public InmemoryPersistenceAdapterBuilder() {
+ namespace = "NA";
+ }
+ @Override
+ public PersistenceAdapter build() {
+ return new InmemoryPersistenceAdapter(namespace, pollDelayGenerator);
+ }
+
+ @Override
+ public InmemoryPersistenceAdapterBuilder withNamespace(String namespace) {
+ throw new WorkflowException("Namespace is not supported for inmemory");
+ }
+
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java
new file mode 100644
index 0000000..68a3f7e
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java
@@ -0,0 +1,23 @@
+package io.github.pavansharma36.workflow.inmemory.builder;
+
+import io.github.pavansharma36.workflow.api.adapter.QueueAdapter;
+import io.github.pavansharma36.workflow.api.adapter.builder.BaseAdapterBuilder;
+import io.github.pavansharma36.workflow.api.util.WorkflowException;
+import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryQueueAdapter;
+
+public class InmemoryQueueAdapterBuilder
+ extends BaseAdapterBuilder {
+
+ public InmemoryQueueAdapterBuilder() {
+ namespace = "NA";
+ }
+ @Override
+ public QueueAdapter build() {
+ return new InmemoryQueueAdapter(pollDelayGenerator);
+ }
+
+ @Override
+ public InmemoryQueueAdapterBuilder withNamespace(String namespace) {
+ throw new WorkflowException("Namespace is not supported for inmemory");
+ }
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java
new file mode 100644
index 0000000..b110c69
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java
@@ -0,0 +1,23 @@
+package io.github.pavansharma36.workflow.inmemory.builder;
+
+import io.github.pavansharma36.workflow.api.adapter.ScheduleAdapter;
+import io.github.pavansharma36.workflow.api.adapter.builder.BaseScheduleAdapterBuilder;
+import io.github.pavansharma36.workflow.api.util.WorkflowException;
+import io.github.pavansharma36.workflow.inmemory.adapter.InmemorySchedulerAdapter;
+
+public class InmemoryScheduleAdapterBuilder extends BaseScheduleAdapterBuilder {
+
+ public InmemoryScheduleAdapterBuilder() {
+ namespace = "NA";
+ }
+
+ @Override
+ public ScheduleAdapter build() {
+ return new InmemorySchedulerAdapter(pollDelayGenerator, maintenanceDelayGenerator);
+ }
+
+ @Override
+ public InmemoryScheduleAdapterBuilder withNamespace(String namespace) {
+ throw new WorkflowException("Namespace is not supported for inmemory");
+ }
+}
diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java
new file mode 100644
index 0000000..59b9303
--- /dev/null
+++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java
@@ -0,0 +1,16 @@
+package io.github.pavansharma36.workflow.inmemory.builder;
+
+import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder;
+
+public class InmemoryWorkflowAdapterBuilder
+ extends WorkflowAdapterBuilder {
+
+
+ public static InmemoryWorkflowAdapterBuilder builder() {
+ InmemoryWorkflowAdapterBuilder builder = new InmemoryWorkflowAdapterBuilder();
+ builder.scheduleAdapterBuilder = new InmemoryScheduleAdapterBuilder();
+ builder.queueAdapterBuilder = new InmemoryQueueAdapterBuilder();
+ builder.persistenceAdapterBuilder = new InmemoryPersistenceAdapterBuilder();
+ return builder;
+ }
+}
diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java
new file mode 100644
index 0000000..1ef98d0
--- /dev/null
+++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryNormalTest.java
@@ -0,0 +1,12 @@
+package io.github.pavansharma36.workflow.inmemory;
+
+import io.github.pavansharma36.workflow.api.NormalTest;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule;
+
+public class InmemoryNormalTest extends NormalTest {
+ @Override
+ protected WorkflowTestRule rule() {
+ return new InmemoryRule();
+ }
+}
diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java
new file mode 100644
index 0000000..3a1cf25
--- /dev/null
+++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryPerformanceTest.java
@@ -0,0 +1,12 @@
+package io.github.pavansharma36.workflow.inmemory;
+
+import io.github.pavansharma36.workflow.api.PerformanceTest;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule;
+
+public class InmemoryPerformanceTest extends PerformanceTest {
+ @Override
+ protected WorkflowTestRule rule() {
+ return new InmemoryRule();
+ }
+}
diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java
new file mode 100644
index 0000000..d076089
--- /dev/null
+++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/InmemoryWorkflowListenerTest.java
@@ -0,0 +1,12 @@
+package io.github.pavansharma36.workflow.inmemory;
+
+import io.github.pavansharma36.workflow.api.WorkflowListenerTest;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.inmemory.rule.InmemoryRule;
+
+public class InmemoryWorkflowListenerTest extends WorkflowListenerTest {
+ @Override
+ protected WorkflowTestRule rule() {
+ return new InmemoryRule();
+ }
+}
diff --git a/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java
new file mode 100644
index 0000000..97852b6
--- /dev/null
+++ b/workflow-inmemory/src/test/java/io/github/pavansharma36/workflow/inmemory/rule/InmemoryRule.java
@@ -0,0 +1,28 @@
+package io.github.pavansharma36.workflow.inmemory.rule;
+
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
+import io.github.pavansharma36.workflow.inmemory.builder.InmemoryWorkflowAdapterBuilder;
+import java.time.Duration;
+
+public class InmemoryRule extends WorkflowTestRule {
+ @Override
+ public WorkflowAdapter adapter() {
+ return InmemoryWorkflowAdapterBuilder.builder()
+ .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .build();
+ }
+
+
+ @Override
+ protected void pre() {
+ // Nothing to do
+ }
+
+ @Override
+ protected void post() {
+ // Nothing to do
+ }
+}
diff --git a/workflow-jdbc/pom.xml b/workflow-jdbc/pom.xml
new file mode 100644
index 0000000..179fa1e
--- /dev/null
+++ b/workflow-jdbc/pom.xml
@@ -0,0 +1,16 @@
+
+
+ workflow
+ io.github.pavansharma36
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ workflow-jdbc
+ pom
+
+ workflow-jdbc-common
+
+
+
diff --git a/workflow-jdbc/workflow-jdbc-common/pom.xml b/workflow-jdbc/workflow-jdbc-common/pom.xml
new file mode 100644
index 0000000..b69e88f
--- /dev/null
+++ b/workflow-jdbc/workflow-jdbc-common/pom.xml
@@ -0,0 +1,21 @@
+
+
+ workflow-jdbc
+ io.github.pavansharma36
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ workflow-jdbc-common
+ jar
+
+
+
+ io.github.pavansharma36
+ workflow-api
+ ${project.version}
+
+
+
+
diff --git a/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java b/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java
new file mode 100644
index 0000000..9ecae98
--- /dev/null
+++ b/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java
@@ -0,0 +1,13 @@
+package io.github.pavansharma36;
+
+/**
+ * Hello world!
+ *
+ */
+public class App
+{
+ public static void main( String[] args )
+ {
+ System.out.println( "Hello World!" );
+ }
+}
diff --git a/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java b/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java
new file mode 100644
index 0000000..98d2799
--- /dev/null
+++ b/workflow-jdbc/workflow-jdbc-common/src/test/java/io/github/pavansharma36/AppTest.java
@@ -0,0 +1,38 @@
+package io.github.pavansharma36;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/workflow-jedis/pom.xml b/workflow-jedis/pom.xml
index 5bb60b0..4f7ccbd 100644
--- a/workflow-jedis/pom.xml
+++ b/workflow-jedis/pom.xml
@@ -11,10 +11,6 @@
http://maven.apache.org
Module with WorkflowManager api implementation using Jedis
-
- UTF-8
- 3.7.1
-
@@ -26,7 +22,6 @@
redis.clients
jedis
- ${jedis.version}
diff --git a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java
index 94688f2..262a282 100644
--- a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java
+++ b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisPersistenceAdapterBuilder.java
@@ -2,6 +2,7 @@
import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
import io.github.pavansharma36.workflow.api.adapter.builder.BaseAdapterBuilder;
+import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder;
import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
import io.github.pavansharma36.workflow.api.util.WorkflowException;
@@ -18,7 +19,7 @@
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class JedisPersistenceAdapterBuilder
- extends BaseAdapterBuilder {
+ extends BasePersistenceAdapterBuilder {
private JedisPool jedis;
@@ -31,12 +32,6 @@ public JedisPersistenceAdapterBuilder withJedisPool(final JedisPool pool) {
return this;
}
- public JedisPersistenceAdapterBuilder heartbeatDelayGenerator(
- @NonNull final PollDelayGenerator heartbeatDelayGenerator) {
- this.pollDelayGenerator = heartbeatDelayGenerator;
- return this;
- }
-
/**
* Build {@link io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter}
* with given details.
diff --git a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java
index e559473..be3f8e7 100644
--- a/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java
+++ b/workflow-jedis/src/main/java/io/github/pavansharma36/workflow/jedis/adapter/builder/JedisWorkflowAdapterBuilder.java
@@ -12,8 +12,7 @@
* Builder class to build redis based {@link WorkflowAdapter}.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class JedisWorkflowAdapterBuilder extends WorkflowAdapterBuilder {
+public class JedisWorkflowAdapterBuilder extends WorkflowAdapterBuilder {
public static JedisWorkflowAdapterBuilder builder(final JedisPool jedisPool,
final String namespace) {
diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java
index 5fc7d98..0a0f146 100644
--- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java
+++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisNormalTest.java
@@ -1,42 +1,14 @@
package io.github.pavansharma36.workflow.jedis;
import io.github.pavansharma36.workflow.api.NormalTest;
-import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
-import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
-import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder;
-import java.time.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-import redis.clients.jedis.JedisPool;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.jedis.rule.JedisRule;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class JedisNormalTest extends NormalTest {
-
- @Rule
- public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis"))
- .withExposedPorts(6379);
-
- private JedisPool jedisPool = null;
-
- @Before
- public void init() {
- jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
- }
-
- @After
- public void tearDown() {
- jedisPool.close();
- }
-
@Override
- protected WorkflowAdapter adapter() {
- final String namespace = "test";
- //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
- return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace)
- .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
- .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
- .build();
+ protected WorkflowTestRule rule() {
+ return new JedisRule();
}
}
diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java
index 93d98a5..81e74ae 100644
--- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java
+++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisPerformanceTest.java
@@ -1,45 +1,13 @@
package io.github.pavansharma36.workflow.jedis;
import io.github.pavansharma36.workflow.api.PerformanceTest;
-import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
-import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
-import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder;
-import java.time.Duration;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-import redis.clients.jedis.JedisPool;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.jedis.rule.JedisRule;
-@Slf4j
public class JedisPerformanceTest extends PerformanceTest {
- @Rule
- public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis"))
- .withExposedPorts(6379);
-
- private JedisPool jedisPool = null;
-
- @Before
- public void init() {
- log.warn("Initializing jedis pool");
- jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
- }
-
- @After
- public void tearDown() {
- log.warn("Closing jedis pool");
- jedisPool.close();
- }
@Override
- protected WorkflowAdapter adapter() {
- final String namespace = "test";
- //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
- return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace)
- .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(10L)))
- .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(10L)))
- .build();
+ protected WorkflowTestRule rule() {
+ return new JedisRule();
}
}
diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java
index e12d686..962c4bb 100644
--- a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java
+++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/JedisWorkflowListenerTest.java
@@ -1,42 +1,12 @@
package io.github.pavansharma36.workflow.jedis;
import io.github.pavansharma36.workflow.api.WorkflowListenerTest;
-import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
-import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
-import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder;
-import java.time.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-import redis.clients.jedis.JedisPool;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.jedis.rule.JedisRule;
public class JedisWorkflowListenerTest extends WorkflowListenerTest {
-
- @Rule
- public GenericContainer redis = new GenericContainer(DockerImageName.parse("redis"))
- .withExposedPorts(6379);
-
- private JedisPool jedisPool = null;
-
- @Before
- public void init() {
- jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
- }
-
- @After
- public void tearDown() {
- jedisPool.close();
- }
-
@Override
- protected WorkflowAdapter adapter() {
- final String namespace = "test";
- return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace)
- .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
- .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
- .build();
+ protected WorkflowTestRule rule() {
+ return new JedisRule();
}
-
}
diff --git a/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java
new file mode 100644
index 0000000..89387ff
--- /dev/null
+++ b/workflow-jedis/src/test/java/io/github/pavansharma36/workflow/jedis/rule/JedisRule.java
@@ -0,0 +1,40 @@
+package io.github.pavansharma36.workflow.jedis.rule;
+
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
+import io.github.pavansharma36.workflow.jedis.adapter.builder.JedisWorkflowAdapterBuilder;
+import java.time.Duration;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.JedisPool;
+
+public class JedisRule extends WorkflowTestRule {
+
+ private GenericContainer redis = new GenericContainer(DockerImageName.parse("redis"))
+ .withExposedPorts(6379);
+ private JedisPool jedisPool = null;
+
+ @Override
+ public WorkflowAdapter adapter() {
+ final String namespace = "test";
+ //final JedisPool jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
+ return JedisWorkflowAdapterBuilder.builder(jedisPool, namespace)
+ .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .build();
+ }
+
+ @Override
+ protected void pre() {
+ redis.start();
+ jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
+ }
+
+ @Override
+ protected void post() {
+ jedisPool.close();
+ redis.stop();
+ }
+
+}
diff --git a/workflow-mongodb/pom.xml b/workflow-mongodb/pom.xml
new file mode 100644
index 0000000..82f7651
--- /dev/null
+++ b/workflow-mongodb/pom.xml
@@ -0,0 +1,46 @@
+
+
+ workflow
+ io.github.pavansharma36
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ workflow-mongodb
+ jar
+
+
+
+ io.github.pavansharma36
+ workflow-api
+ ${project.version}
+
+
+
+ org.mongodb
+ mongodb-driver-sync
+
+
+
+ io.github.pavansharma36
+ workflow-api
+ ${project.version}
+ test-jar
+ test
+
+
+
+ io.github.pavansharma36
+ workflow-inmemory
+ ${project.version}
+ test
+
+
+ org.testcontainers
+ mongodb
+ ${test.containers.version}
+ test
+
+
+
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java
new file mode 100644
index 0000000..9ecae98
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java
@@ -0,0 +1,13 @@
+package io.github.pavansharma36;
+
+/**
+ * Hello world!
+ *
+ */
+public class App
+{
+ public static void main( String[] args )
+ {
+ System.out.println( "Hello World!" );
+ }
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java
new file mode 100644
index 0000000..21b95f5
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java
@@ -0,0 +1,216 @@
+package io.github.pavansharma36.workflow.mongodb.adapter;
+
+import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
+import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
+import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
+import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.Updates;
+import io.github.pavansharma36.workflow.api.WorkflowManager;
+import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
+import io.github.pavansharma36.workflow.api.adapter.base.BasePersistenceAdapter;
+import io.github.pavansharma36.workflow.api.bean.id.ManagerId;
+import io.github.pavansharma36.workflow.api.bean.id.RunId;
+import io.github.pavansharma36.workflow.api.bean.id.TaskId;
+import io.github.pavansharma36.workflow.api.bean.task.TaskType;
+import io.github.pavansharma36.workflow.api.dag.RunnableTaskDag;
+import io.github.pavansharma36.workflow.api.executor.ExecutableTask;
+import io.github.pavansharma36.workflow.api.executor.ExecutionResult;
+import io.github.pavansharma36.workflow.api.model.ManagerInfo;
+import io.github.pavansharma36.workflow.api.model.RunInfo;
+import io.github.pavansharma36.workflow.api.model.TaskInfo;
+import io.github.pavansharma36.workflow.api.serde.Serde;
+import io.github.pavansharma36.workflow.api.util.PollDelayGenerator;
+import io.github.pavansharma36.workflow.mongodb.helper.IdCodecs;
+import io.github.pavansharma36.workflow.mongodb.helper.MongoDBQueryHelper;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import org.bson.BsonDocument;
+import org.bson.BsonDocumentWriter;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.codecs.EncoderContext;
+import org.bson.codecs.configuration.CodecProvider;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.codecs.pojo.PojoCodecProvider;
+import org.bson.conversions.Bson;
+
+public class MongoDBPersistenceAdapter extends BasePersistenceAdapter implements PersistenceAdapter {
+
+ private final String database;
+ private final MongoClient mongoClient;
+
+ private final CodecRegistry codec;
+
+ public MongoDBPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator,
+ String database, MongoClient mongoClient, Serde serde) {
+ super(namespace, heartbeatDelayGenerator);
+ this.database = database;
+ this.mongoClient = mongoClient;
+
+ CodecProvider pojoCodecProvider = PojoCodecProvider.builder()
+ .register(ManagerInfo.class, RunInfo.class, TaskInfo.class, ExecutionResult.class, RunnableTaskDag.class,
+ TaskType.class).build();
+ CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),
+ fromProviders(pojoCodecProvider), fromCodecs(new IdCodecs.ManagerIdCodec(), new IdCodecs.RunIdCodec(), new IdCodecs.TaskIdCodec()));
+
+ this.codec = pojoCodecRegistry;
+ }
+
+ @Override
+ public void start(WorkflowManager workflowManager) {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) {
+ Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY,
+ new BsonString(managerInfo.getManagerId().getId()));
+ Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, managerInfo.getManagerId().getId()),
+ Updates.set(MongoDBQueryHelper.ManagerInfo.START_TIME_KEY, managerInfo.getStartTimeEpoch()),
+ Updates.set(MongoDBQueryHelper.ManagerInfo.HEARTBEAT_KEY, managerInfo.getHeartbeatEpoch()));
+ UpdateOptions options = new UpdateOptions().upsert(true);
+ return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace))
+ .updateOne(filter, update, options).wasAcknowledged();
+ }
+
+ @Override
+ public List getAllManagerInfos() {
+ try (MongoCursor infos = collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace))
+ .find(ManagerInfo.class).cursor()) {
+ List docs = new LinkedList<>();
+ while (infos.hasNext()) {
+ docs.add(infos.next());
+ }
+ return docs;
+ }
+ }
+
+ @Override
+ public boolean removeManagerInfo(ManagerId id) {
+ Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY,
+ new BsonString(id.getId()));
+ return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)).deleteOne(filter)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public boolean updateQueuedTime(RunId runId, TaskId taskId) {
+ Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()),
+ Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId()));
+ Bson update = Updates.set(MongoDBQueryHelper.TaskInfo.QUEUED_TIME_KEY, System.currentTimeMillis());
+ return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public boolean updateStartTime(RunId runId) {
+ Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId());
+ Bson update = Updates.set(MongoDBQueryHelper.RunInfo.START_TIME_KEY, System.currentTimeMillis());
+ return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) {
+ Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()),
+ Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId()));
+ Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.START_TIME_KEY, System.currentTimeMillis()),
+ Updates.set(MongoDBQueryHelper.TaskInfo.PROCESSED_BY_KEY, processedBy.getId()));
+ return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) {
+ Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, executableTask.getRunId().getId()),
+ Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, executableTask.getTaskId().getId()));
+ Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.COMPLETION_TIME_KEY, System.currentTimeMillis()),
+ Updates.set(MongoDBQueryHelper.TaskInfo.RESULT_KEY, executionResult));
+ return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public Optional getTaskInfo(RunId runId, TaskId taskId) {
+ Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()),
+ Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId()));
+ TaskInfo taskInfo = collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace))
+ .find(filter, TaskInfo.class).first();
+ return Optional.ofNullable(taskInfo);
+ }
+
+ @Override
+ public Optional getRunInfo(RunId runId) {
+ Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId());
+ RunInfo runInfo = collection(MongoDBQueryHelper.RunInfo.collectionName(namespace))
+ .find(filter, RunInfo.class).first();
+ return Optional.ofNullable(runInfo);
+ }
+
+ @Override
+ public void createRunInfo(RunInfo runInfo) {
+ BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
+ codec.get(RunInfo.class).encode(writer, runInfo, EncoderContext.builder().build());
+ BsonDocument doc = writer.getDocument();
+ collection(MongoDBQueryHelper.RunInfo.collectionName(namespace))
+ .insertOne(Document.parse(doc.toJson()));
+ }
+
+ @Override
+ public boolean updateRunInfoEpoch(RunId runId) {
+ Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId());
+ Bson update = Updates.set(MongoDBQueryHelper.RunInfo.LAST_UPDATE_KEY, System.currentTimeMillis());
+ return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update)
+ .wasAcknowledged();
+ }
+
+ @Override
+ public void createTaskInfos(RunId runId, List taskInfos) {
+ List docs = new ArrayList<>(taskInfos.size());
+ for (TaskInfo taskInfo : taskInfos) {
+ taskInfo.setRunId(runId);
+ BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
+ codec.get(TaskInfo.class).encode(writer, taskInfo, EncoderContext.builder().build());
+ BsonDocument doc = writer.getDocument();
+ docs.add(Document.parse(doc.toJson()));
+ }
+ collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace))
+ .insertMany(docs);
+ }
+
+ @Override
+ public boolean cleanup(RunId runId) {
+ Bson taskFilter = Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId());
+ collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).deleteMany(taskFilter);
+
+ Bson runFilter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId());
+ collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).deleteMany(runFilter);
+ return true;
+ }
+
+ @Override
+ public List getStuckRunInfos(Duration maxDuration) {
+ // TODO
+ return Collections.emptyList();
+ }
+
+ private MongoCollection collection(String collection) {
+ return mongoClient.getDatabase(database).getCollection(collection).withCodecRegistry(codec);
+ }
+
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java
new file mode 100644
index 0000000..c6c7f9b
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java
@@ -0,0 +1,25 @@
+package io.github.pavansharma36.workflow.mongodb.adapter.builder;
+
+import com.mongodb.client.MongoClient;
+import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter;
+import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder;
+import io.github.pavansharma36.workflow.mongodb.adapter.MongoDBPersistenceAdapter;
+import lombok.AccessLevel;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class MongoDBPersistanceAdapterBuilder extends BasePersistenceAdapterBuilder {
+
+ private final String database;
+ private final MongoClient mongoClient;
+
+ public static MongoDBPersistanceAdapterBuilder builder(@NonNull String database, @NonNull MongoClient mongoClient) {
+ return new MongoDBPersistanceAdapterBuilder(database, mongoClient);
+ }
+
+ @Override
+ public PersistenceAdapter build() {
+ return new MongoDBPersistenceAdapter(namespace, pollDelayGenerator, database, mongoClient, serde);
+ }
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java
new file mode 100644
index 0000000..d1d76dc
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java
@@ -0,0 +1,67 @@
+package io.github.pavansharma36.workflow.mongodb.helper;
+
+import io.github.pavansharma36.workflow.api.bean.id.ManagerId;
+import io.github.pavansharma36.workflow.api.bean.id.RunId;
+import io.github.pavansharma36.workflow.api.bean.id.TaskId;
+import org.bson.BsonReader;
+import org.bson.BsonWriter;
+import org.bson.codecs.Codec;
+import org.bson.codecs.DecoderContext;
+import org.bson.codecs.EncoderContext;
+
+public class IdCodecs {
+
+ public static class ManagerIdCodec implements Codec {
+ @Override
+ public ManagerId decode(BsonReader bsonReader, DecoderContext decoderContext) {
+ return new ManagerId(bsonReader.readString());
+ }
+
+ @Override
+ public void encode(BsonWriter bsonWriter, ManagerId managerId, EncoderContext encoderContext) {
+ bsonWriter.writeString(managerId.getId());
+ }
+
+ @Override
+ public Class getEncoderClass() {
+ return ManagerId.class;
+ }
+ }
+
+ public static class RunIdCodec implements Codec {
+
+ @Override
+ public RunId decode(BsonReader bsonReader, DecoderContext decoderContext) {
+ return new RunId(bsonReader.readString());
+ }
+
+ @Override
+ public void encode(BsonWriter bsonWriter, RunId runId, EncoderContext encoderContext) {
+ bsonWriter.writeString(runId.getId());
+ }
+
+ @Override
+ public Class getEncoderClass() {
+ return RunId.class;
+ }
+ }
+
+ public static class TaskIdCodec implements Codec {
+
+ @Override
+ public TaskId decode(BsonReader bsonReader, DecoderContext decoderContext) {
+ return new TaskId(bsonReader.readString());
+ }
+
+ @Override
+ public void encode(BsonWriter bsonWriter, TaskId taskId, EncoderContext encoderContext) {
+ bsonWriter.writeString(taskId.getId());
+ }
+
+ @Override
+ public Class getEncoderClass() {
+ return TaskId.class;
+ }
+ }
+
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java
new file mode 100644
index 0000000..9a9f09d
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java
@@ -0,0 +1,52 @@
+package io.github.pavansharma36.workflow.mongodb.helper;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class MongoDBQueryHelper {
+
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
+ public static class ManagerInfo {
+ public static final String MANAGER_ID_KEY = "managerId";
+ public static final String START_TIME_KEY = "startTimeEpoch";
+ public static final String HEARTBEAT_KEY = "heartbeatEpoch";
+
+ public static String collectionName(String namespace) {
+ return namespace + "_manager_info";
+ }
+ }
+
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
+ public static class TaskInfo {
+
+ public static final String QUEUED_TIME_KEY = "queuedTimeEpoch";
+ public static final String RUN_ID_KEY = "runId";
+ public static final String TASK_ID_KEY = "taskId";
+ public static final String START_TIME_KEY = "startTimeEpoch";
+ public static final String PROCESSED_BY_KEY = "processedBy";
+ public static final String COMPLETION_TIME_KEY = "completionTimeEpoch";
+ public static final String RESULT_KEY = "result";
+
+ public static String collectionName(String namespace) {
+ return namespace + "_task_info";
+ }
+ }
+
+ public static class RunInfo {
+
+ public static final String RUN_ID_KEY = "runId";
+ public static final String QUEUED_TIME_KEY = "queuedTime";
+ public static final String START_TIME_KEY = "startTimeEpoch";
+ public static final String LAST_UPDATE_KEY = "lastUpdateEpoch";
+
+ public static String collectionName(String namespace) {
+ return namespace + "_run_info";
+ }
+ }
+
+
+
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java
new file mode 100644
index 0000000..0083b6b
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java
@@ -0,0 +1,45 @@
+package io.github.pavansharma36.workflow.mongodb.helper;
+
+import io.github.pavansharma36.workflow.api.serde.Deserializer;
+import io.github.pavansharma36.workflow.api.serde.Serde;
+import io.github.pavansharma36.workflow.api.serde.Serializer;
+import org.bson.BsonReader;
+import org.bson.BsonWriter;
+import org.bson.RawBsonDocument;
+import org.bson.codecs.Codec;
+import org.bson.codecs.DecoderContext;
+import org.bson.codecs.EncoderContext;
+import org.bson.codecs.configuration.CodecRegistry;
+
+public class SerdeCodec implements Codec {
+
+ private final Serializer serializer;
+ private final Deserializer deserializer;
+ private final Codec rawBsonDocumentCodec;
+ private final Class type;
+
+ public SerdeCodec(Serde serde,
+ CodecRegistry codecRegistry,
+ Class type) {
+ this.serializer = serde.serializer();
+ this.deserializer = serde.deserializer();
+ this.rawBsonDocumentCodec = codecRegistry.get(RawBsonDocument.class);
+ this.type = type;
+ }
+ @Override
+ public T decode(BsonReader bsonReader, DecoderContext decoderContext) {
+ RawBsonDocument document = rawBsonDocumentCodec.decode(bsonReader, decoderContext);
+ return deserializer.deserialize(document.getByteBuffer().array(), type);
+ }
+
+ @Override
+ public void encode(BsonWriter bsonWriter, T t, EncoderContext encoderContext) {
+ byte[] data = serializer.serialize(t);
+ rawBsonDocumentCodec.encode(bsonWriter, new RawBsonDocument(data), encoderContext);
+ }
+
+ @Override
+ public Class getEncoderClass() {
+ return type;
+ }
+}
diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java
new file mode 100644
index 0000000..d7819bb
--- /dev/null
+++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java
@@ -0,0 +1,35 @@
+package io.github.pavansharma36.workflow.mongodb.helper;
+
+import io.github.pavansharma36.workflow.api.model.ManagerInfo;
+import io.github.pavansharma36.workflow.api.serde.Serde;
+import java.util.HashMap;
+import java.util.Map;
+import org.bson.codecs.Codec;
+import org.bson.codecs.configuration.CodecRegistry;
+
+public class SerdeCodecProvider implements CodecRegistry {
+
+ private final CodecRegistry delegation;
+ private final Map, Codec>> codecs;
+
+ public SerdeCodecProvider(CodecRegistry delegation, Serde serde) {
+ this.delegation = delegation;
+ this.codecs = new HashMap<>();
+ codecs.put(ManagerInfo.class, new SerdeCodec<>(serde, delegation,
+ ManagerInfo.class));
+ }
+
+ @Override
+ public Codec get(Class aClass, CodecRegistry codecRegistry) {
+ if (codecs.containsKey(aClass)) {
+ return (Codec) codecs.get(aClass);
+ } else {
+ return codecRegistry.get(aClass);
+ }
+ }
+
+ @Override
+ public Codec get(Class aClass) {
+ return get(aClass, delegation);
+ }
+}
diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java
new file mode 100644
index 0000000..0e1a02f
--- /dev/null
+++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBNormalTest.java
@@ -0,0 +1,12 @@
+package io.github.pavansharma36.workflow.mongodb;
+
+import io.github.pavansharma36.workflow.api.NormalTest;
+import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule;
+
+public class MongoDBNormalTest extends NormalTest {
+
+ @Override
+ protected MongoDBRule rule() {
+ return new MongoDBRule();
+ }
+}
diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java
new file mode 100644
index 0000000..9c933e3
--- /dev/null
+++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBPerformanceTest.java
@@ -0,0 +1,11 @@
+package io.github.pavansharma36.workflow.mongodb;
+
+import io.github.pavansharma36.workflow.api.PerformanceTest;
+import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule;
+
+public class MongoDBPerformanceTest extends PerformanceTest {
+ @Override
+ protected MongoDBRule rule() {
+ return new MongoDBRule();
+ }
+}
diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java
new file mode 100644
index 0000000..06b632d
--- /dev/null
+++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/MongoDBWorkflowListenerTest.java
@@ -0,0 +1,12 @@
+package io.github.pavansharma36.workflow.mongodb;
+
+import io.github.pavansharma36.workflow.api.WorkflowListenerTest;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.mongodb.rule.MongoDBRule;
+
+public class MongoDBWorkflowListenerTest extends WorkflowListenerTest {
+ @Override
+ protected WorkflowTestRule rule() {
+ return new MongoDBRule();
+ }
+}
diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java
new file mode 100644
index 0000000..b15ff45
--- /dev/null
+++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java
@@ -0,0 +1,47 @@
+package io.github.pavansharma36.workflow.mongodb.rule;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import io.github.pavansharma36.workflow.api.adapter.WorkflowAdapter;
+import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder;
+import io.github.pavansharma36.workflow.api.junit.WorkflowTestRule;
+import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator;
+import io.github.pavansharma36.workflow.inmemory.builder.InmemoryQueueAdapterBuilder;
+import io.github.pavansharma36.workflow.inmemory.builder.InmemoryScheduleAdapterBuilder;
+import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDBPersistanceAdapterBuilder;
+import java.time.Duration;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class MongoDBRule extends WorkflowTestRule {
+
+ public final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"));
+
+ private MongoClient client = null;
+
+ @Override
+ protected void pre() {
+ mongoDBContainer.start();
+ client = MongoClients.create(mongoDBContainer.getConnectionString());
+ }
+
+ @Override
+ protected void post() {
+ client.close();
+ mongoDBContainer.stop();
+ }
+
+
+ @Override
+ public WorkflowAdapter adapter() {
+ final String namespace = "test";
+ return new WorkflowAdapterBuilder()
+ .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client)
+ .withNamespace(namespace))
+ .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace))
+ .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace))
+ .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L)))
+ .build();
+ }
+}