Skip to content

Commit

Permalink
Provide ability to set task-specific Zeebe properties
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanpelikan committed Apr 18, 2024
1 parent 4fb4b43 commit 65d7bcf
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 31 deletions.
42 changes: 42 additions & 0 deletions spring-boot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ To run Camunda 8 on your local computer for development purposes consider to use
1. [Worker ID](#worker-id)
1. [Module aware deployment](#module-aware-deployment)
1. [SPI Binding validation](#spi-binding-validation)
1. [Managing Camunda 8 connectivity](#managing-camunda-8-connectivity)
1. [Multi-instance](#multi-instance)
1. [Message correlation IDs](#message-correlation-ids)
1. [Transaction behavior](#transaction-behavior)
Expand Down Expand Up @@ -94,6 +95,47 @@ spring:
ddl-auto: update
```

### Managing Camunda 8 connectivity

The Camunda 8 adapter is based on the [spring-zeebe](https://github.com/camunda-community-hub/spring-zeebe) client.
Therefore, all settings about Camunda 8 connectivity have to be provided as can be found in that [documentation](https://github.com/camunda-community-hub/spring-zeebe#configuring-camunda-8-connection).

However, there are a couple of settings specific to tasks:

1. `task-timeout`: The lock duration Zeebe will wait, after the task has been fetched by your implementation,
until Zeebe assumes your execution has crashed and the task needs to be re-fetched. Zeebe`s current
default value is 5 minutes. Set this values in respect to the expected time to complete the task.
1. `stream-enabled`: Whether to use polling or streaming to receive new tasks. Current default value is `false` (means polling).
1. `stream-timeout`: The duration to refresh the stream's connection. Current default value is no timeout.
1. `poll-interval`: Interval to poll for new tasks, if streaming of tasks is disabled. Zeebe's current default value is 100ms.
1. `poll-request-timeout`: The request-timeout for polling Zeebe for new tasks. Zeebe`s current default value is 20 seconds.

All these values can be set for specific tasks or all tasks of a workflow or all tasks
of a workflow module. Task-specific values will override workflow's or workflow-module's values and workflow-specific
values will override workflow-module's values:

```yaml
vanillabp:
workflow-modules:
Demo:
adapters:
camunda8:
# default to all workflows of the workflow-module `Demo`
task-timeout: PT5M
workflows:
DemoWorkflow:
adapters:
camunda8:
# default to all tasks of the workflow `DemoWorkflow`
task-timeout: P10M # overrides vanillabp.workflow-modules.Demo.adapters.camunda8.task-timeout
tasks:
logError:
adapters:
camunda8:
# used only for the task 'logError' of the workflow `DemoWorkflow`
task-timeout: PT3S # overrides vanillabp.workflow-modules.Demo.workflows.DemoWorkflow.adapters.camunda8.task-timeout
```
## Multi-instance
Since Camunda 8 is a remote engine the workflow is processed in a different runtime environment. Due to this fact the Blueprint adapter cannot do the entire binding of multi-instance context information under the hood. In the BPMN the multi-instance aspects like the input element, the element's index and the total number of elements have to be defined according to a certain naming convention:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.vanillabp.camunda8;

import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1;
import io.vanillabp.springboot.adapter.VanillaBpProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.util.Map;

@ConfigurationProperties(prefix = VanillaBpProperties.PREFIX, ignoreUnknownFields = true)
Expand Down Expand Up @@ -44,7 +46,58 @@ public String getTenantId(

}

public static class AdapterConfiguration {
public WorkerProperties getUserTaskWorkerProperties(
final String workflowModuleId) {

return getWorkerProperties(workflowModuleId, null, null);

}

public WorkerProperties getWorkerProperties(
final String workflowModuleId,
final String bpmnProcessId,
final String taskDefinition) {

WorkerProperties result = new WorkerProperties();

final var workflowModule = workflowModules.get(workflowModuleId);
if (workflowModule == null) {
return result;
}
final var workflowModuleAdapter = workflowModule.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID);
if (workflowModuleAdapter != null) {
result.apply(workflowModuleAdapter);
}

if (bpmnProcessId == null) {
return result;
}
final var workflow = workflowModule.getWorkflows().get(bpmnProcessId);
if (workflow == null) {
return result;
}
final var workflowAdapter = workflow.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID);
if (workflowAdapter != null) {
result.apply(workflowAdapter);
}

if (taskDefinition == null) {
return result;
}
final var task = workflow.getTasks().get(taskDefinition);
if (task == null) {
return result;
}
final var taskAdapter = task.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID);
if (taskAdapter != null) {
result.apply(taskAdapter);
}

return result;

}

public static class AdapterConfiguration extends WorkerProperties {

private boolean useTenants = true;

Expand Down Expand Up @@ -74,6 +127,8 @@ public static class WorkflowModuleAdapterProperties {

private Map<String, AdapterConfiguration> adapters = Map.of();

private Map<String, WorkflowAdapterProperties> workflows = Map.of();

public Map<String, AdapterConfiguration> getAdapters() {
return adapters;
}
Expand All @@ -82,6 +137,173 @@ public void setAdapters(Map<String, AdapterConfiguration> adapters) {
this.adapters = adapters;
}

public Map<String, WorkflowAdapterProperties> getWorkflows() { return workflows; }

public void setWorkflows(Map<String, WorkflowAdapterProperties> workflows) {

this.workflows = workflows;
workflows.forEach((bpmnProcessId, properties) -> {
properties.bpmnProcessId = bpmnProcessId;
properties.workflowModule = this;
});

}

}

public static class WorkflowAdapterProperties {

String bpmnProcessId;

WorkflowModuleAdapterProperties workflowModule;

private Map<String, WorkerProperties> adapters = Map.of();

private Map<String, TaskProperties> tasks = Map.of();

public WorkflowModuleAdapterProperties getWorkflowModule() {
return workflowModule;
}

public String getBpmnProcessId() {
return bpmnProcessId;
}

public Map<String, WorkerProperties> getAdapters() {
return adapters;
}

public void setAdapters(Map<String, WorkerProperties> adapters) {
this.adapters = adapters;
}

public Map<String, TaskProperties> getTasks() {
return tasks;
}

public void setTasks(Map<String, TaskProperties> tasks) {
this.tasks = tasks;
}

}

public static class WorkerProperties {

public WorkerProperties() {}

public void apply(
final WorkerProperties original) {

if (original.taskTimeout != null) {
this.taskTimeout = original.taskTimeout;
}
if (original.pollInterval != null) {
this.pollInterval = original.pollInterval;
}
if (original.pollRequestTimeout != null) {
this.pollRequestTimeout = original.pollRequestTimeout;
}
if (original.isStreamEnabled() != null) {
this.streamEnabled = original.isStreamEnabled();
}
if (original.streamTimeout != null) {
this.streamTimeout = original.streamTimeout;
}

}

public void applyToWorker(
final JobWorkerBuilderStep1.JobWorkerBuilderStep3 workerBuilder) {

if (taskTimeout != null) {
workerBuilder.timeout(taskTimeout);
}
applyToUserTaskWorker(workerBuilder);

}

public void applyToUserTaskWorker(
final JobWorkerBuilderStep1.JobWorkerBuilderStep3 workerBuilder) {

if (pollInterval != null) {
workerBuilder.pollInterval(pollInterval);
}
if (pollRequestTimeout != null) {
workerBuilder.requestTimeout(pollRequestTimeout);
}
if (streamEnabled!= null) {
workerBuilder.streamEnabled(streamEnabled);
}
if (streamTimeout != null) {
workerBuilder.streamTimeout(streamTimeout);
}

}

private Duration taskTimeout = null;

private Duration pollInterval = null;

private Duration pollRequestTimeout = null;

private Boolean streamEnabled = null;

private Duration streamTimeout = null;

public Duration getTaskTimeout() {
return taskTimeout;
}

public Duration getPollInterval() {
return pollInterval;
}

public Duration getPollRequestTimeout() {
return pollRequestTimeout;
}

public Boolean isStreamEnabled() {
return streamEnabled;
}

public Duration getStreamTimeout() {
return streamTimeout;
}

public void setTaskTimeout(Duration taskTimeout) {
this.taskTimeout = taskTimeout;
}

public void setPollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
}

public void setPollRequestTimeout(Duration pollRequestTimeout) {
this.pollRequestTimeout = pollRequestTimeout;
}

public void setStreamEnabled(boolean streamEnabled) {
this.streamEnabled = streamEnabled;
}

public void setStreamTimeout(Duration streamTimeout) {
this.streamTimeout = streamTimeout;
}

}

public static class TaskProperties {

private Map<String, WorkerProperties> adapters = Map.of();

public Map<String, WorkerProperties> getAdapters() {
return adapters;
}

public void setAdapters(Map<String, WorkerProperties> adapters) {
this.adapters = adapters;
}

}

}
Loading

0 comments on commit 65d7bcf

Please sign in to comment.