diff --git a/pom.xml b/pom.xml
index 462198a..35f60d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
camunda8-adapter
VanillaBP SPI adapter for Camunda 8.x
- 1.3.1-SNAPSHOT
+ 1.4.0-SNAPSHOT
pom
diff --git a/spring-boot/README.md b/spring-boot/README.md
index b2dfcf8..20ed3cc 100644
--- a/spring-boot/README.md
+++ b/spring-boot/README.md
@@ -190,15 +190,51 @@ Since Camunda 8 is an external system to your services one has to deal with even
1. Camunda 8 is not reachable / cannot process the confirmation of a completed task
1. The task to be completed was cancelled meanwhile e.g. due to boundary events
-If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. To achieve both one should mark the service bean like this:
+In order to activate this behavior one has to mark methods accessing VanillaBP-APIs as `@Transactional`, either by
+using the method-level annotation:
```java
@Service
@WorkflowService(workflowAggregateClass = Ride.class)
-@Transactional(noRollbackFor = TaskException.class)
public class TaxiRide {
+ @Autowired
+ private ProcessService processService;
+
+ @Transactional
+ public void receivePayment(...) {
+ ...
+ processService.startWorkflow(ride);
+ ...
+ }
+
+ @Transactional
+ @WorkflowTask
+ public void chargeCreditCard(final Ride ride) {
+ ...
+ }
+
+ @Transactional
+ public void paymentReceived(final Ride ride) {
+ ...
+ processService.correlateMessage(ride, 'PaymentReceived');
+ ...
+ }
+}
+```
+
+or the class-level annotation:
+
+```java
+@Service
+@WorkflowService(workflowAggregateClass = Ride.class)
+@Transactional
+public class TaxiRide {
+ ...
+}
```
+If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. This is handled by the adapter, one does not need to take care about it.
+
## Workflow aggregate serialization
On using C7 one can use workflow aggregates having relations and calculated values:
diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 94d4aa6..a990186 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -5,7 +5,7 @@
org.camunda.community.vanillabp
camunda8-adapter
- 1.3.1-SNAPSHOT
+ 1.4.0-SNAPSHOT
camunda8-spring-boot-adapter
@@ -13,7 +13,7 @@
UTF-8
- 8.3.4.2
+ 8.5.4
@@ -38,14 +38,19 @@
1.1.1
- io.camunda
- spring-zeebe-starter
+ io.camunda.spring
+ spring-boot-starter-camunda
${spring.zeebe.version}
org.springframework
spring-tx
- 5.3.23
+ 6.1.3
+
+
+ org.springframework.boot
+ spring-boot-starter-aop
+ 3.2.5
jakarta.persistence
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
index 43b280c..c947d8d 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
@@ -1,12 +1,13 @@
package io.vanillabp.camunda8;
import io.camunda.zeebe.spring.client.CamundaAutoConfiguration;
-import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy;
import io.vanillabp.camunda8.deployment.Camunda8DeploymentAdapter;
import io.vanillabp.camunda8.deployment.DeploymentRepository;
import io.vanillabp.camunda8.deployment.DeploymentResourceRepository;
import io.vanillabp.camunda8.deployment.DeploymentService;
import io.vanillabp.camunda8.service.Camunda8ProcessService;
+import io.vanillabp.camunda8.service.Camunda8TransactionAspect;
+import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
import io.vanillabp.camunda8.wiring.Camunda8TaskHandler;
import io.vanillabp.camunda8.wiring.Camunda8TaskWiring;
@@ -17,6 +18,8 @@
import io.vanillabp.springboot.adapter.VanillaBpProperties;
import io.vanillabp.springboot.parameters.MethodParameter;
import jakarta.annotation.PostConstruct;
+import java.lang.reflect.Method;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -29,13 +32,13 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
import org.springframework.data.repository.CrudRepository;
-import java.lang.reflect.Method;
-import java.util.List;
-
@AutoConfigurationPackage(basePackageClasses = Camunda8AdapterConfiguration.class)
@AutoConfigureBefore(CamundaAutoConfiguration.class)
@EnableConfigurationProperties(Camunda8VanillaBpProperties.class)
@@ -57,9 +60,6 @@ public class Camunda8AdapterConfiguration extends AdapterConfigurationBase Camunda8ProcessService> newProcessServiceImplementation(
final var result = new Camunda8ProcessService(
camunda8Properties,
+ eventPublisher,
workflowAggregateRepository,
- workflowAggregate -> springDataUtil.getId(workflowAggregate),
+ springDataUtil::getId,
workflowAggregateClass);
putConnectableService(workflowAggregateClass, result);
@@ -185,4 +196,31 @@ public SpringBeanUtil vanillabpSpringBeanUtil(
}
+ /*
+ * https://www.tirasa.net/en/blog/dynamic-spring-s-transactional-2020-edition
+ */
+ /*
+ @Bean
+ public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() {
+
+ return beanFactory -> {
+ String[] names = beanFactory.getBeanNamesForType(TransactionInterceptor.class);
+ for (String name : names) {
+ BeanDefinition bd = beanFactory.getBeanDefinition(name);
+ bd.setBeanClassName(Camunda8TransactionInterceptor.class.getName());
+ bd.setFactoryBeanName(null);
+ bd.setFactoryMethodName(null);
+ }
+ };
+
+ }
+ */
+
+ @Bean
+ public Camunda8TransactionProcessor camunda8TransactionProcessor() {
+
+ return new Camunda8TransactionProcessor();
+
+ }
+
}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
index 26e459b..e5b03de 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
@@ -39,8 +39,8 @@ public String getTenantId(
if (!configuration.isUseTenants()) {
return null;
}
- if (StringUtils.hasText(configuration.getTenant())) {
- return configuration.getTenant();
+ if (StringUtils.hasText(configuration.getTenantId())) {
+ return configuration.getTenantId();
}
return workflowModuleId;
@@ -101,7 +101,7 @@ public static class AdapterConfiguration extends WorkerProperties {
private boolean useTenants = true;
- private String tenant;
+ private String tenantId;
public boolean isUseTenants() {
return useTenants;
@@ -111,12 +111,12 @@ public void setUseTenants(boolean useTenants) {
this.useTenants = useTenants;
}
- public String getTenant() {
- return tenant;
+ public String getTenantId() {
+ return tenantId;
}
- public void setTenant(String tenant) {
- this.tenant = tenant;
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
}
}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
index f058308..0c7eda7 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
@@ -5,15 +5,18 @@
import io.vanillabp.camunda8.Camunda8VanillaBpProperties;
import io.vanillabp.springboot.adapter.AdapterAwareProcessService;
import io.vanillabp.springboot.adapter.ProcessServiceImplementation;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.repository.CrudRepository;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
@Transactional(propagation = Propagation.MANDATORY)
public class Camunda8ProcessService
@@ -29,18 +32,22 @@ public class Camunda8ProcessService
private final Camunda8VanillaBpProperties camunda8Properties;
+ private final ApplicationEventPublisher publisher;
+
private AdapterAwareProcessService parent;
-
+
private ZeebeClient client;
-
+
public Camunda8ProcessService(
final Camunda8VanillaBpProperties camunda8Properties,
+ final ApplicationEventPublisher publisher,
final CrudRepository workflowAggregateRepository,
final Function getWorkflowAggregateId,
final Class workflowAggregateClass) {
super();
this.camunda8Properties = camunda8Properties;
+ this.publisher = publisher;
this.workflowAggregateRepository = workflowAggregateRepository;
this.workflowAggregateClass = workflowAggregateClass;
this.getWorkflowAggregateId = getWorkflowAggregateId;
@@ -98,31 +105,34 @@ public CrudRepository getWorkflowAggregateRepository() {
@Override
public DE startWorkflow(
final DE workflowAggregate) throws Exception {
-
- // persist to get ID in case of @Id @GeneratedValue
- // or force optimistic locking exceptions before running
- // the workflow if aggregate was already persisted before
- final var attachedAggregate = workflowAggregateRepository
- .save(workflowAggregate);
-
- final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
- final var command = client
- .newCreateInstanceCommand()
- .bpmnProcessId(parent.getPrimaryBpmnProcessId())
- .latestVersion()
- .variables(attachedAggregate);
- (tenantId == null
- ? command
- : command.tenantId(tenantId))
- .send()
- .get(10, TimeUnit.SECONDS);
+ return runInTransaction(
+ workflowAggregate,
+ attachedAggregate -> {
+ final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
+ final var command = client
+ .newCreateInstanceCommand()
+ .bpmnProcessId(parent.getPrimaryBpmnProcessId())
+ .latestVersion()
+ .variables(attachedAggregate);
- try {
- return attachedAggregate;
- } catch (RuntimeException exception) {
- throw exception;
- }
+ try {
+ (tenantId == null
+ ? command
+ : command.tenantId(tenantId))
+ .send()
+ .get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Starting workflow '"
+ + parent.getPrimaryBpmnProcessId()
+ + "‘ for aggregate '"
+ + attachedAggregate
+ + "' failed!",
+ e);
+ }
+ },
+ "startWorkflow");
}
@@ -130,19 +140,20 @@ public DE startWorkflow(
public DE correlateMessage(
final DE workflowAggregate,
final String messageName) {
-
- final var attachedAggregate = workflowAggregateRepository
- .save(workflowAggregate);
- final var correlationId = getWorkflowAggregateId
- .apply(workflowAggregate);
-
- correlateMessage(
+
+ return runInTransaction(
workflowAggregate,
- messageName,
- correlationId.toString());
-
- return attachedAggregate;
-
+ attachedAggregate -> {
+ final var correlationId = getWorkflowAggregateId
+ .apply(workflowAggregate);
+
+ doCorrelateMessage(
+ workflowAggregate,
+ messageName,
+ correlationId.toString());
+ },
+ "correlateMessage");
+
}
@Override
@@ -161,12 +172,21 @@ public DE correlateMessage(
final DE workflowAggregate,
final String messageName,
final String correlationId) {
-
- // persist to get ID in case of @Id @GeneratedValue
- // and force optimistic locking exceptions before running
- // the workflow if aggregate was already persisted before
- final var attachedAggregate = workflowAggregateRepository
- .save(workflowAggregate);
+
+ return runInTransaction(
+ workflowAggregate,
+ attachedAggregate -> doCorrelateMessage(
+ attachedAggregate,
+ messageName,
+ correlationId),
+ "correlateMessage-by-correlationId");
+
+ }
+
+ private void doCorrelateMessage(
+ final DE attachedAggregate,
+ final String messageName,
+ final String correlationId) {
final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
final var command = client
@@ -188,8 +208,6 @@ public DE correlateMessage(
parent.getPrimaryBpmnProcessId(),
messageKey);
- return attachedAggregate;
-
}
@Override
@@ -209,23 +227,23 @@ public DE correlateMessage(
public DE completeTask(
final DE workflowAggregate,
final String taskId) {
-
- // force optimistic locking exceptions before running the workflow
- final var attachedAggregate = workflowAggregateRepository
- .save(workflowAggregate);
-
- client
- .newCompleteCommand(Long.parseLong(taskId, 16))
- .variables(attachedAggregate)
- .send()
- .join();
- logger.trace("Complete usertask '{}' for process '{}'",
+ return runInTransaction(
+ workflowAggregate,
taskId,
- parent.getPrimaryBpmnProcessId());
-
- return attachedAggregate;
-
+ attachedAggregate -> {
+ client
+ .newCompleteCommand(Long.parseLong(taskId, 16))
+ .variables(attachedAggregate)
+ .send()
+ .join();
+
+ logger.trace("Complete task '{}' of process '{}'",
+ taskId,
+ parent.getPrimaryBpmnProcessId());
+ },
+ "completeTask");
+
}
@Override
@@ -233,7 +251,21 @@ public DE completeUserTask(
final DE workflowAggregate,
final String taskId) {
- return completeTask(workflowAggregate, taskId);
+ return runInTransaction(
+ workflowAggregate,
+ taskId,
+ attachedAggregate -> {
+ client
+ .newCompleteCommand(Long.parseLong(taskId, 16))
+ .variables(attachedAggregate)
+ .send()
+ .join();
+
+ logger.trace("Complete user task '{}' of process '{}'",
+ taskId,
+ parent.getPrimaryBpmnProcessId());
+ },
+ "completeUserTask");
}
@@ -243,22 +275,22 @@ public DE cancelTask(
final String taskId,
final String errorCode) {
- // force optimistic locking exceptions before running the workflow
- final var attachedAggregate = workflowAggregateRepository
- .save(workflowAggregate);
-
- client
- .newThrowErrorCommand(Long.parseLong(taskId))
- .errorCode(errorCode)
- .send()
- .join();
-
- logger.trace("Complete usertask '{}' for process '{}'",
+ return runInTransaction(
+ workflowAggregate,
taskId,
- parent.getPrimaryBpmnProcessId());
-
- return attachedAggregate;
-
+ attachedAggregate -> {
+ client
+ .newThrowErrorCommand(Long.parseLong(taskId))
+ .errorCode(errorCode)
+ .send()
+ .join();
+
+ logger.trace("Complete task '{}' of process '{}'",
+ taskId,
+ parent.getPrimaryBpmnProcessId());
+ },
+ "cancelTask");
+
}
@Override
@@ -271,4 +303,54 @@ public DE cancelUserTask(
}
+ private DE runInTransaction(
+ final DE workflowAggregate,
+ final Consumer runnable,
+ final String methodSignature) {
+
+ return runInTransaction(
+ workflowAggregate,
+ null,
+ runnable,
+ methodSignature);
+
+ }
+
+ private DE runInTransaction(
+ final DE workflowAggregate,
+ final String taskIdToTestForAlreadyCompletedOrCancelled,
+ final Consumer runnable,
+ final String methodSignature) {
+
+ // persist to get ID in case of @Id @GeneratedValue
+ // or force optimistic locking exceptions before running
+ // the workflow if aggregate was already persisted before
+ final var attachedAggregate = workflowAggregateRepository
+ .save(workflowAggregate);
+
+ if (TransactionSynchronizationManager.isActualTransactionActive()) {
+ if (taskIdToTestForAlreadyCompletedOrCancelled != null) {
+ publisher.publishEvent(
+ new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+ methodSignature,
+ () -> client
+ .newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16))
+ .timeout(Duration.ofMinutes(10))
+ .send()
+ .join(5, TimeUnit.MINUTES), // needs to run synchronously
+ () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
+ }
+ publisher.publishEvent(
+ new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+ methodSignature,
+ () -> runnable.accept(attachedAggregate),
+ () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
+ } else {
+ runnable.accept(attachedAggregate);
+ }
+
+ return attachedAggregate;
+
+ }
+
}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
new file mode 100644
index 0000000..b5ffa45
--- /dev/null
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
@@ -0,0 +1,241 @@
+package io.vanillabp.camunda8.service;
+
+import io.vanillabp.spi.service.TaskException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+@Aspect
+public class Camunda8TransactionAspect {
+
+ private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionAspect.class);
+
+ public static class TaskHandlerActions {
+ public Supplier>> testForTaskAlreadyCompletedOrCancelledCommand;
+ public Map.Entry, Function> bpmnErrorCommand;
+ public Map.Entry, Function> handlerFailedCommand;
+ public Supplier>> handlerCompletedCommand;
+ }
+
+ public static class RunDeferredInTransaction {
+ public RunDeferredInTransactionSupplier[] argsSupplier;
+ public Runnable saveAggregateAfterWorkflowTask;
+ }
+
+ public interface RunDeferredInTransactionSupplier extends Supplier