From 93d4763f2da46c4fa551e08298f522d6f60c4fe3 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Thu, 7 Dec 2023 23:59:29 +0530 Subject: [PATCH 1/2] fixed checkstyle --- .../BasePersistenceAdapterBuilder.java | 11 ++ .../builder/WorkflowAdapterBuilder.java | 54 +++++++- .../api/executor/ExecutionResult.java | 11 ++ .../workflow/api/impl/QueueConsumerImpl.java | 3 +- .../workflow/api/serde/IdSerializer.java | 13 +- .../pavansharma36/workflow/examples/App.java | 3 +- .../adapter/InmemoryPersistenceAdapter.java | 12 +- .../adapter/InmemoryQueueAdapter.java | 3 + .../adapter/InmemorySchedulerAdapter.java | 3 + .../InmemoryPersistenceAdapterBuilder.java | 7 + .../builder/InmemoryQueueAdapterBuilder.java | 7 + .../InmemoryScheduleAdapterBuilder.java | 9 +- .../InmemoryWorkflowAdapterBuilder.java | 9 +- .../java/io/github/pavansharma36/App.java | 13 -- .../java/io/github/pavansharma36/App.java | 13 -- ...er.java => MongoDbPersistenceAdapter.java} | 123 +++++++++++------- .../MongoDBPersistanceAdapterBuilder.java | 25 ---- .../MongoDbPersistanceAdapterBuilder.java | 38 ++++++ .../workflow/mongodb/helper/IdCodecs.java | 12 ++ ...eryHelper.java => MongoDbQueryHelper.java} | 15 ++- .../workflow/mongodb/helper/SerdeCodec.java | 14 ++ .../mongodb/helper/SerdeCodecProvider.java | 22 +++- .../workflow/mongodb/rule/MongoDBRule.java | 4 +- 23 files changed, 308 insertions(+), 116 deletions(-) delete mode 100644 workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java delete mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/App.java rename workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/{MongoDBPersistenceAdapter.java => MongoDbPersistenceAdapter.java} (58%) delete mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java create mode 100644 workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDbPersistanceAdapterBuilder.java rename workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/{MongoDBQueryHelper.java => MongoDbQueryHelper.java} (87%) diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java index cf98ef7..34bdf98 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/BasePersistenceAdapterBuilder.java @@ -3,9 +3,20 @@ import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; +/** + * persistent adapter builder. + * + * @param - same class. + */ public abstract class BasePersistenceAdapterBuilder> extends BaseAdapterBuilder { + /** + * heartbeat delay generator to use. + * + * @param heartbeatDelayGenerator - generator + * @return - this + */ public T withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) { this.pollDelayGenerator = heartbeatDelayGenerator; return (T) this; diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java index 3ed01d9..5f876e1 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/adapter/builder/WorkflowAdapterBuilder.java @@ -22,17 +22,36 @@ public WorkflowAdapterBuilder withQueuePollDelayGenerator( return this; } + /** + * schedule poll delay generator to use. + * + * @param pollDelayGenerator - generator + * @return - this + */ public WorkflowAdapterBuilder withSchedulePollDelayGenerator( final PollDelayGenerator pollDelayGenerator) { this.scheduleAdapterBuilder.withPollDelayGenerator(pollDelayGenerator); return this; } - public WorkflowAdapterBuilder withHeartbeatDelayGenerator(PollDelayGenerator heartbeatDelayGenerator) { + /** + * heartbeat generator to use. + * + * @param heartbeatDelayGenerator - generator + * @return - this + */ + public WorkflowAdapterBuilder withHeartbeatDelayGenerator(PollDelayGenerator + heartbeatDelayGenerator) { this.persistenceAdapterBuilder.withHeartbeatDelayGenerator(heartbeatDelayGenerator); return this; } + /** + * poll delay generator to use. + * + * @param pollDelayGenerator - generator + * @return - this + */ public WorkflowAdapterBuilder withMaintenancePollDelayGenerator( final PollDelayGenerator pollDelayGenerator ) { @@ -40,22 +59,49 @@ public WorkflowAdapterBuilder withMaintenancePollDelayGenerator( return this; } + /** + * max run duration to use. + * + * @param duration - duration + * @return - this + */ public WorkflowAdapterBuilder withMaxRunDuration(Duration duration) { this.scheduleAdapterBuilder.maxRunDuration(duration); return this; } - public WorkflowAdapterBuilder withScheduleAdapterBuilder(BaseScheduleAdapterBuilder scheduleAdapterBuilder) { + /** + * scheduler adapter to use. + * + * @param scheduleAdapterBuilder - adapter + * @return - this + */ + public WorkflowAdapterBuilder withScheduleAdapterBuilder(BaseScheduleAdapterBuilder + scheduleAdapterBuilder) { this.scheduleAdapterBuilder = scheduleAdapterBuilder; return this; } - public WorkflowAdapterBuilder withPersistenceAdapterBuilder(BasePersistenceAdapterBuilder persistenceAdapterBuilder) { + /** + * persistent adapter to use. + * + * @param persistenceAdapterBuilder - adaper + * @return - this + */ + public WorkflowAdapterBuilder withPersistenceAdapterBuilder(BasePersistenceAdapterBuilder + persistenceAdapterBuilder) { this.persistenceAdapterBuilder = persistenceAdapterBuilder; return this; } - public WorkflowAdapterBuilder withQueueAdapterBuilder(BaseAdapterBuilder queueAdapterBuilder) { + /** + * queue adapter to use. + * + * @param queueAdapterBuilder - queue adaper + * @return - this + */ + public WorkflowAdapterBuilder withQueueAdapterBuilder( + BaseAdapterBuilder queueAdapterBuilder) { this.queueAdapterBuilder = queueAdapterBuilder; return this; } diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java index 4b31c1e..876ad9b 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/executor/ExecutionResult.java @@ -18,10 +18,21 @@ public class ExecutionResult { private Map resultMeta; private TaskId decision; + /** + * for serializer. + */ public ExecutionResult() { } + /** + * all args constructor. + * + * @param status - status + * @param message - message + * @param resultMeta - result + * @param decision - decision + */ public ExecutionResult(TaskExecutionStatus status, String message, Map resultMeta, TaskId decision) { this.status = status; diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java index ceab70b..9da1f47 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/impl/QueueConsumerImpl.java @@ -132,7 +132,8 @@ private void run(final WorkflowManager workflowManager) { break; } catch (final Throwable e) { log.error("Unhandled error in task execution {}", e.getMessage(), e); - executionResult = new ExecutionResult(TaskExecutionStatus.FAILED_STOP, e.getMessage(), null, null); + executionResult = new ExecutionResult(TaskExecutionStatus.FAILED_STOP, + e.getMessage(), null, null); force = true; } } while (retry++ < taskInfo.getRetryCount()); diff --git a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java index b647d8f..3f7e221 100644 --- a/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java +++ b/workflow-api/src/main/java/io/github/pavansharma36/workflow/api/serde/IdSerializer.java @@ -3,11 +3,22 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; import io.github.pavansharma36.workflow.api.bean.id.Id; import java.io.IOException; +/** + * jackson serializer for Id. + */ public class IdSerializer extends JsonSerializer { + + /** + * serialize id as string. + * + * @param id - id to serialize + * @param jsonGenerator - json + * @param serializerProvider - serializer + * @throws IOException - invalid + */ @Override public void serialize(Id id, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { diff --git a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java index b488588..983bd73 100644 --- a/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java +++ b/workflow-examples/src/main/java/io/github/pavansharma36/workflow/examples/App.java @@ -77,7 +77,8 @@ public static void main(final String[] args) throws InterruptedException, IOExce final WorkflowManager workflowManager = WorkflowManagerBuilder.builder().withAdapter(adapter) .addingTaskExecutor(taskTypeA, 2, te).addingTaskExecutor(taskTypeB, 2, te) .addingTaskExecutor(taskTypeC, 2, te).addingTaskExecutor(decisionType, 1, - (manager, task) -> new ExecutionResult(TaskExecutionStatus.SUCCESS, null, null, new TaskId("taske"))).build(); + (manager, task) -> new ExecutionResult(TaskExecutionStatus.SUCCESS, null, + null, new TaskId("taske"))).build(); CountDownLatch countDownLatch = new CountDownLatch(SUBMIT_COUNT); workflowManager.workflowManagerListener().addListener(new WorkflowListener() { diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java index 70573e3..18d4471 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryPersistenceAdapter.java @@ -21,12 +21,21 @@ import java.util.Optional; import java.util.stream.Collectors; +/** + * inmemory persistence adapter. + */ public class InmemoryPersistenceAdapter extends BasePersistenceAdapter { private Map managerInfos = new HashMap<>(); private Map runInfos = new HashMap<>(); private Map> taskInfos = new HashMap<>(); + /** + * required constructor. + * + * @param namespace - NA + * @param heartbeatDelayGenerator - poll delay generator. + */ public InmemoryPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator) { super(namespace, heartbeatDelayGenerator); } @@ -130,7 +139,8 @@ public boolean updateRunInfoEpoch(RunId runId) { @Override public void createTaskInfos(RunId runId, List taskInfos) { - this.taskInfos.put(runId, taskInfos.stream().collect(Collectors.toMap(TaskInfo::getTaskId, t -> t))); + this.taskInfos.put(runId, taskInfos.stream().collect( + Collectors.toMap(TaskInfo::getTaskId, t -> t))); } @Override diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java index ea79e55..e94db6d 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemoryQueueAdapter.java @@ -13,6 +13,9 @@ import java.util.Optional; import lombok.RequiredArgsConstructor; +/** + * queue adapter for inmemory workflow. + */ @RequiredArgsConstructor public class InmemoryQueueAdapter implements QueueAdapter { diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java index 83e3627..b2a6340 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/adapter/InmemorySchedulerAdapter.java @@ -7,6 +7,9 @@ import java.time.Duration; import lombok.RequiredArgsConstructor; +/** + * schedule adapter for inmemory workflow. + */ @RequiredArgsConstructor public class InmemorySchedulerAdapter implements ScheduleAdapter { diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java index b5beccd..5096494 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryPersistenceAdapterBuilder.java @@ -5,12 +5,19 @@ import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryPersistenceAdapter; +/** + * persistent adapter builder for inmemory workflow. + */ public class InmemoryPersistenceAdapterBuilder extends BasePersistenceAdapterBuilder { + /** + * default constructor. + */ public InmemoryPersistenceAdapterBuilder() { namespace = "NA"; } + @Override public PersistenceAdapter build() { return new InmemoryPersistenceAdapter(namespace, pollDelayGenerator); diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java index 68a3f7e..f9272dd 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryQueueAdapterBuilder.java @@ -5,12 +5,19 @@ import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemoryQueueAdapter; +/** + * queue adapter builder for inmemory workflow. + */ public class InmemoryQueueAdapterBuilder extends BaseAdapterBuilder { + /** + * default constructor. + */ public InmemoryQueueAdapterBuilder() { namespace = "NA"; } + @Override public QueueAdapter build() { return new InmemoryQueueAdapter(pollDelayGenerator); diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java index b110c69..11e9bcc 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryScheduleAdapterBuilder.java @@ -5,8 +5,15 @@ import io.github.pavansharma36.workflow.api.util.WorkflowException; import io.github.pavansharma36.workflow.inmemory.adapter.InmemorySchedulerAdapter; -public class InmemoryScheduleAdapterBuilder extends BaseScheduleAdapterBuilder { +/** + * schedule adapter builder for inmemory workflow. + */ +public class InmemoryScheduleAdapterBuilder + extends BaseScheduleAdapterBuilder { + /** + * default constructor. + */ public InmemoryScheduleAdapterBuilder() { namespace = "NA"; } diff --git a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java index 59b9303..24c4ccb 100644 --- a/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java +++ b/workflow-inmemory/src/main/java/io/github/pavansharma36/workflow/inmemory/builder/InmemoryWorkflowAdapterBuilder.java @@ -2,10 +2,17 @@ import io.github.pavansharma36.workflow.api.adapter.builder.WorkflowAdapterBuilder; +/** + * workflow adapter builder for inmemory workflow. + */ public class InmemoryWorkflowAdapterBuilder extends WorkflowAdapterBuilder { - + /** + * main builder. + * + * @return - this + */ public static InmemoryWorkflowAdapterBuilder builder() { InmemoryWorkflowAdapterBuilder builder = new InmemoryWorkflowAdapterBuilder(); builder.scheduleAdapterBuilder = new InmemoryScheduleAdapterBuilder(); diff --git a/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java b/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java deleted file mode 100644 index 9ecae98..0000000 --- a/workflow-jdbc/workflow-jdbc-common/src/main/java/io/github/pavansharma36/App.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.github.pavansharma36; - -/** - * Hello world! - * - */ -public class App -{ - public static void main( String[] args ) - { - System.out.println( "Hello World!" ); - } -} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java deleted file mode 100644 index 9ecae98..0000000 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/App.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.github.pavansharma36; - -/** - * Hello world! - * - */ -public class App -{ - public static void main( String[] args ) - { - System.out.println( "Hello World!" ); - } -} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDbPersistenceAdapter.java similarity index 58% rename from workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java rename to workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDbPersistenceAdapter.java index 21b95f5..e96fedb 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDBPersistenceAdapter.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/MongoDbPersistenceAdapter.java @@ -27,7 +27,7 @@ import io.github.pavansharma36.workflow.api.serde.Serde; import io.github.pavansharma36.workflow.api.util.PollDelayGenerator; import io.github.pavansharma36.workflow.mongodb.helper.IdCodecs; -import io.github.pavansharma36.workflow.mongodb.helper.MongoDBQueryHelper; +import io.github.pavansharma36.workflow.mongodb.helper.MongoDbQueryHelper; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -44,24 +44,40 @@ import org.bson.codecs.pojo.PojoCodecProvider; import org.bson.conversions.Bson; -public class MongoDBPersistenceAdapter extends BasePersistenceAdapter implements PersistenceAdapter { +/** + * persistent adapter using mongoclient. + */ +public class MongoDbPersistenceAdapter extends BasePersistenceAdapter + implements PersistenceAdapter { private final String database; private final MongoClient mongoClient; private final CodecRegistry codec; - public MongoDBPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator, - String database, MongoClient mongoClient, Serde serde) { + /** + * required args constructor. + * + * @param namespace - namespace + * @param heartbeatDelayGenerator - heartbeat delay generator + * @param database - database to use + * @param mongoClient - mongo client + * @param serde - serde + */ + public MongoDbPersistenceAdapter(String namespace, PollDelayGenerator heartbeatDelayGenerator, + String database, MongoClient mongoClient, Serde serde) { super(namespace, heartbeatDelayGenerator); this.database = database; this.mongoClient = mongoClient; CodecProvider pojoCodecProvider = PojoCodecProvider.builder() - .register(ManagerInfo.class, RunInfo.class, TaskInfo.class, ExecutionResult.class, RunnableTaskDag.class, + .register(ManagerInfo.class, RunInfo.class, TaskInfo.class, + ExecutionResult.class, RunnableTaskDag.class, TaskType.class).build(); CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(), - fromProviders(pojoCodecProvider), fromCodecs(new IdCodecs.ManagerIdCodec(), new IdCodecs.RunIdCodec(), new IdCodecs.TaskIdCodec())); + fromProviders(pojoCodecProvider), + fromCodecs(new IdCodecs.ManagerIdCodec(), + new IdCodecs.RunIdCodec(), new IdCodecs.TaskIdCodec())); this.codec = pojoCodecRegistry; } @@ -78,19 +94,21 @@ public void stop() { @Override public boolean createOrUpdateManagerInfo(ManagerInfo managerInfo) { - Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, + Bson filter = Filters.eq(MongoDbQueryHelper.ManagerInfo.MANAGER_ID_KEY, new BsonString(managerInfo.getManagerId().getId())); - Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, managerInfo.getManagerId().getId()), - Updates.set(MongoDBQueryHelper.ManagerInfo.START_TIME_KEY, managerInfo.getStartTimeEpoch()), - Updates.set(MongoDBQueryHelper.ManagerInfo.HEARTBEAT_KEY, managerInfo.getHeartbeatEpoch())); + Bson update = Updates.combine(Updates.set( + MongoDbQueryHelper.ManagerInfo.MANAGER_ID_KEY, managerInfo.getManagerId().getId()), + Updates.set(MongoDbQueryHelper.ManagerInfo.START_TIME_KEY, managerInfo.getStartTimeEpoch()), + Updates.set(MongoDbQueryHelper.ManagerInfo.HEARTBEAT_KEY, managerInfo.getHeartbeatEpoch())); UpdateOptions options = new UpdateOptions().upsert(true); - return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) + return collection(MongoDbQueryHelper.ManagerInfo.collectionName(namespace)) .updateOne(filter, update, options).wasAcknowledged(); } @Override public List getAllManagerInfos() { - try (MongoCursor infos = collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)) + try (MongoCursor infos = collection( + MongoDbQueryHelper.ManagerInfo.collectionName(namespace)) .find(ManagerInfo.class).cursor()) { List docs = new LinkedList<>(); while (infos.hasNext()) { @@ -102,62 +120,73 @@ public List getAllManagerInfos() { @Override public boolean removeManagerInfo(ManagerId id) { - Bson filter = Filters.eq(MongoDBQueryHelper.ManagerInfo.MANAGER_ID_KEY, + Bson filter = Filters.eq(MongoDbQueryHelper.ManagerInfo.MANAGER_ID_KEY, new BsonString(id.getId())); - return collection(MongoDBQueryHelper.ManagerInfo.collectionName(namespace)).deleteOne(filter) + return collection(MongoDbQueryHelper.ManagerInfo.collectionName(namespace)).deleteOne(filter) .wasAcknowledged(); } @Override public boolean updateQueuedTime(RunId runId, TaskId taskId) { - Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), - Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); - Bson update = Updates.set(MongoDBQueryHelper.TaskInfo.QUEUED_TIME_KEY, System.currentTimeMillis()); - return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + Bson filter = Filters.and(Filters.eq(MongoDbQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDbQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + Bson update = Updates.set(MongoDbQueryHelper.TaskInfo.QUEUED_TIME_KEY, + System.currentTimeMillis()); + return collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)) + .updateOne(filter, update) .wasAcknowledged(); } @Override public boolean updateStartTime(RunId runId) { - Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); - Bson update = Updates.set(MongoDBQueryHelper.RunInfo.START_TIME_KEY, System.currentTimeMillis()); - return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update) + Bson filter = Filters.eq(MongoDbQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + Bson update = Updates.set(MongoDbQueryHelper.RunInfo.START_TIME_KEY, + System.currentTimeMillis()); + return collection(MongoDbQueryHelper.RunInfo.collectionName(namespace)) + .updateOne(filter, update) .wasAcknowledged(); } @Override public boolean updateStartTime(RunId runId, TaskId taskId, ManagerId processedBy) { - Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), - Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); - Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.START_TIME_KEY, System.currentTimeMillis()), - Updates.set(MongoDBQueryHelper.TaskInfo.PROCESSED_BY_KEY, processedBy.getId())); - return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + Bson filter = Filters.and(Filters.eq(MongoDbQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDbQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + Bson update = Updates.combine(Updates.set( + MongoDbQueryHelper.TaskInfo.START_TIME_KEY, System.currentTimeMillis()), + Updates.set(MongoDbQueryHelper.TaskInfo.PROCESSED_BY_KEY, processedBy.getId())); + return collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)) + .updateOne(filter, update) .wasAcknowledged(); } @Override - public boolean completeTask(ExecutableTask executableTask, ExecutionResult executionResult) { - Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, executableTask.getRunId().getId()), - Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, executableTask.getTaskId().getId())); - Bson update = Updates.combine(Updates.set(MongoDBQueryHelper.TaskInfo.COMPLETION_TIME_KEY, System.currentTimeMillis()), - Updates.set(MongoDBQueryHelper.TaskInfo.RESULT_KEY, executionResult)); - return collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).updateOne(filter, update) + public boolean completeTask(ExecutableTask executableTask, + ExecutionResult executionResult) { + Bson filter = Filters.and(Filters.eq(MongoDbQueryHelper.TaskInfo.RUN_ID_KEY, + executableTask.getRunId().getId()), + Filters.eq(MongoDbQueryHelper.TaskInfo.TASK_ID_KEY, + executableTask.getTaskId().getId())); + Bson update = Updates.combine(Updates.set( + MongoDbQueryHelper.TaskInfo.COMPLETION_TIME_KEY, System.currentTimeMillis()), + Updates.set(MongoDbQueryHelper.TaskInfo.RESULT_KEY, executionResult)); + return collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)) + .updateOne(filter, update) .wasAcknowledged(); } @Override public Optional getTaskInfo(RunId runId, TaskId taskId) { - Bson filter = Filters.and(Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), - Filters.eq(MongoDBQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); - TaskInfo taskInfo = collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)) + Bson filter = Filters.and(Filters.eq(MongoDbQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()), + Filters.eq(MongoDbQueryHelper.TaskInfo.TASK_ID_KEY, taskId.getId())); + TaskInfo taskInfo = collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)) .find(filter, TaskInfo.class).first(); return Optional.ofNullable(taskInfo); } @Override public Optional getRunInfo(RunId runId) { - Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); - RunInfo runInfo = collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)) + Bson filter = Filters.eq(MongoDbQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + RunInfo runInfo = collection(MongoDbQueryHelper.RunInfo.collectionName(namespace)) .find(filter, RunInfo.class).first(); return Optional.ofNullable(runInfo); } @@ -167,15 +196,17 @@ public void createRunInfo(RunInfo runInfo) { BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument()); codec.get(RunInfo.class).encode(writer, runInfo, EncoderContext.builder().build()); BsonDocument doc = writer.getDocument(); - collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)) + collection(MongoDbQueryHelper.RunInfo.collectionName(namespace)) .insertOne(Document.parse(doc.toJson())); } @Override public boolean updateRunInfoEpoch(RunId runId) { - Bson filter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); - Bson update = Updates.set(MongoDBQueryHelper.RunInfo.LAST_UPDATE_KEY, System.currentTimeMillis()); - return collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).updateOne(filter, update) + Bson filter = Filters.eq(MongoDbQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + Bson update = Updates.set(MongoDbQueryHelper.RunInfo.LAST_UPDATE_KEY, + System.currentTimeMillis()); + return collection(MongoDbQueryHelper.RunInfo.collectionName(namespace)) + .updateOne(filter, update) .wasAcknowledged(); } @@ -189,17 +220,17 @@ public void createTaskInfos(RunId runId, List taskInfos) { BsonDocument doc = writer.getDocument(); docs.add(Document.parse(doc.toJson())); } - collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)) + collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)) .insertMany(docs); } @Override public boolean cleanup(RunId runId) { - Bson taskFilter = Filters.eq(MongoDBQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()); - collection(MongoDBQueryHelper.TaskInfo.collectionName(namespace)).deleteMany(taskFilter); + Bson taskFilter = Filters.eq(MongoDbQueryHelper.TaskInfo.RUN_ID_KEY, runId.getId()); + collection(MongoDbQueryHelper.TaskInfo.collectionName(namespace)).deleteMany(taskFilter); - Bson runFilter = Filters.eq(MongoDBQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); - collection(MongoDBQueryHelper.RunInfo.collectionName(namespace)).deleteMany(runFilter); + Bson runFilter = Filters.eq(MongoDbQueryHelper.RunInfo.RUN_ID_KEY, runId.getId()); + collection(MongoDbQueryHelper.RunInfo.collectionName(namespace)).deleteMany(runFilter); return true; } diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java deleted file mode 100644 index c6c7f9b..0000000 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDBPersistanceAdapterBuilder.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.github.pavansharma36.workflow.mongodb.adapter.builder; - -import com.mongodb.client.MongoClient; -import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; -import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; -import io.github.pavansharma36.workflow.mongodb.adapter.MongoDBPersistenceAdapter; -import lombok.AccessLevel; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - -@RequiredArgsConstructor(access = AccessLevel.PRIVATE) -public class MongoDBPersistanceAdapterBuilder extends BasePersistenceAdapterBuilder { - - private final String database; - private final MongoClient mongoClient; - - public static MongoDBPersistanceAdapterBuilder builder(@NonNull String database, @NonNull MongoClient mongoClient) { - return new MongoDBPersistanceAdapterBuilder(database, mongoClient); - } - - @Override - public PersistenceAdapter build() { - return new MongoDBPersistenceAdapter(namespace, pollDelayGenerator, database, mongoClient, serde); - } -} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDbPersistanceAdapterBuilder.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDbPersistanceAdapterBuilder.java new file mode 100644 index 0000000..604ab8a --- /dev/null +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/adapter/builder/MongoDbPersistanceAdapterBuilder.java @@ -0,0 +1,38 @@ +package io.github.pavansharma36.workflow.mongodb.adapter.builder; + +import com.mongodb.client.MongoClient; +import io.github.pavansharma36.workflow.api.adapter.PersistenceAdapter; +import io.github.pavansharma36.workflow.api.adapter.builder.BasePersistenceAdapterBuilder; +import io.github.pavansharma36.workflow.mongodb.adapter.MongoDbPersistenceAdapter; +import lombok.AccessLevel; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** + * persistence adapter builder using mongo client. + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class MongoDbPersistanceAdapterBuilder + extends BasePersistenceAdapterBuilder { + + private final String database; + private final MongoClient mongoClient; + + /** + * require args builder. + * + * @param database - database + * @param mongoClient - mongo client + * @return - this + */ + public static MongoDbPersistanceAdapterBuilder builder(@NonNull String database, + @NonNull MongoClient mongoClient) { + return new MongoDbPersistanceAdapterBuilder(database, mongoClient); + } + + @Override + public PersistenceAdapter build() { + return new MongoDbPersistenceAdapter(namespace, pollDelayGenerator, + database, mongoClient, serde); + } +} diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java index d1d76dc..0efb62e 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/IdCodecs.java @@ -9,8 +9,14 @@ import org.bson.codecs.DecoderContext; import org.bson.codecs.EncoderContext; +/** + * codecs for Id classes. + */ public class IdCodecs { + /** + * ManagerId codec. + */ public static class ManagerIdCodec implements Codec { @Override public ManagerId decode(BsonReader bsonReader, DecoderContext decoderContext) { @@ -28,6 +34,9 @@ public Class getEncoderClass() { } } + /** + * RunId codec. + */ public static class RunIdCodec implements Codec { @Override @@ -46,6 +55,9 @@ public Class getEncoderClass() { } } + /** + * TaskId codec. + */ public static class TaskIdCodec implements Codec { @Override diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDbQueryHelper.java similarity index 87% rename from workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java rename to workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDbQueryHelper.java index 9a9f09d..ba9bd0f 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDBQueryHelper.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/MongoDbQueryHelper.java @@ -4,10 +4,16 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +/** + * Helper class for mongodb queries. + */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class MongoDBQueryHelper { +public class MongoDbQueryHelper { + /** + * ManagerInfo details. + */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class ManagerInfo { public static final String MANAGER_ID_KEY = "managerId"; @@ -19,6 +25,9 @@ public static String collectionName(String namespace) { } } + /** + * TaskInfo details. + */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TaskInfo { @@ -35,6 +44,10 @@ public static String collectionName(String namespace) { } } + /** + * RunInfo details. + */ + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class RunInfo { public static final String RUN_ID_KEY = "runId"; diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java index 0083b6b..966dd65 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodec.java @@ -11,6 +11,11 @@ import org.bson.codecs.EncoderContext; import org.bson.codecs.configuration.CodecRegistry; +/** + * codec using serde. + * + * @param - type of class to serialize. + */ public class SerdeCodec implements Codec { private final Serializer serializer; @@ -18,6 +23,13 @@ public class SerdeCodec implements Codec { private final Codec rawBsonDocumentCodec; private final Class type; + /** + * required args. + * + * @param serde - serde + * @param codecRegistry - registry + * @param type - classType + */ public SerdeCodec(Serde serde, CodecRegistry codecRegistry, Class type) { @@ -26,6 +38,7 @@ public SerdeCodec(Serde serde, this.rawBsonDocumentCodec = codecRegistry.get(RawBsonDocument.class); this.type = type; } + @Override public T decode(BsonReader bsonReader, DecoderContext decoderContext) { RawBsonDocument document = rawBsonDocumentCodec.decode(bsonReader, decoderContext); @@ -42,4 +55,5 @@ public void encode(BsonWriter bsonWriter, T t, EncoderContext encoderContext) { public Class getEncoderClass() { return type; } + } diff --git a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java index d7819bb..4c5eea6 100644 --- a/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java +++ b/workflow-mongodb/src/main/java/io/github/pavansharma36/workflow/mongodb/helper/SerdeCodecProvider.java @@ -7,11 +7,20 @@ import org.bson.codecs.Codec; import org.bson.codecs.configuration.CodecRegistry; +/** + * Helper class to get codec. + */ public class SerdeCodecProvider implements CodecRegistry { private final CodecRegistry delegation; private final Map, Codec> codecs; + /** + * required args. + * + * @param delegation - to delegate + * @param serde - serde + */ public SerdeCodecProvider(CodecRegistry delegation, Serde serde) { this.delegation = delegation; this.codecs = new HashMap<>(); @@ -20,16 +29,17 @@ public SerdeCodecProvider(CodecRegistry delegation, Serde serde) { } @Override - public Codec get(Class aClass, CodecRegistry codecRegistry) { - if (codecs.containsKey(aClass)) { - return (Codec) codecs.get(aClass); + public Codec get(Class clazz, CodecRegistry codecRegistry) { + if (codecs.containsKey(clazz)) { + return (Codec) codecs.get(clazz); } else { - return codecRegistry.get(aClass); + return codecRegistry.get(clazz); } } @Override - public Codec get(Class aClass) { - return get(aClass, delegation); + public Codec get(Class clazz) { + return get(clazz, delegation); } + } diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java index b15ff45..8e56bb2 100644 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java @@ -8,7 +8,7 @@ import io.github.pavansharma36.workflow.api.util.FixedPollDelayGenerator; import io.github.pavansharma36.workflow.inmemory.builder.InmemoryQueueAdapterBuilder; import io.github.pavansharma36.workflow.inmemory.builder.InmemoryScheduleAdapterBuilder; -import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDBPersistanceAdapterBuilder; +import io.github.pavansharma36.workflow.mongodb.adapter.builder.MongoDbPersistanceAdapterBuilder; import java.time.Duration; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.utility.DockerImageName; @@ -36,7 +36,7 @@ protected void post() { public WorkflowAdapter adapter() { final String namespace = "test"; return new WorkflowAdapterBuilder() - .withPersistenceAdapterBuilder(MongoDBPersistanceAdapterBuilder.builder("test", client) + .withPersistenceAdapterBuilder(MongoDbPersistanceAdapterBuilder.builder("test", client) .withNamespace(namespace)) .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace)) .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace)) From dc6aee5090650691038ca2e81ceec3f4c4792909 Mon Sep 17 00:00:00 2001 From: Pavan Sharma Date: Fri, 8 Dec 2023 00:11:50 +0530 Subject: [PATCH 2/2] fixed mongo tests --- workflow-api/src/test/resources/simplelogger.properties | 2 +- .../pavansharma36/workflow/mongodb/rule/MongoDBRule.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow-api/src/test/resources/simplelogger.properties b/workflow-api/src/test/resources/simplelogger.properties index 2fbace9..40cfffe 100644 --- a/workflow-api/src/test/resources/simplelogger.properties +++ b/workflow-api/src/test/resources/simplelogger.properties @@ -1 +1 @@ -org.slf4j.simpleLogger.defaultLogLevel=warn \ No newline at end of file +org.slf4j.simpleLogger.defaultLogLevel=info \ No newline at end of file diff --git a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java index 8e56bb2..e898026 100644 --- a/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java +++ b/workflow-mongodb/src/test/java/io/github/pavansharma36/workflow/mongodb/rule/MongoDBRule.java @@ -38,8 +38,8 @@ public WorkflowAdapter adapter() { return new WorkflowAdapterBuilder() .withPersistenceAdapterBuilder(MongoDbPersistanceAdapterBuilder.builder("test", client) .withNamespace(namespace)) - .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder().withNamespace(namespace)) - .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder().withNamespace(namespace)) + .withScheduleAdapterBuilder(new InmemoryScheduleAdapterBuilder()) + .withQueueAdapterBuilder(new InmemoryQueueAdapterBuilder()) .withSchedulePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) .withQueuePollDelayGenerator(new FixedPollDelayGenerator(Duration.ofMillis(100L))) .build();