From 1ab8f5e22c37cd3c69804150acadf04122e48e53 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Tue, 16 Apr 2024 07:16:48 +0200
Subject: [PATCH 01/15] Introduce transaction interceptor

---
 spring-boot/pom.xml                           |   8 +-
 .../Camunda8AdapterConfiguration.java         |  42 +++-
 .../service/Camunda8ProcessService.java       | 208 +++++++++++------
 .../Camunda8TransactionInterceptor.java       | 106 +++++++++
 .../service/Camunda8TransactionProcessor.java | 147 ++++++++++++
 .../camunda8/wiring/Camunda8TaskHandler.java  | 221 +++++++++++++-----
 .../camunda8/wiring/Camunda8TaskWiring.java   |  10 +-
 .../wiring/Camunda8UserTaskHandler.java       |   2 +-
 8 files changed, 594 insertions(+), 150 deletions(-)
 create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java
 create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java

diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 94d4aa6..9b58fa0 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -13,7 +13,7 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.zeebe.version>8.3.4.2</spring.zeebe.version>
+    <spring.zeebe.version>8.5.0-rc2</spring.zeebe.version>
   </properties>
 
   <build>
@@ -38,14 +38,14 @@
       <version>1.1.1</version>
     </dependency>
     <dependency>
-      <groupId>io.camunda</groupId>
-      <artifactId>spring-zeebe-starter</artifactId>
+      <groupId>io.camunda.spring</groupId>
+      <artifactId>spring-boot-starter-camunda</artifactId>
       <version>${spring.zeebe.version}</version>
     </dependency>
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-tx</artifactId>
-      <version>5.3.23</version>
+      <version>6.1.3</version>
     </dependency>
     <dependency>
       <groupId>jakarta.persistence</groupId>
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
index 43b280c..a1a89c4 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
@@ -1,12 +1,13 @@
 package io.vanillabp.camunda8;
 
 import io.camunda.zeebe.spring.client.CamundaAutoConfiguration;
-import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy;
 import io.vanillabp.camunda8.deployment.Camunda8DeploymentAdapter;
 import io.vanillabp.camunda8.deployment.DeploymentRepository;
 import io.vanillabp.camunda8.deployment.DeploymentResourceRepository;
 import io.vanillabp.camunda8.deployment.DeploymentService;
 import io.vanillabp.camunda8.service.Camunda8ProcessService;
+import io.vanillabp.camunda8.service.Camunda8TransactionInterceptor;
+import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
 import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
 import io.vanillabp.camunda8.wiring.Camunda8TaskHandler;
 import io.vanillabp.camunda8.wiring.Camunda8TaskWiring;
@@ -23,15 +24,19 @@
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Scope;
 import org.springframework.data.repository.CrudRepository;
+import org.springframework.transaction.interceptor.TransactionInterceptor;
 
 import java.lang.reflect.Method;
 import java.util.List;
@@ -57,9 +62,6 @@ public class Camunda8AdapterConfiguration extends AdapterConfigurationBase<Camun
     @Autowired
     private ApplicationContext applicationContext;
 
-    @Autowired
-    private DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy;
-
     @Autowired
     private DeploymentRepository deploymentRepository;
 
@@ -69,6 +71,9 @@ public class Camunda8AdapterConfiguration extends AdapterConfigurationBase<Camun
     @Autowired
     private Camunda8VanillaBpProperties camunda8Properties;
 
+    @Autowired
+    private ApplicationEventPublisher eventPublisher;
+
     @PostConstruct
     public void init() {
         
@@ -148,7 +153,6 @@ public Camunda8TaskHandler camunda8TaskHandler(
         
         return new Camunda8TaskHandler(
                 taskType,
-                commandExceptionHandlingStrategy,
                 repository,
                 bean,
                 method,
@@ -166,8 +170,9 @@ public <DE> Camunda8ProcessService<?> newProcessServiceImplementation(
 
         final var result = new Camunda8ProcessService<DE>(
                 camunda8Properties,
+                eventPublisher,
                 workflowAggregateRepository,
-                workflowAggregate -> springDataUtil.getId(workflowAggregate),
+                springDataUtil::getId,
                 workflowAggregateClass);
 
         putConnectableService(workflowAggregateClass, result);
@@ -185,4 +190,29 @@ public SpringBeanUtil vanillabpSpringBeanUtil(
 
     }
 
+    /*
+     * https://www.tirasa.net/en/blog/dynamic-spring-s-transactional-2020-edition
+     */
+    @Bean
+    public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() {
+
+        return beanFactory -> {
+            String[] names = beanFactory.getBeanNamesForType(TransactionInterceptor.class);
+            for (String name : names) {
+                BeanDefinition bd = beanFactory.getBeanDefinition(name);
+                bd.setBeanClassName(Camunda8TransactionInterceptor.class.getName());
+                bd.setFactoryBeanName(null);
+                bd.setFactoryMethodName(null);
+            }
+        };
+
+    }
+
+    @Bean
+    public Camunda8TransactionProcessor camunda8TransactionProcessor() {
+
+        return new Camunda8TransactionProcessor();
+
+    }
+
 }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
index f058308..80fd1a1 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
@@ -7,12 +7,16 @@
 import io.vanillabp.springboot.adapter.ProcessServiceImplementation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 @Transactional(propagation = Propagation.MANDATORY)
@@ -29,18 +33,22 @@ public class Camunda8ProcessService<DE>
 
     private final Camunda8VanillaBpProperties camunda8Properties;
 
+    private final ApplicationEventPublisher publisher;
+
     private AdapterAwareProcessService<DE> parent;
     
     private ZeebeClient client;
-        
+
     public Camunda8ProcessService(
             final Camunda8VanillaBpProperties camunda8Properties,
+            final ApplicationEventPublisher publisher,
             final CrudRepository<DE, Object> workflowAggregateRepository,
             final Function<DE, Object> getWorkflowAggregateId,
             final Class<DE> workflowAggregateClass) {
         
         super();
         this.camunda8Properties = camunda8Properties;
+        this.publisher = publisher;
         this.workflowAggregateRepository = workflowAggregateRepository;
         this.workflowAggregateClass = workflowAggregateClass;
         this.getWorkflowAggregateId = getWorkflowAggregateId;
@@ -98,31 +106,33 @@ public CrudRepository<DE, Object> getWorkflowAggregateRepository() {
     @Override
     public DE startWorkflow(
             final DE workflowAggregate) throws Exception {
-        
-        // persist to get ID in case of @Id @GeneratedValue
-        // or force optimistic locking exceptions before running
-        // the workflow if aggregate was already persisted before
-        final var attachedAggregate = workflowAggregateRepository
-                .save(workflowAggregate);
-
-        final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
-        final var command = client
-                .newCreateInstanceCommand()
-                .bpmnProcessId(parent.getPrimaryBpmnProcessId())
-                .latestVersion()
-                .variables(attachedAggregate);
 
-        (tenantId == null
-                ? command
-                : command.tenantId(tenantId))
-                .send()
-                .get(10, TimeUnit.SECONDS);
+        return runInTransaction(
+                workflowAggregate,
+                attachedAggregate -> {
+                    final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
+                    final var command = client
+                            .newCreateInstanceCommand()
+                            .bpmnProcessId(parent.getPrimaryBpmnProcessId())
+                            .latestVersion()
+                            .variables(attachedAggregate);
 
-        try {
-            return attachedAggregate;
-        } catch (RuntimeException exception) {
-            throw exception;
-        }
+                    try {
+                        (tenantId == null
+                                ? command
+                                : command.tenantId(tenantId))
+                                .send()
+                                .get(10, TimeUnit.SECONDS);
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Starting workflow '"
+                                + parent.getPrimaryBpmnProcessId()
+                                + "‘ for aggregate '"
+                                + attachedAggregate
+                                + "' failed!",
+                                e);
+                    }
+                });
         
     }
 
@@ -130,19 +140,19 @@ public DE startWorkflow(
     public DE correlateMessage(
             final DE workflowAggregate,
             final String messageName) {
-        
-        final var attachedAggregate = workflowAggregateRepository
-                .save(workflowAggregate);
-        final var correlationId = getWorkflowAggregateId
-                .apply(workflowAggregate);
-        
-        correlateMessage(
+
+        return runInTransaction(
                 workflowAggregate,
-                messageName,
-                correlationId.toString());
-        
-        return attachedAggregate;
-        
+                attachedAggregate -> {
+                    final var correlationId = getWorkflowAggregateId
+                            .apply(workflowAggregate);
+
+                    doCorrelateMessage(
+                            workflowAggregate,
+                            messageName,
+                            correlationId.toString());
+                });
+
     }
     
     @Override
@@ -161,12 +171,20 @@ public DE correlateMessage(
             final DE workflowAggregate,
             final String messageName,
             final String correlationId) {
-            
-        // persist to get ID in case of @Id @GeneratedValue
-        // and force optimistic locking exceptions before running
-        // the workflow if aggregate was already persisted before
-        final var attachedAggregate = workflowAggregateRepository
-                .save(workflowAggregate);
+
+        return runInTransaction(
+                workflowAggregate,
+                attachedAggregate -> doCorrelateMessage(
+                        attachedAggregate,
+                        messageName,
+                        correlationId));
+
+    }
+
+    private void doCorrelateMessage(
+            final DE attachedAggregate,
+            final String messageName,
+            final String correlationId) {
 
         final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId());
         final var command = client
@@ -188,8 +206,6 @@ public DE correlateMessage(
                 parent.getPrimaryBpmnProcessId(),
                 messageKey);
         
-        return attachedAggregate;
-        
     }
     
     @Override
@@ -209,23 +225,22 @@ public DE correlateMessage(
     public DE completeTask(
             final DE workflowAggregate,
             final String taskId) {
-        
-        // force optimistic locking exceptions before running the workflow
-        final var attachedAggregate = workflowAggregateRepository
-                .save(workflowAggregate);
-        
-        client
-                .newCompleteCommand(Long.parseLong(taskId, 16))
-                .variables(attachedAggregate)
-                .send()
-                .join();
 
-        logger.trace("Complete usertask '{}' for process '{}'",
+        return runInTransaction(
+                workflowAggregate,
                 taskId,
-                parent.getPrimaryBpmnProcessId());
-        
-        return attachedAggregate;
-        
+                attachedAggregate -> {
+                    client
+                            .newCompleteCommand(Long.parseLong(taskId, 16))
+                            .variables(attachedAggregate)
+                            .send()
+                            .join();
+
+                    logger.trace("Complete task '{}' of process '{}'",
+                            taskId,
+                            parent.getPrimaryBpmnProcessId());
+                });
+
     }
     
     @Override
@@ -243,22 +258,21 @@ public DE cancelTask(
             final String taskId,
             final String errorCode) {
 
-        // force optimistic locking exceptions before running the workflow
-        final var attachedAggregate = workflowAggregateRepository
-                .save(workflowAggregate);
-        
-        client
-                .newThrowErrorCommand(Long.parseLong(taskId))
-                .errorCode(errorCode)
-                .send()
-                .join();
-
-        logger.trace("Complete usertask '{}' for process '{}'",
+        return runInTransaction(
+                workflowAggregate,
                 taskId,
-                parent.getPrimaryBpmnProcessId());
-        
-        return attachedAggregate;
-        
+                attachedAggregate -> {
+                    client
+                            .newThrowErrorCommand(Long.parseLong(taskId))
+                            .errorCode(errorCode)
+                            .send()
+                            .join();
+
+                    logger.trace("Complete task '{}' of process '{}'",
+                            taskId,
+                            parent.getPrimaryBpmnProcessId());
+                });
+
     }
     
     @Override
@@ -271,4 +285,50 @@ public DE cancelUserTask(
 
     }
 
+    private DE runInTransaction(
+            final DE workflowAggregate,
+            final Consumer<DE> runnable) {
+
+        return runInTransaction(
+                workflowAggregate,
+                null,
+                runnable);
+
+    }
+
+    private DE runInTransaction(
+            final DE workflowAggregate,
+            final String taskIdToTestForAlreadyCompletedOrCancelled,
+            final Consumer<DE> runnable) {
+
+        // persist to get ID in case of @Id @GeneratedValue
+        // or force optimistic locking exceptions before running
+        // the workflow if aggregate was already persisted before
+        final var attachedAggregate = workflowAggregateRepository
+                .save(workflowAggregate);
+
+        if (TransactionSynchronizationManager.isActualTransactionActive()) {
+            if (taskIdToTestForAlreadyCompletedOrCancelled != null) {
+                publisher.publishEvent(
+                        new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                Camunda8TransactionInterceptor.class,
+                                () -> client
+                                        .newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16))
+                                        .timeout(Duration.ofMinutes(10))
+                                        .send()
+                                        .join(5, TimeUnit.MINUTES), // needs to run synchronously
+                                () -> "update timeout (BPMN: " + parent.getPrimaryBpmnProcessId() + ")"));            }
+            publisher.publishEvent(
+                    new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                            Camunda8TransactionInterceptor.class,
+                            () -> runnable.accept(attachedAggregate),
+                            () -> "complete command (BPMN: " + parent.getPrimaryBpmnProcessId() + ")"));
+        } else {
+            runnable.accept(attachedAggregate);
+        }
+
+        return attachedAggregate;
+
+    }
+
 }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java
new file mode 100644
index 0000000..d4ff928
--- /dev/null
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java
@@ -0,0 +1,106 @@
+package io.vanillabp.camunda8.service;
+
+import io.vanillabp.spi.service.TaskException;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.transaction.TransactionManager;
+import org.springframework.transaction.interceptor.TransactionAttributeSource;
+import org.springframework.transaction.interceptor.TransactionInterceptor;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class Camunda8TransactionInterceptor extends TransactionInterceptor {
+
+    private final ApplicationEventPublisher publisher;
+
+    public static final ThreadLocal<TaskHandlerActions> actions = ThreadLocal.withInitial(TaskHandlerActions::new);
+
+    public Camunda8TransactionInterceptor(
+            final TransactionManager ptm,
+            final TransactionAttributeSource tas,
+            final ApplicationEventPublisher publisher) {
+        super(ptm, tas);
+        this.publisher = publisher;
+    }
+
+    public static class TaskHandlerActions {
+        public Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand;
+        public Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand;
+        public Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand;
+        public Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand;
+    }
+
+    @Override
+    protected Object invokeWithinTransaction(
+            final Method method,
+            final Class<?> targetClass,
+            final InvocationCallback invocation) throws Throwable {
+
+        return super.invokeWithinTransaction(method, targetClass, () -> {
+            if (!TransactionSynchronizationManager.isActualTransactionActive()) {
+                return invocation.proceedWithInvocation();
+            }
+            try {
+                final var result = invocation.proceedWithInvocation();
+                if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    Camunda8TransactionInterceptor.class,
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
+                }
+                if (actions.get().handlerCompletedCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    Camunda8TransactionInterceptor.class,
+                                    actions.get().handlerCompletedCommand.getKey(),
+                                    actions.get().handlerCompletedCommand.getValue()));
+                }
+                return result;
+            } catch (TaskException taskError) {
+                if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    Camunda8TransactionInterceptor.class,
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
+                }
+                if (actions.get().bpmnErrorCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    Camunda8TransactionInterceptor.class,
+                                    () -> actions.get().bpmnErrorCommand.getKey().accept(taskError),
+                                    () -> actions.get().bpmnErrorCommand.getValue().apply(taskError)));
+                }
+                return null;
+            } catch (Exception e) {
+                if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    Camunda8TransactionInterceptor.class,
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
+                                    actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
+                }
+                if (actions.get().handlerFailedCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    Camunda8TransactionInterceptor.class,
+                                    () -> actions.get().handlerFailedCommand.getKey().accept(e),
+                                    () -> actions.get().handlerFailedCommand.getValue().apply(e)));
+                }
+                throw e;
+            } finally {
+                actions.get().bpmnErrorCommand = null;
+                actions.get().handlerCompletedCommand = null;
+                actions.get().handlerFailedCommand = null;
+                actions.get().testForTaskAlreadyCompletedOrCancelledCommand = null;
+            }
+        });
+
+    }
+
+}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
new file mode 100644
index 0000000..7475240
--- /dev/null
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
@@ -0,0 +1,147 @@
+package io.vanillabp.camunda8.service;
+
+import io.camunda.zeebe.client.api.command.ClientStatusException;
+import io.grpc.Status;
+import io.vanillabp.spi.service.TaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+import org.springframework.transaction.interceptor.TransactionAspectSupport;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class Camunda8TransactionProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class);
+
+    public static void registerCallbacks(
+            final Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand,
+            final Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand,
+            final Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand,
+            final Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand) {
+
+        final var actions = Camunda8TransactionInterceptor.actions.get();
+        actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand;
+        actions.bpmnErrorCommand = bpmnErrorCommand;
+        actions.handlerFailedCommand = handlerFailedCommand;
+        actions.handlerCompletedCommand = handlerCompletedCommand;
+
+    }
+
+    public static Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommandCallback() {
+
+        return Camunda8TransactionInterceptor
+                .actions
+                .get()
+                .bpmnErrorCommand;
+
+    }
+
+    public static Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommandCallback() {
+
+        return Camunda8TransactionInterceptor
+                .actions
+                .get()
+                .handlerFailedCommand;
+
+    }
+
+    public static Map.Entry<Runnable, Supplier<String>> handlerCompletedCommandCallback() {
+
+        return Camunda8TransactionInterceptor
+                .actions
+                .get()
+                .handlerCompletedCommand;
+
+    }
+
+    public static void unregisterCallbacks() {
+
+        final var actions = Camunda8TransactionInterceptor.actions.get();
+        actions.testForTaskAlreadyCompletedOrCancelledCommand = null;
+        actions.bpmnErrorCommand = null;
+        actions.handlerFailedCommand = null;
+        actions.handlerCompletedCommand = null;
+
+    }
+
+    public static class Camunda8CommandAfterTx extends ApplicationEvent {
+        final Supplier<String> description;
+        final Runnable runnable;
+        public Camunda8CommandAfterTx(
+                final Object source,
+                final Runnable runnable,
+                final Supplier<String> description) {
+            super(source);
+            this.runnable = runnable;
+            this.description = description;
+        }
+    }
+
+    public static class Camunda8TestForTaskAlreadyCompletedOrCancelled extends ApplicationEvent {
+        final Supplier<String> description;
+        final Runnable runnable;
+        public Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                final Object source,
+                final Runnable runnable,
+                final Supplier<String> description) {
+            super(source);
+            this.runnable = runnable;
+            this.description = description;
+        }
+    }
+
+    @TransactionalEventListener(
+            phase = TransactionPhase.BEFORE_COMMIT,
+            fallbackExecution = true)
+    public void processPreCommit(
+            final Camunda8TestForTaskAlreadyCompletedOrCancelled event) {
+
+        try {
+            // this runnable will test whether the task still exists
+            event.runnable.run();
+        } catch (Exception e) {
+            // if the task is completed or cancelled, then the tx is rolled back
+            if ((e instanceof ClientStatusException clientStatusException)
+                    && (clientStatusException.getStatus().getCode() == Status.NOT_FOUND.getCode())) {
+                logger.warn(
+                        "Will rollback because job was already completed/cancelled! Tested with command '{}‘ giving status 'NOT_FOUND'",
+                        event.description.get());
+            } else {
+                logger.warn(
+                        "Will rollback because testing for job '{}' failed!",
+                        event.description.get(),
+                        e);
+            }
+            if (TransactionSynchronizationManager.isActualTransactionActive()) {
+                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+            }
+        }
+
+    }
+
+    @TransactionalEventListener(
+            phase = TransactionPhase.AFTER_COMMIT,
+            fallbackExecution = true)
+    public void processPostCommit(
+            final Camunda8CommandAfterTx event) {
+
+        try {
+            // this runnable will instruct Zeebe
+            event.runnable.run();
+        } catch (Exception e) {
+            logger.error(
+                    "Could not execute '{}'! Manual action required!",
+                    event.description.get(),
+                    e);
+        }
+
+    }
+
+}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
index 3d5d39f..848d27f 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
@@ -1,15 +1,14 @@
 package io.vanillabp.camunda8.wiring;
 
-import io.camunda.zeebe.client.api.command.FinalCommandStep;
+import io.camunda.zeebe.client.ZeebeClient;
 import io.camunda.zeebe.client.api.response.ActivatedJob;
 import io.camunda.zeebe.client.api.worker.JobClient;
 import io.camunda.zeebe.client.api.worker.JobHandler;
-import io.camunda.zeebe.spring.client.jobhandling.CommandWrapper;
-import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy;
+import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
 import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
 import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceIndexMethodParameter;
 import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceTotalMethodParameter;
-import io.vanillabp.spi.service.TaskEvent.Event;
+import io.vanillabp.spi.service.TaskEvent;
 import io.vanillabp.spi.service.TaskException;
 import io.vanillabp.springboot.adapter.MultiInstance;
 import io.vanillabp.springboot.adapter.TaskHandlerBase;
@@ -18,26 +17,29 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.repository.CrudRepository;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
-public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler {
+public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer<ZeebeClient> {
 
     private static final Logger logger = LoggerFactory.getLogger(Camunda8TaskHandler.class);
 
-    private final DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy;
-
     private final Type taskType;
 
     private final String idPropertyName;
 
+    private ZeebeClient zeebeClient;
+
     public Camunda8TaskHandler(
             final Type taskType,
-            final DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
             final CrudRepository<Object, Object> workflowAggregateRepository,
             final Object bean,
             final Method method,
@@ -46,11 +48,18 @@ public Camunda8TaskHandler(
 
         super(workflowAggregateRepository, bean, method, parameters);
         this.taskType = taskType;
-        this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy;
         this.idPropertyName = idPropertyName;
 
     }
-    
+
+    @Override
+    public void accept(
+            final ZeebeClient zeebeClient) {
+
+        this.zeebeClient = zeebeClient;
+
+    }
+
     @Override
     protected Logger getLogger() {
         
@@ -60,28 +69,34 @@ protected Logger getLogger() {
 
     @SuppressWarnings("unchecked")
     @Override
-    @Transactional
     public void handle(
             final JobClient client,
-            final ActivatedJob job) throws Exception {
+            final ActivatedJob job) {
 
-        CommandWrapper command = null;
+        Runnable jobPostAction = null;
+        Supplier<String> description = null;
         try {
             final var businessKey = getVariable(job, idPropertyName);
-            
-            logger.trace("Will handle task '{}' of workflow '{}' ('{}') as job '{}'",
+
+            logger.trace("Will handle task '{}' (task-definition '{}‘) of workflow '{}' (instance-id '{}') as job '{}'",
                     job.getElementId(),
+                    job.getType(),
+                    job.getBpmnProcessId(),
                     job.getProcessInstanceKey(),
-                    job.getProcessDefinitionKey(),
                     job.getKey());
-            
+
             final var taskIdRetrieved = new AtomicBoolean(false);
-            
+            final var workflowAggregateCache = new WorkflowAggregateCache();
+
+            Camunda8TransactionProcessor.registerCallbacks(
+                    doTestForTaskWasCompletedOrCancelled(job),
+                    doThrowError(client, job, workflowAggregateCache),
+                    doFailed(client, job),
+                    doComplete(client, job, workflowAggregateCache));
+
             final Function<String, Object> multiInstanceSupplier
                     = multiInstanceVariable -> getVariable(job, multiInstanceVariable);
-            
-            final var workflowAggregateCache = new WorkflowAggregateCache();
-            
+
             super.execute(
                     workflowAggregateCache,
                     businessKey,
@@ -100,7 +115,7 @@ public void handle(
                     (args, param) -> processTaskEventParameter(
                             args,
                             param,
-                            () -> Event.CREATED),
+                            () -> TaskEvent.Event.CREATED),
                     (args, param) -> processMultiInstanceIndexParameter(
                             args,
                             param,
@@ -127,21 +142,48 @@ public void handle(
 
             if ((taskType != Type.USERTASK)
                     && !taskIdRetrieved.get()) {
-                command = createCompleteCommand(client, job, workflowAggregateCache.workflowAggregate);
+                final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
+                if (callback != null) {
+                    jobPostAction = callback.getKey();
+                    description = callback.getValue();
+                }
             }
         } catch (TaskException bpmnError) {
-            command = createThrowErrorCommand(client, job, bpmnError);
+            final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback();
+            if (callback != null) {
+                jobPostAction = () -> callback.getKey().accept(bpmnError);
+                description = () -> callback.getValue().apply(bpmnError);
+            }
         } catch (Exception e) {
-            logger.error("Failed to execute job '{}'", job.getKey(), e);
-            command = createFailedCommand(client, job, e);
+            final var callback = Camunda8TransactionProcessor.handlerFailedCommandCallback();
+            if (callback != null) {
+                logger.error("Failed to execute job '{}'", job.getKey(), e);
+                jobPostAction = () -> callback.getKey().accept(e);
+                description = () -> callback.getValue().apply(e);
+            }
+        } finally {
+            Camunda8TransactionProcessor.unregisterCallbacks();
         }
 
-        if (command != null) {
-            command.executeAsync();
+        if (jobPostAction != null) {
+            try {
+                jobPostAction.run();
+            } catch (Exception e) {
+                if (description != null) {
+                    logger.error(
+                            "Could not execute '{}'! Manual action required!",
+                            description.get(),
+                            e);
+                } else {
+                    logger.error(
+                            "Manual action required due to:",
+                            e);
+                }
+            }
         }
 
     }
-    
+
     @Override
     protected Object getMultiInstanceElement(
             final String name,
@@ -195,54 +237,105 @@ private Object getVariable(
     }
 
     @SuppressWarnings("unchecked")
-    public CommandWrapper createCompleteCommand(
+    public Map.Entry<Runnable, Supplier<String>> doTestForTaskWasCompletedOrCancelled(
+            final ActivatedJob job) {
+
+        return Map.entry(
+                () -> zeebeClient
+                        .newUpdateTimeoutCommand(job)
+                        .timeout(Duration.ofMinutes(10))
+                        .send()
+                        .join(5, TimeUnit.MINUTES), // needs to run synchronously
+                () -> "update timeout (BPMN: " + job.getBpmnProcessId()
+                        + "; Element: " + job.getElementId()
+                        + "; Task-Definition: " + job.getType()
+                        + "; Process-Instance: " + job.getProcessInstanceKey()
+                        + "; Job: " + job.getKey()
+                        + ")");
+
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map.Entry<Runnable, Supplier<String>> doComplete(
             final JobClient jobClient,
             final ActivatedJob job,
-            final Object workflowAggregateId) {
+            final WorkflowAggregateCache workflowAggregateCache) {
 
-        var completeCommand = jobClient
-                .newCompleteCommand(job.getKey());
-        
-        if (workflowAggregateId != null) {
-            completeCommand = completeCommand.variables(workflowAggregateId);
-        }
-        
-        return new CommandWrapper(
-                (FinalCommandStep<Void>) ((FinalCommandStep<?>) completeCommand),
-                job,
-                commandExceptionHandlingStrategy);
+        return Map.entry(
+                () -> {
+                    var completeCommand = jobClient
+                            .newCompleteCommand(job.getKey());
+
+                    if (workflowAggregateCache.workflowAggregate != null) {
+                        completeCommand = completeCommand.variables(workflowAggregateCache.workflowAggregate);
+                    }
+
+                    completeCommand
+                            .send()
+                            .exceptionally(t -> {
+                                throw new RuntimeException("error", t);
+                            });
+                },
+                () -> "complete command (BPMN: " + job.getBpmnProcessId()
+                        + "; Element: " + job.getElementId()
+                        + "; Task-Definition: " + job.getType()
+                        + "; Process-Instance: " + job.getProcessInstanceKey()
+                        + "; Job: " + job.getKey()
+                        + ")");
 
     }
 
-    private CommandWrapper createThrowErrorCommand(
+    private Map.Entry<Consumer<TaskException>, Function<TaskException, String>> doThrowError(
             final JobClient jobClient,
             final ActivatedJob job,
-            final TaskException bpmnError) {
+            final WorkflowAggregateCache workflowAggregateCache) {
 
-        return new CommandWrapper(
-                jobClient
-                        .newThrowErrorCommand(job.getKey())
-                        .errorCode(bpmnError.getErrorCode())
-                        .errorMessage(bpmnError.getErrorName()),
-                job,
-                commandExceptionHandlingStrategy);
+        return Map.entry(
+                taskException -> {
+                    var throwErrorCommand = jobClient
+                            .newThrowErrorCommand(job.getKey())
+                            .errorCode(taskException.getErrorCode())
+                            .errorMessage(taskException.getErrorName());
 
+                    if (workflowAggregateCache.workflowAggregate != null) {
+                        throwErrorCommand = throwErrorCommand.variables(workflowAggregateCache.workflowAggregate);
+                    }
+
+                    throwErrorCommand
+                            .send()
+                            .exceptionally(t -> { throw new RuntimeException("error", t); });
+                },
+                taskException -> "throw error command (BPMN: " + job.getBpmnProcessId()
+                        + "; Element: " + job.getElementId()
+                        + "; Task-Definition: " + job.getType()
+                        + "; Process-Instance: " + job.getProcessInstanceKey()
+                        + "; Job: " + job.getKey()
+                        + ")");
     }
     
     @SuppressWarnings("unchecked")
-    private CommandWrapper createFailedCommand(
+    private Map.Entry<Consumer<Exception>, Function<Exception, String>> doFailed(
             final JobClient jobClient,
-            final ActivatedJob job,
-            final Exception e) {
-        
-        return new CommandWrapper(
-                (FinalCommandStep<Void>) ((FinalCommandStep<?>) jobClient
-                        .newFailCommand(job)
-                        .retries(0)
-                        .errorMessage(e.getMessage())),
-                job,
-                commandExceptionHandlingStrategy);
-        
+            final ActivatedJob job) {
+
+        return Map.entry(
+                exception -> {
+                    jobClient
+                            .newFailCommand(job)
+                            .retries(0)
+                            .errorMessage(exception.getMessage())
+                            .send()
+                            .exceptionally(t -> {
+                                throw new RuntimeException("error", t);
+                            });
+                },
+                taskException -> "fail command (BPMN: " + job.getBpmnProcessId()
+                        + "; Element: " + job.getElementId()
+                        + "; Task-Definition: " + job.getType()
+                        + "; Process-Instance: " + job.getProcessInstanceKey()
+                        + "; Job: " + job.getKey()
+                        + ")");
+
     }
 
 }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java
index c3da080..c346cee 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java
@@ -55,6 +55,8 @@ public class Camunda8TaskWiring extends TaskWiringBase<Camunda8Connectable, Camu
     
     private List<JobWorkerBuilderStep3> workers = new LinkedList<>();
 
+    private List<Camunda8TaskHandler> handlers = new LinkedList<>();
+
     private Set<String> userTaskTenantIds = new HashSet<>();
     
     private final Camunda8VanillaBpProperties camunda8Properties;
@@ -96,6 +98,7 @@ public void accept(
             final ZeebeClient client) {
         
         this.client = client;
+        handlers.forEach(handler -> handler.accept(client));
 
     }
     
@@ -239,6 +242,11 @@ protected void connectToBpms(
                 method,
                 parameters,
                 idPropertyName);
+        if (this.client != null) {
+            taskHandler.accept(this.client);
+        } else {
+            handlers.add(taskHandler);
+        }
 
         if (connectable.getType() == Type.USERTASK) {
 
@@ -251,7 +259,7 @@ protected void connectToBpms(
             return;
             
         }
-        
+
         final var variablesToFetch = getVariablesToFetch(idPropertyName, parameters);
         final var worker = client
                 .newWorker()
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java
index 7daa7ec..c75b2a9 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java
@@ -18,7 +18,7 @@ public class Camunda8UserTaskHandler implements JobHandler {
     
     private final Map<String, Camunda8TaskHandler> taskHandlers = new HashMap<>();
     
-    private String workerId;
+    private final String workerId;
     
     public Camunda8UserTaskHandler(
             final String workerId) {

From eb899ecb0bd24d5adcde647209da845dcf3025f3 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Fri, 19 Apr 2024 14:40:59 +0200
Subject: [PATCH 02/15] Improve README.md

---
 spring-boot/README.md | 40 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 38 insertions(+), 2 deletions(-)

diff --git a/spring-boot/README.md b/spring-boot/README.md
index b2dfcf8..20ed3cc 100644
--- a/spring-boot/README.md
+++ b/spring-boot/README.md
@@ -190,15 +190,51 @@ Since Camunda 8 is an external system to your services one has to deal with even
 1. Camunda 8 is not reachable / cannot process the confirmation of a completed task
 1. The task to be completed was cancelled meanwhile e.g. due to boundary events
 
-If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. To achieve both one should mark the service bean like this:
+In order to activate this behavior one has to mark methods accessing VanillaBP-APIs as `@Transactional`, either by
+using the method-level annotation:
 
 ```java
 @Service
 @WorkflowService(workflowAggregateClass = Ride.class)
-@Transactional(noRollbackFor = TaskException.class)
 public class TaxiRide {
+   @Autowired
+   private ProcessService<Ride> processService;
+
+   @Transactional
+   public void receivePayment(...) {
+      ...
+      processService.startWorkflow(ride);
+      ...
+   }
+
+   @Transactional
+   @WorkflowTask
+   public void chargeCreditCard(final Ride ride) {
+      ...
+   }
+
+   @Transactional
+   public void paymentReceived(final Ride ride) {
+      ...
+      processService.correlateMessage(ride, 'PaymentReceived');
+      ...
+   }
+}
+```
+
+or the class-level annotation:
+
+```java
+@Service
+@WorkflowService(workflowAggregateClass = Ride.class)
+@Transactional
+public class TaxiRide {
+   ...
+}
 ```
 
+If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. This is handled by the adapter, one does not need to take care about it.
+
 ## Workflow aggregate serialization
 
 On using C7 one can use workflow aggregates having relations and calculated values:

From 8c4b4683ebb7cdbb811a00bc5fb43475bad8be87 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Fri, 19 Apr 2024 14:41:33 +0200
Subject: [PATCH 03/15] Upgade version due to TX incompatibility

---
 pom.xml             | 2 +-
 spring-boot/pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 462198a..35f60d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
   <artifactId>camunda8-adapter</artifactId>
 
   <name>VanillaBP SPI adapter for Camunda 8.x</name>
-  <version>1.3.1-SNAPSHOT</version>
+  <version>1.4.0-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <properties>
diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 9b58fa0..4f239c5 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>org.camunda.community.vanillabp</groupId>
     <artifactId>camunda8-adapter</artifactId>
-    <version>1.3.1-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
   </parent>
   
   <artifactId>camunda8-spring-boot-adapter</artifactId>

From 5597ab3348f91bb4051f31621036a24fcb10460c Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Mon, 6 May 2024 14:34:28 +0200
Subject: [PATCH 04/15] Fix wrong column type

---
 .../io/vanillabp/camunda8/liquibase/initial_setup.yaml        | 2 +-
 .../resources/io/vanillabp/camunda8/liquibase/issue_26.yaml   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml
index ef56bbe..c98fd6b 100644
--- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml
+++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml
@@ -2,7 +2,7 @@ databaseChangeLog:
   - changeSet:
       id: initial_setup.yaml
       author: stephanpelikan
-      dbms: "!oracle"
+      dbms: "h2"
       changes:
         - createTable:
             tableName: CAMUNDA8_RESOURCES
diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
index f846ca2..abfa18e 100644
--- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
+++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
@@ -2,7 +2,7 @@ databaseChangeLog:
   - changeSet:
       id: issue_26.yaml#initial_setup
       author: stephanpelikan
-      dbms: oracle
+      dbms: "!h2"
       changes:
         - createTable:
             tableName: CAMUNDA8_RESOURCES
@@ -108,7 +108,7 @@ databaseChangeLog:
   - changeSet:
       id: issue_26.yaml#rename_columns
       author: stephanpelikan
-      dbms: "!oracle"
+      dbms: h2
       changes:
         - renameColumn:
             tableName: CAMUNDA8_RESOURCES

From 7ffc561b6d4358cf7602781ed1b7e332aedccdc2 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Mon, 6 May 2024 14:34:57 +0200
Subject: [PATCH 05/15] Upgrade to Camunda Spring Zeebe 8.5.2

---
 spring-boot/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 4f239c5..8a00392 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -13,7 +13,7 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.zeebe.version>8.5.0-rc2</spring.zeebe.version>
+    <spring.zeebe.version>8.5.2</spring.zeebe.version>
   </properties>
 
   <build>

From 5a05238f5d7506aa68e684ef0bc7a89ebb53d9a2 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Wed, 8 May 2024 11:35:29 +0200
Subject: [PATCH 06/15] Change column type to BIGINT

---
 .../io/vanillabp/camunda8/liquibase/issue_26.yaml         | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
index abfa18e..dd7498f 100644
--- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
+++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml
@@ -162,3 +162,11 @@ databaseChangeLog:
             tableName: CAMUNDA8_DEPLOYMENTS
             oldColumnName: BPMN_PROCESS_ID
             newColumnName: C8D_BPMN_PROCESS_ID
+  - changeSet:
+      id: issue_26.yaml#change_type_of_definition_key
+      author: stephanpelikan
+      changes:
+        -  modifyDataType:
+             columnName:  C8D_DEFINITION_KEY
+             newDataType:  bigint
+             tableName:  CAMUNDA8_DEPLOYMENTS

From 31872bee17084b0a7f22f1fd27d9768e3a188d71 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Thu, 23 May 2024 12:59:31 +0200
Subject: [PATCH 07/15] Upgrade Spring Zeebe to 8.5.3

---
 spring-boot/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 8a00392..e40afdf 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -13,7 +13,7 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.zeebe.version>8.5.2</spring.zeebe.version>
+    <spring.zeebe.version>8.5.3</spring.zeebe.version>
   </properties>
 
   <build>

From 5993aec80234627a6d23dea2d631321c315b1cf1 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Tue, 4 Jun 2024 13:35:49 +0200
Subject: [PATCH 08/15] Stop unintentionally auto-completing user-tasks in
 transactions

---
 spring-boot/pom.xml                           |   5 +
 .../Camunda8AdapterConfiguration.java         |  19 ++-
 .../service/Camunda8ProcessService.java       |  62 +++++---
 .../service/Camunda8TransactionAspect.java    | 133 ++++++++++++++++++
 .../service/Camunda8TransactionProcessor.java |  34 +++--
 ...otUsedCamunda8TransactionInterceptor.java} |  42 +++---
 .../camunda8/wiring/Camunda8TaskHandler.java  |  40 ++++--
 7 files changed, 262 insertions(+), 73 deletions(-)
 create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
 rename spring-boot/src/main/java/io/vanillabp/camunda8/service/{Camunda8TransactionInterceptor.java => NotUsedCamunda8TransactionInterceptor.java} (78%)

diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index e40afdf..10a003a 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -47,6 +47,11 @@
       <artifactId>spring-tx</artifactId>
       <version>6.1.3</version>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-aop</artifactId>
+      <version>3.2.5</version><!-- see pom of spring-client-root -->
+    </dependency>
     <dependency>
       <groupId>jakarta.persistence</groupId>
       <artifactId>jakarta.persistence-api</artifactId>
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
index a1a89c4..033ed65 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
@@ -6,7 +6,7 @@
 import io.vanillabp.camunda8.deployment.DeploymentResourceRepository;
 import io.vanillabp.camunda8.deployment.DeploymentService;
 import io.vanillabp.camunda8.service.Camunda8ProcessService;
-import io.vanillabp.camunda8.service.Camunda8TransactionInterceptor;
+import io.vanillabp.camunda8.service.Camunda8TransactionAspect;
 import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
 import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
 import io.vanillabp.camunda8.wiring.Camunda8TaskHandler;
@@ -18,14 +18,14 @@
 import io.vanillabp.springboot.adapter.VanillaBpProperties;
 import io.vanillabp.springboot.parameters.MethodParameter;
 import jakarta.annotation.PostConstruct;
+import java.lang.reflect.Method;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.beans.factory.config.BeanDefinition;
-import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
@@ -36,10 +36,6 @@
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Scope;
 import org.springframework.data.repository.CrudRepository;
-import org.springframework.transaction.interceptor.TransactionInterceptor;
-
-import java.lang.reflect.Method;
-import java.util.List;
 
 @AutoConfigurationPackage(basePackageClasses = Camunda8AdapterConfiguration.class)
 @AutoConfigureBefore(CamundaAutoConfiguration.class)
@@ -82,6 +78,13 @@ public void init() {
         
     }
 
+    @Bean
+    public Camunda8TransactionAspect camunda8TransactionAspect() {
+
+        return new Camunda8TransactionAspect(eventPublisher);
+
+    }
+
     @Override
     public String getAdapterId() {
         
@@ -193,6 +196,7 @@ public SpringBeanUtil vanillabpSpringBeanUtil(
     /*
      * https://www.tirasa.net/en/blog/dynamic-spring-s-transactional-2020-edition
      */
+    /*
     @Bean
     public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() {
 
@@ -207,6 +211,7 @@ public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector()
         };
 
     }
+    */
 
     @Bean
     public Camunda8TransactionProcessor camunda8TransactionProcessor() {
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
index 80fd1a1..0c7eda7 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java
@@ -5,6 +5,11 @@
 import io.vanillabp.camunda8.Camunda8VanillaBpProperties;
 import io.vanillabp.springboot.adapter.AdapterAwareProcessService;
 import io.vanillabp.springboot.adapter.ProcessServiceImplementation;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationEventPublisher;
@@ -13,12 +18,6 @@
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
 @Transactional(propagation = Propagation.MANDATORY)
 public class Camunda8ProcessService<DE>
         implements ProcessServiceImplementation<DE> {
@@ -36,7 +35,7 @@ public class Camunda8ProcessService<DE>
     private final ApplicationEventPublisher publisher;
 
     private AdapterAwareProcessService<DE> parent;
-    
+
     private ZeebeClient client;
 
     public Camunda8ProcessService(
@@ -132,7 +131,8 @@ public DE startWorkflow(
                                 + "' failed!",
                                 e);
                     }
-                });
+                },
+                "startWorkflow");
         
     }
 
@@ -151,7 +151,8 @@ public DE correlateMessage(
                             workflowAggregate,
                             messageName,
                             correlationId.toString());
-                });
+                },
+                "correlateMessage");
 
     }
     
@@ -177,7 +178,8 @@ public DE correlateMessage(
                 attachedAggregate -> doCorrelateMessage(
                         attachedAggregate,
                         messageName,
-                        correlationId));
+                        correlationId),
+                "correlateMessage-by-correlationId");
 
     }
 
@@ -239,7 +241,8 @@ public DE completeTask(
                     logger.trace("Complete task '{}' of process '{}'",
                             taskId,
                             parent.getPrimaryBpmnProcessId());
-                });
+                },
+                "completeTask");
 
     }
     
@@ -248,7 +251,21 @@ public DE completeUserTask(
             final DE workflowAggregate,
             final String taskId) {
 
-        return completeTask(workflowAggregate, taskId);
+        return runInTransaction(
+                workflowAggregate,
+                taskId,
+                attachedAggregate -> {
+                    client
+                            .newCompleteCommand(Long.parseLong(taskId, 16))
+                            .variables(attachedAggregate)
+                            .send()
+                            .join();
+
+                    logger.trace("Complete user task '{}' of process '{}'",
+                            taskId,
+                            parent.getPrimaryBpmnProcessId());
+                },
+                "completeUserTask");
         
     }
     
@@ -271,7 +288,8 @@ public DE cancelTask(
                     logger.trace("Complete task '{}' of process '{}'",
                             taskId,
                             parent.getPrimaryBpmnProcessId());
-                });
+                },
+                "cancelTask");
 
     }
     
@@ -287,19 +305,22 @@ public DE cancelUserTask(
 
     private DE runInTransaction(
             final DE workflowAggregate,
-            final Consumer<DE> runnable) {
+            final Consumer<DE> runnable,
+            final String methodSignature) {
 
         return runInTransaction(
                 workflowAggregate,
                 null,
-                runnable);
+                runnable,
+                methodSignature);
 
     }
 
     private DE runInTransaction(
             final DE workflowAggregate,
             final String taskIdToTestForAlreadyCompletedOrCancelled,
-            final Consumer<DE> runnable) {
+            final Consumer<DE> runnable,
+            final String methodSignature) {
 
         // persist to get ID in case of @Id @GeneratedValue
         // or force optimistic locking exceptions before running
@@ -311,18 +332,19 @@ private DE runInTransaction(
             if (taskIdToTestForAlreadyCompletedOrCancelled != null) {
                 publisher.publishEvent(
                         new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
-                                Camunda8TransactionInterceptor.class,
+                                methodSignature,
                                 () -> client
                                         .newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16))
                                         .timeout(Duration.ofMinutes(10))
                                         .send()
                                         .join(5, TimeUnit.MINUTES), // needs to run synchronously
-                                () -> "update timeout (BPMN: " + parent.getPrimaryBpmnProcessId() + ")"));            }
+                                () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
+            }
             publisher.publishEvent(
                     new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                            Camunda8TransactionInterceptor.class,
+                            methodSignature,
                             () -> runnable.accept(attachedAggregate),
-                            () -> "complete command (BPMN: " + parent.getPrimaryBpmnProcessId() + ")"));
+                            () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
         } else {
             runnable.accept(attachedAggregate);
         }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
new file mode 100644
index 0000000..ee072d6
--- /dev/null
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
@@ -0,0 +1,133 @@
+package io.vanillabp.camunda8.service;
+
+import io.vanillabp.spi.service.TaskException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+@Aspect
+public class Camunda8TransactionAspect {
+
+    private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionAspect.class);
+
+    public static class TaskHandlerActions {
+        public Supplier<Map.Entry<Runnable, Supplier<String>>> testForTaskAlreadyCompletedOrCancelledCommand;
+        public Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand;
+        public Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand;
+        public Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand;
+    }
+
+    public static final ThreadLocal<TaskHandlerActions> actions = ThreadLocal.withInitial(TaskHandlerActions::new);
+
+    private final ApplicationEventPublisher publisher;
+
+    public Camunda8TransactionAspect(
+            final ApplicationEventPublisher publisher) {
+
+        this.publisher = publisher;
+
+    }
+
+    @Around("@annotation(io.vanillabp.spi.service.WorkflowTask)")
+    private Object checkForTransaction(
+            final ProceedingJoinPoint pjp) throws Throwable {
+
+        final var methodSignature = pjp.getSignature().toLongString();
+        
+        if (!TransactionSynchronizationManager.isActualTransactionActive()) {
+            clearCallbacks();
+            logger.trace("Disable TX callbacks for {}: No TX active", methodSignature);
+        }
+        try {
+
+            final var value = pjp.proceed();
+            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
+                if (handlerTestCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    methodSignature,
+                                    handlerTestCommand.getKey(),
+                                    handlerTestCommand.getValue()));
+                }
+            }
+            if (actions.get().handlerCompletedCommand != null) {
+                final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get();
+                if (handlerCompletedCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    methodSignature,
+                                    handlerCompletedCommand.getKey(),
+                                    handlerCompletedCommand.getValue()));
+                }
+            }
+            return value;
+
+        } catch (TaskException taskError) {
+
+            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
+                if (handlerTestCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    methodSignature,
+                                    handlerTestCommand.getKey(),
+                                    handlerTestCommand.getValue()));
+                }
+            }
+            if (actions.get().bpmnErrorCommand != null) {
+                publisher.publishEvent(
+                        new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                methodSignature,
+                                () -> actions.get().bpmnErrorCommand.getKey().accept(taskError),
+                                () -> actions.get().bpmnErrorCommand.getValue().apply(taskError)));
+            }
+            return null;
+
+        } catch (Exception e) {
+
+            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+                final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
+                if (handlerTestCommand != null) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
+                                    methodSignature,
+                                    handlerTestCommand.getKey(),
+                                    handlerTestCommand.getValue()));
+                }
+            }
+            if (actions.get().handlerFailedCommand != null) {
+                publisher.publishEvent(
+                        new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                methodSignature,
+                                () -> actions.get().handlerFailedCommand.getKey().accept(e),
+                                () -> actions.get().handlerFailedCommand.getValue().apply(e)));
+            }
+            throw e;
+
+        } finally {
+
+            clearCallbacks();
+
+        }
+
+    }
+
+    public static void clearCallbacks() {
+
+        actions.get().bpmnErrorCommand = null;
+        actions.get().handlerCompletedCommand = null;
+        actions.get().handlerFailedCommand = null;
+        actions.get().testForTaskAlreadyCompletedOrCancelledCommand = null;
+
+    }
+
+}
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
index 7475240..82a6b27 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
@@ -3,6 +3,10 @@
 import io.camunda.zeebe.client.api.command.ClientStatusException;
 import io.grpc.Status;
 import io.vanillabp.spi.service.TaskException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationEvent;
@@ -11,22 +15,17 @@
 import org.springframework.transaction.interceptor.TransactionAspectSupport;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
 public class Camunda8TransactionProcessor {
 
     private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class);
 
     public static void registerCallbacks(
-            final Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand,
+            final Supplier<Map.Entry<Runnable, Supplier<String>>> testForTaskAlreadyCompletedOrCancelledCommand,
             final Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand,
             final Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand,
-            final Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand) {
+            final Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand) {
 
-        final var actions = Camunda8TransactionInterceptor.actions.get();
+        final var actions = Camunda8TransactionAspect.actions.get();
         actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand;
         actions.bpmnErrorCommand = bpmnErrorCommand;
         actions.handlerFailedCommand = handlerFailedCommand;
@@ -36,7 +35,7 @@ public static void registerCallbacks(
 
     public static Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommandCallback() {
 
-        return Camunda8TransactionInterceptor
+        return Camunda8TransactionAspect
                 .actions
                 .get()
                 .bpmnErrorCommand;
@@ -45,7 +44,7 @@ public static Map.Entry<Consumer<TaskException>, Function<TaskException, String>
 
     public static Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommandCallback() {
 
-        return Camunda8TransactionInterceptor
+        return Camunda8TransactionAspect
                 .actions
                 .get()
                 .handlerFailedCommand;
@@ -54,16 +53,17 @@ public static Map.Entry<Consumer<Exception>, Function<Exception, String>> handle
 
     public static Map.Entry<Runnable, Supplier<String>> handlerCompletedCommandCallback() {
 
-        return Camunda8TransactionInterceptor
+        return Camunda8TransactionAspect
                 .actions
                 .get()
-                .handlerCompletedCommand;
+                .handlerCompletedCommand
+                .get();
 
     }
 
     public static void unregisterCallbacks() {
 
-        final var actions = Camunda8TransactionInterceptor.actions.get();
+        final var actions = Camunda8TransactionAspect.actions.get();
         actions.testForTaskAlreadyCompletedOrCancelledCommand = null;
         actions.bpmnErrorCommand = null;
         actions.handlerFailedCommand = null;
@@ -104,6 +104,9 @@ public void processPreCommit(
             final Camunda8TestForTaskAlreadyCompletedOrCancelled event) {
 
         try {
+            logger.trace("Will test for existence of task '{}' initiated by: {}",
+                    event.description.get(),
+                    event.getSource());
             // this runnable will test whether the task still exists
             event.runnable.run();
         } catch (Exception e) {
@@ -133,11 +136,14 @@ public void processPostCommit(
             final Camunda8CommandAfterTx event) {
 
         try {
+            logger.trace("Will execute Camunda command for '{}' initiated by: {}",
+                    event.description.get(),
+                    event.getSource());
             // this runnable will instruct Zeebe
             event.runnable.run();
         } catch (Exception e) {
             logger.error(
-                    "Could not execute '{}'! Manual action required!",
+                    "Could not execute camunda command for '{}'! Manual action required!",
                     event.description.get(),
                     e);
         }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java
similarity index 78%
rename from spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java
rename to spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java
index d4ff928..e5be06a 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java
@@ -1,25 +1,24 @@
 package io.vanillabp.camunda8.service;
 
 import io.vanillabp.spi.service.TaskException;
-import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.transaction.TransactionManager;
-import org.springframework.transaction.interceptor.TransactionAttributeSource;
-import org.springframework.transaction.interceptor.TransactionInterceptor;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
-
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.transaction.TransactionManager;
+import org.springframework.transaction.interceptor.TransactionAttributeSource;
+import org.springframework.transaction.interceptor.TransactionInterceptor;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
 
-public class Camunda8TransactionInterceptor extends TransactionInterceptor {
+public class NotUsedCamunda8TransactionInterceptor extends TransactionInterceptor {
 
     private final ApplicationEventPublisher publisher;
 
     public static final ThreadLocal<TaskHandlerActions> actions = ThreadLocal.withInitial(TaskHandlerActions::new);
 
-    public Camunda8TransactionInterceptor(
+    public NotUsedCamunda8TransactionInterceptor(
             final TransactionManager ptm,
             final TransactionAttributeSource tas,
             final ApplicationEventPublisher publisher) {
@@ -31,7 +30,7 @@ public static class TaskHandlerActions {
         public Map.Entry<Runnable, Supplier<String>> testForTaskAlreadyCompletedOrCancelledCommand;
         public Map.Entry<Consumer<TaskException>, Function<TaskException, String>> bpmnErrorCommand;
         public Map.Entry<Consumer<Exception>, Function<Exception, String>> handlerFailedCommand;
-        public Map.Entry<Runnable, Supplier<String>> handlerCompletedCommand;
+        public Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand;
     }
 
     @Override
@@ -45,34 +44,39 @@ protected Object invokeWithinTransaction(
                 return invocation.proceedWithInvocation();
             }
             try {
+                logger.info("Before TX");
                 final var result = invocation.proceedWithInvocation();
+                logger.info("After TX");
                 if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
                     publisher.publishEvent(
                             new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
-                                    Camunda8TransactionInterceptor.class,
+                                    NotUsedCamunda8TransactionInterceptor.class,
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
                 }
                 if (actions.get().handlerCompletedCommand != null) {
-                    publisher.publishEvent(
-                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                    Camunda8TransactionInterceptor.class,
-                                    actions.get().handlerCompletedCommand.getKey(),
-                                    actions.get().handlerCompletedCommand.getValue()));
+                    final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get();
+                    if (handlerCompletedCommand != null) {
+                        publisher.publishEvent(
+                                new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                        NotUsedCamunda8TransactionInterceptor.class,
+                                        handlerCompletedCommand.getKey(),
+                                        handlerCompletedCommand.getValue()));
+                    }
                 }
                 return result;
             } catch (TaskException taskError) {
                 if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
                     publisher.publishEvent(
                             new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
-                                    Camunda8TransactionInterceptor.class,
+                                    NotUsedCamunda8TransactionInterceptor.class,
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
                 }
                 if (actions.get().bpmnErrorCommand != null) {
                     publisher.publishEvent(
                             new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                    Camunda8TransactionInterceptor.class,
+                                    NotUsedCamunda8TransactionInterceptor.class,
                                     () -> actions.get().bpmnErrorCommand.getKey().accept(taskError),
                                     () -> actions.get().bpmnErrorCommand.getValue().apply(taskError)));
                 }
@@ -81,14 +85,14 @@ protected Object invokeWithinTransaction(
                 if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
                     publisher.publishEvent(
                             new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
-                                    Camunda8TransactionInterceptor.class,
+                                    NotUsedCamunda8TransactionInterceptor.class,
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(),
                                     actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue()));
                 }
                 if (actions.get().handlerFailedCommand != null) {
                     publisher.publishEvent(
                             new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                    Camunda8TransactionInterceptor.class,
+                                    NotUsedCamunda8TransactionInterceptor.class,
                                     () -> actions.get().handlerFailedCommand.getKey().accept(e),
                                     () -> actions.get().handlerFailedCommand.getValue().apply(e)));
                 }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
index 848d27f..1904f16 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
@@ -14,10 +14,6 @@
 import io.vanillabp.springboot.adapter.TaskHandlerBase;
 import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache;
 import io.vanillabp.springboot.parameters.MethodParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.repository.CrudRepository;
-
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.List;
@@ -27,6 +23,9 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.repository.CrudRepository;
 
 public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer<ZeebeClient> {
 
@@ -88,11 +87,29 @@ public void handle(
             final var taskIdRetrieved = new AtomicBoolean(false);
             final var workflowAggregateCache = new WorkflowAggregateCache();
 
+            // Any callback used in this method is executed in case of no active transaction.
+            // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor.
             Camunda8TransactionProcessor.registerCallbacks(
-                    doTestForTaskWasCompletedOrCancelled(job),
+                    () -> {
+                        if (taskType == Type.USERTASK) {
+                            return null;
+                        }
+                        if (taskIdRetrieved.get()) { // async processing of service-task
+                            return null;
+                        }
+                        return doTestForTaskWasCompletedOrCancelled(job);
+                    },
                     doThrowError(client, job, workflowAggregateCache),
                     doFailed(client, job),
-                    doComplete(client, job, workflowAggregateCache));
+                    () -> {
+                        if (taskType == Type.USERTASK) {
+                            return null;
+                        }
+                        if (taskIdRetrieved.get()) { // async processing of service-task
+                            return null;
+                        }
+                        return doComplete(client, job, workflowAggregateCache);
+                    });
 
             final Function<String, Object> multiInstanceSupplier
                     = multiInstanceVariable -> getVariable(job, multiInstanceVariable);
@@ -140,13 +157,10 @@ public void handle(
                                 return workflowAggregateCache.workflowAggregate;
                             }, multiInstanceSupplier));
 
-            if ((taskType != Type.USERTASK)
-                    && !taskIdRetrieved.get()) {
-                final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
-                if (callback != null) {
-                    jobPostAction = callback.getKey();
-                    description = callback.getValue();
-                }
+            final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
+            if (callback != null) {
+                jobPostAction = callback.getKey();
+                description = callback.getValue();
             }
         } catch (TaskException bpmnError) {
             final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback();

From 950a7d17dda9b16f5aba54555d7dc02822c5ff4d Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Fri, 7 Jun 2024 10:23:32 +0200
Subject: [PATCH 09/15] Upgrade Spring Zeebe to 8.5.4

---
 spring-boot/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml
index 10a003a..a990186 100644
--- a/spring-boot/pom.xml
+++ b/spring-boot/pom.xml
@@ -13,7 +13,7 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.zeebe.version>8.5.3</spring.zeebe.version>
+    <spring.zeebe.version>8.5.4</spring.zeebe.version>
   </properties>
 
   <build>

From f933a217891ce6c2625b84cf1f05316bdddf9c84 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Fri, 7 Jun 2024 16:18:54 +0200
Subject: [PATCH 10/15] Fix wrong definition of tenant-id property

---
 .../camunda8/Camunda8VanillaBpProperties.java      | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
index 26e459b..e5b03de 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java
@@ -39,8 +39,8 @@ public String getTenantId(
         if (!configuration.isUseTenants()) {
             return null;
         }
-        if (StringUtils.hasText(configuration.getTenant())) {
-            return configuration.getTenant();
+        if (StringUtils.hasText(configuration.getTenantId())) {
+            return configuration.getTenantId();
         }
         return workflowModuleId;
 
@@ -101,7 +101,7 @@ public static class AdapterConfiguration extends WorkerProperties {
 
         private boolean useTenants = true;
 
-        private String tenant;
+        private String tenantId;
 
         public boolean isUseTenants() {
             return useTenants;
@@ -111,12 +111,12 @@ public void setUseTenants(boolean useTenants) {
             this.useTenants = useTenants;
         }
 
-        public String getTenant() {
-            return tenant;
+        public String getTenantId() {
+            return tenantId;
         }
 
-        public void setTenant(String tenant) {
-            this.tenant = tenant;
+        public void setTenantId(String tenantId) {
+            this.tenantId = tenantId;
         }
 
     }

From 378ef1866bdae53a09f9c9638ce888f6203b8525 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Mon, 10 Jun 2024 15:33:39 +0200
Subject: [PATCH 11/15] Fix NPE due to different execution times

---
 .../camunda8/service/Camunda8TransactionAspect.java  | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
index ee072d6..5a25645 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
@@ -84,11 +84,13 @@ private Object checkForTransaction(
                 }
             }
             if (actions.get().bpmnErrorCommand != null) {
+                final var runnable = actions.get().handlerFailedCommand.getKey();
+                final var description = actions.get().handlerFailedCommand.getValue();
                 publisher.publishEvent(
                         new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
                                 methodSignature,
-                                () -> actions.get().bpmnErrorCommand.getKey().accept(taskError),
-                                () -> actions.get().bpmnErrorCommand.getValue().apply(taskError)));
+                                () -> runnable.accept(taskError),
+                                () -> description.apply(taskError)));
             }
             return null;
 
@@ -105,11 +107,13 @@ private Object checkForTransaction(
                 }
             }
             if (actions.get().handlerFailedCommand != null) {
+                final var runnable = actions.get().handlerFailedCommand.getKey();
+                final var description = actions.get().handlerFailedCommand.getValue();
                 publisher.publishEvent(
                         new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
                                 methodSignature,
-                                () -> actions.get().handlerFailedCommand.getKey().accept(e),
-                                () -> actions.get().handlerFailedCommand.getValue().apply(e)));
+                                () -> runnable.accept(e),
+                                () -> description.apply(e)));
             }
             throw e;
 

From 7bd651caaf7d265538c6580d4cfe4e9dbfe477a8 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Fri, 14 Jun 2024 08:14:17 +0200
Subject: [PATCH 12/15] Fix wrong TX command

---
 .../vanillabp/camunda8/service/Camunda8TransactionAspect.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
index 5a25645..4ba5a73 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
@@ -84,8 +84,8 @@ private Object checkForTransaction(
                 }
             }
             if (actions.get().bpmnErrorCommand != null) {
-                final var runnable = actions.get().handlerFailedCommand.getKey();
-                final var description = actions.get().handlerFailedCommand.getValue();
+                final var runnable = actions.get().bpmnErrorCommand.getKey();
+                final var description = actions.get().bpmnErrorCommand.getValue();
                 publisher.publishEvent(
                         new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
                                 methodSignature,

From da0cbe5a496c6e40e4d81460c0bea566b2ed320f Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Tue, 18 Jun 2024 14:47:21 +0200
Subject: [PATCH 13/15] Load and save aggregate as part of an active
 transaction

---
 .../service/Camunda8TransactionAspect.java    | 160 +++++++++++++---
 .../service/Camunda8TransactionProcessor.java |   6 +-
 .../camunda8/wiring/Camunda8TaskHandler.java  | 173 ++++++++++++------
 3 files changed, 246 insertions(+), 93 deletions(-)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
index 4ba5a73..b5ffa45 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java
@@ -25,8 +25,17 @@ public static class TaskHandlerActions {
         public Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand;
     }
 
+    public static class RunDeferredInTransaction {
+        public RunDeferredInTransactionSupplier[] argsSupplier;
+        public Runnable saveAggregateAfterWorkflowTask;
+    }
+
+    public interface RunDeferredInTransactionSupplier extends Supplier<Object> { }
+
     public static final ThreadLocal<TaskHandlerActions> actions = ThreadLocal.withInitial(TaskHandlerActions::new);
 
+    public static final ThreadLocal<RunDeferredInTransaction> runDeferredInTransaction = ThreadLocal.withInitial(RunDeferredInTransaction::new);
+
     private final ApplicationEventPublisher publisher;
 
     public Camunda8TransactionAspect(
@@ -36,20 +45,44 @@ public Camunda8TransactionAspect(
 
     }
 
+    public static void registerDeferredInTransaction(
+            final RunDeferredInTransactionSupplier[] argsSupplier,
+            final Runnable saveAggregateAfterWorkflowTask) {
+
+        runDeferredInTransaction.get().argsSupplier = argsSupplier;
+        runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = saveAggregateAfterWorkflowTask;
+
+    }
+
+    public static void unregisterDeferredInTransaction() {
+
+        runDeferredInTransaction.get().argsSupplier = null;
+        runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = null;
+
+    }
+
+    private void saveWorkflowAggregate() {
+
+        runDeferredInTransaction.get().saveAggregateAfterWorkflowTask.run();
+
+    }
+
     @Around("@annotation(io.vanillabp.spi.service.WorkflowTask)")
     private Object checkForTransaction(
             final ProceedingJoinPoint pjp) throws Throwable {
 
         final var methodSignature = pjp.getSignature().toLongString();
-        
-        if (!TransactionSynchronizationManager.isActualTransactionActive()) {
-            clearCallbacks();
-            logger.trace("Disable TX callbacks for {}: No TX active", methodSignature);
-        }
+
+        final var isTxActive = TransactionSynchronizationManager.isActualTransactionActive();
+
         try {
 
-            final var value = pjp.proceed();
-            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+            final var newArgs = runDeferredInTransactionArgsSupplier(pjp.getArgs());
+            final var value = pjp.proceed(newArgs); // run @WorkflowTask annotated method
+            saveWorkflowAggregate();
+
+            if (isTxActive
+                    && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
                 final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
                 if (handlerTestCommand != null) {
                     publisher.publishEvent(
@@ -62,18 +95,37 @@ private Object checkForTransaction(
             if (actions.get().handlerCompletedCommand != null) {
                 final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get();
                 if (handlerCompletedCommand != null) {
-                    publisher.publishEvent(
-                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                    methodSignature,
-                                    handlerCompletedCommand.getKey(),
-                                    handlerCompletedCommand.getValue()));
+                    if (isTxActive) {
+                        publisher.publishEvent(
+                                new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                        methodSignature,
+                                        handlerCompletedCommand.getKey(),
+                                        handlerCompletedCommand.getValue()));
+                    } else {
+                        try {
+                            handlerCompletedCommand.getKey().run();
+                        } catch (Exception e) {
+                            final var description = handlerCompletedCommand.getValue();
+                            if (description != null) {
+                                logger.error(
+                                        "Could not execute '{}'! Manual action required!",
+                                        description.get(),
+                                        e);
+                            } else {
+                                logger.error(
+                                        "Manual action required due to:",
+                                        e);
+                            }
+                        }
+                    }
                 }
             }
             return value;
 
         } catch (TaskException taskError) {
 
-            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+            if (isTxActive
+                    && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
                 final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
                 if (handlerTestCommand != null) {
                     publisher.publishEvent(
@@ -86,17 +138,35 @@ private Object checkForTransaction(
             if (actions.get().bpmnErrorCommand != null) {
                 final var runnable = actions.get().bpmnErrorCommand.getKey();
                 final var description = actions.get().bpmnErrorCommand.getValue();
-                publisher.publishEvent(
-                        new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                methodSignature,
-                                () -> runnable.accept(taskError),
-                                () -> description.apply(taskError)));
+                if (isTxActive) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    methodSignature,
+                                    () -> runnable.accept(taskError),
+                                    () -> description.apply(taskError)));
+                } else {
+                    try {
+                        runnable.accept(taskError);
+                    } catch (Exception e) {
+                        if (description != null) {
+                            logger.error(
+                                    "Could not execute '{}'! Manual action required!",
+                                    description.apply(taskError),
+                                    e);
+                        } else {
+                            logger.error(
+                                    "Manual action required due to:",
+                                    e);
+                        }
+                    }
+                }
             }
             return null;
 
         } catch (Exception e) {
 
-            if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
+            if (isTxActive
+                    && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
                 final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
                 if (handlerTestCommand != null) {
                     publisher.publishEvent(
@@ -109,18 +179,31 @@ private Object checkForTransaction(
             if (actions.get().handlerFailedCommand != null) {
                 final var runnable = actions.get().handlerFailedCommand.getKey();
                 final var description = actions.get().handlerFailedCommand.getValue();
-                publisher.publishEvent(
-                        new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
-                                methodSignature,
-                                () -> runnable.accept(e),
-                                () -> description.apply(e)));
+                if (isTxActive) {
+                    publisher.publishEvent(
+                            new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
+                                    methodSignature,
+                                    () -> runnable.accept(e),
+                                    () -> description.apply(e)));
+                } else {
+                    try {
+                        runnable.accept(e);
+                    } catch (Exception ie) {
+                        if (description != null) {
+                            logger.error(
+                                    "Could not execute '{}'! Manual action required!",
+                                    description.apply(e),
+                                    ie);
+                        } else {
+                            logger.error(
+                                    "Manual action required due to:",
+                                    ie);
+                        }
+                    }
+                }
             }
             throw e;
 
-        } finally {
-
-            clearCallbacks();
-
         }
 
     }
@@ -134,4 +217,25 @@ public static void clearCallbacks() {
 
     }
 
+    private Object[] runDeferredInTransactionArgsSupplier(
+            final Object[] originalArgs) {
+
+        if (originalArgs == null) {
+            return null;
+        }
+
+        final var newArgs = new Object[ originalArgs.length ];
+        for (var i = 0; i < originalArgs.length; ++i) {
+            final var supplier = runDeferredInTransaction.get().argsSupplier[i];
+            if (supplier != null) {
+                newArgs[i] = supplier.get();
+            } else {
+                newArgs[i] = originalArgs[i];
+            }
+        }
+
+        return newArgs;
+
+    }
+
 }
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
index 82a6b27..cf19ec4 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
@@ -63,11 +63,7 @@ public static Map.Entry<Runnable, Supplier<String>> handlerCompletedCommandCallb
 
     public static void unregisterCallbacks() {
 
-        final var actions = Camunda8TransactionAspect.actions.get();
-        actions.testForTaskAlreadyCompletedOrCancelledCommand = null;
-        actions.bpmnErrorCommand = null;
-        actions.handlerFailedCommand = null;
-        actions.handlerCompletedCommand = null;
+        Camunda8TransactionAspect.clearCallbacks();
 
     }
 
diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
index 1904f16..57f7b1c 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java
@@ -4,18 +4,23 @@
 import io.camunda.zeebe.client.api.response.ActivatedJob;
 import io.camunda.zeebe.client.api.worker.JobClient;
 import io.camunda.zeebe.client.api.worker.JobHandler;
+import io.vanillabp.camunda8.service.Camunda8TransactionAspect;
 import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
 import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
 import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceIndexMethodParameter;
 import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceTotalMethodParameter;
+import io.vanillabp.spi.service.MultiInstanceElementResolver;
 import io.vanillabp.spi.service.TaskEvent;
 import io.vanillabp.spi.service.TaskException;
 import io.vanillabp.springboot.adapter.MultiInstance;
 import io.vanillabp.springboot.adapter.TaskHandlerBase;
 import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache;
 import io.vanillabp.springboot.parameters.MethodParameter;
+import io.vanillabp.springboot.parameters.ResolverBasedMultiInstanceMethodParameter;
+import io.vanillabp.springboot.parameters.WorkflowAggregateMethodParameter;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -61,19 +66,17 @@ public void accept(
 
     @Override
     protected Logger getLogger() {
-        
+
         return logger;
-        
+
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void handle(
             final JobClient client,
-            final ActivatedJob job) {
+            final ActivatedJob job) throws Exception {
 
-        Runnable jobPostAction = null;
-        Supplier<String> description = null;
         try {
             final var businessKey = getVariable(job, idPropertyName);
 
@@ -87,6 +90,10 @@ public void handle(
             final var taskIdRetrieved = new AtomicBoolean(false);
             final var workflowAggregateCache = new WorkflowAggregateCache();
 
+            Camunda8TransactionAspect.registerDeferredInTransaction(
+                    new Camunda8TransactionAspect.RunDeferredInTransactionSupplier[parameters.size()],
+                    saveAggregateAfterWorkflowTask(workflowAggregateCache));
+
             // Any callback used in this method is executed in case of no active transaction.
             // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor.
             Camunda8TransactionProcessor.registerCallbacks(
@@ -97,7 +104,7 @@ public void handle(
                         if (taskIdRetrieved.get()) { // async processing of service-task
                             return null;
                         }
-                        return doTestForTaskWasCompletedOrCancelled(job);
+                        return testForTaskWasCompletedOrCancelled(job);
                     },
                     doThrowError(client, job, workflowAggregateCache),
                     doFailed(client, job),
@@ -117,7 +124,7 @@ public void handle(
             super.execute(
                     workflowAggregateCache,
                     businessKey,
-                    true,
+                    false, // will be done within transaction boundaries
                     (args, param) -> processTaskParameter(
                             args,
                             param,
@@ -157,43 +164,9 @@ public void handle(
                                 return workflowAggregateCache.workflowAggregate;
                             }, multiInstanceSupplier));
 
-            final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback();
-            if (callback != null) {
-                jobPostAction = callback.getKey();
-                description = callback.getValue();
-            }
-        } catch (TaskException bpmnError) {
-            final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback();
-            if (callback != null) {
-                jobPostAction = () -> callback.getKey().accept(bpmnError);
-                description = () -> callback.getValue().apply(bpmnError);
-            }
-        } catch (Exception e) {
-            final var callback = Camunda8TransactionProcessor.handlerFailedCommandCallback();
-            if (callback != null) {
-                logger.error("Failed to execute job '{}'", job.getKey(), e);
-                jobPostAction = () -> callback.getKey().accept(e);
-                description = () -> callback.getValue().apply(e);
-            }
         } finally {
             Camunda8TransactionProcessor.unregisterCallbacks();
-        }
-
-        if (jobPostAction != null) {
-            try {
-                jobPostAction.run();
-            } catch (Exception e) {
-                if (description != null) {
-                    logger.error(
-                            "Could not execute '{}'! Manual action required!",
-                            description.get(),
-                            e);
-                } else {
-                    logger.error(
-                            "Manual action required due to:",
-                            e);
-                }
-            }
+            Camunda8TransactionAspect.unregisterDeferredInTransaction();
         }
 
     }
@@ -205,53 +178,64 @@ protected Object getMultiInstanceElement(
 
         return multiInstanceSupplier
                 .apply(name);
-        
+
     }
-    
+
     @Override
     protected Integer getMultiInstanceIndex(
             final String name,
             final Function<String, Object> multiInstanceSupplier) {
-        
+
         return (Integer) multiInstanceSupplier
                 .apply(name + Camunda8MultiInstanceIndexMethodParameter.SUFFIX) - 1;
-        
+
     }
-    
+
     @Override
     protected Integer getMultiInstanceTotal(
             final String name,
             final Function<String, Object> multiInstanceSupplier) {
-        
+
         return (Integer) multiInstanceSupplier
                 .apply(name + Camunda8MultiInstanceTotalMethodParameter.SUFFIX);
-    
+
     }
-    
+
     @Override
     protected MultiInstance<Object> getMultiInstance(
             final String name,
             final Function<String, Object> multiInstanceSupplier) {
-        
+
         return new MultiInstance<Object>(
                 getMultiInstanceElement(name, multiInstanceSupplier),
                 getMultiInstanceTotal(name, multiInstanceSupplier),
                 getMultiInstanceIndex(name, multiInstanceSupplier));
-        
+
     }
-    
+
     private Object getVariable(
             final ActivatedJob job,
             final String name) {
-        
+
         return job
                 .getVariablesAsMap()
                 .get(name);
-        
+
     }
 
-    @SuppressWarnings("unchecked")
-    public Map.Entry<Runnable, Supplier<String>> doTestForTaskWasCompletedOrCancelled(
+    public Runnable saveAggregateAfterWorkflowTask(
+            final WorkflowAggregateCache aggregateCache) {
+
+        return () -> {
+                if (aggregateCache.workflowAggregate != null) {
+                    workflowAggregateRepository.save(aggregateCache.workflowAggregate);
+                }
+            };
+
+    }
+
+        @SuppressWarnings("unchecked")
+    public Map.Entry<Runnable, Supplier<String>> testForTaskWasCompletedOrCancelled(
             final ActivatedJob job) {
 
         return Map.entry(
@@ -259,7 +243,8 @@ public Map.Entry<Runnable, Supplier<String>> doTestForTaskWasCompletedOrCancelle
                         .newUpdateTimeoutCommand(job)
                         .timeout(Duration.ofMinutes(10))
                         .send()
-                        .join(5, TimeUnit.MINUTES), // needs to run synchronously
+                        .join(5, TimeUnit.MINUTES)
+                , // needs to run synchronously
                 () -> "update timeout (BPMN: " + job.getBpmnProcessId()
                         + "; Element: " + job.getElementId()
                         + "; Task-Definition: " + job.getType()
@@ -317,7 +302,9 @@ private Map.Entry<Consumer<TaskException>, Function<TaskException, String>> doTh
 
                     throwErrorCommand
                             .send()
-                            .exceptionally(t -> { throw new RuntimeException("error", t); });
+                            .exceptionally(t -> {
+                                throw new RuntimeException("error", t);
+                            });
                 },
                 taskException -> "throw error command (BPMN: " + job.getBpmnProcessId()
                         + "; Element: " + job.getElementId()
@@ -326,7 +313,7 @@ private Map.Entry<Consumer<TaskException>, Function<TaskException, String>> doTh
                         + "; Job: " + job.getKey()
                         + ")");
     }
-    
+
     @SuppressWarnings("unchecked")
     private Map.Entry<Consumer<Exception>, Function<Exception, String>> doFailed(
             final JobClient jobClient,
@@ -352,4 +339,70 @@ private Map.Entry<Consumer<Exception>, Function<Exception, String>> doFailed(
 
     }
 
+    protected boolean processWorkflowAggregateParameter(
+            final Object[] args,
+            final MethodParameter param,
+            final WorkflowAggregateCache workflowAggregateCache,
+            final Object workflowAggregateId) {
+
+        if (!(param instanceof WorkflowAggregateMethodParameter)) {
+            return true;
+        }
+
+        Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] = () -> {
+            // Using findById is required to get an object instead of a Hibernate proxy.
+            // Otherwise for e.g. Camunda8 connector JSON serialization of the
+            // workflow aggregate is not possible.
+            workflowAggregateCache.workflowAggregate = workflowAggregateRepository
+                    .findById(workflowAggregateId)
+                    .orElse(null);
+            return workflowAggregateCache.workflowAggregate;
+        };
+
+        args[param.getIndex()] = null; // will be set by deferred execution of supplier
+
+        return false;
+
+    }
+
+    protected boolean processMultiInstanceResolverParameter(
+            final Object[] args,
+            final MethodParameter param,
+            final Supplier<Object> workflowAggregate,
+            final Function<String, Object> multiInstanceSupplier) {
+
+        if (!(param instanceof ResolverBasedMultiInstanceMethodParameter)) {
+            return true;
+        }
+
+        @SuppressWarnings("unchecked")
+        final var resolver =
+                (MultiInstanceElementResolver<Object, Object>)
+                        ((ResolverBasedMultiInstanceMethodParameter) param).getResolverBean();
+
+        final var multiInstances = new HashMap<String, MultiInstanceElementResolver.MultiInstance<Object>>();
+
+        resolver
+                .getNames()
+                .forEach(name -> multiInstances.put(name, getMultiInstance(name, multiInstanceSupplier)));
+
+        Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] =  () -> {
+                try {
+                    return resolver.resolve(workflowAggregate.get(), multiInstances);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            "Failed processing MultiInstanceElementResolver for parameter '"
+                                    + param.getParameter()
+                                    + "' of method '"
+                                    + method
+                                    + "'", e);
+                }
+            };
+
+        args[param.getIndex()] = null; // will be set by deferred execution of supplier
+
+        return false;
+
+    }
+
 }

From cba5dea40473ee0f087c9e98d32b16361156024a Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Wed, 19 Jun 2024 14:00:30 +0200
Subject: [PATCH 14/15] Try to run C8 aspect within TX aspect

---
 .../io/vanillabp/camunda8/Camunda8AdapterConfiguration.java    | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
index 033ed65..c947d8d 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java
@@ -35,6 +35,8 @@
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Scope;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
 import org.springframework.data.repository.CrudRepository;
 
 @AutoConfigurationPackage(basePackageClasses = Camunda8AdapterConfiguration.class)
@@ -79,6 +81,7 @@ public void init() {
     }
 
     @Bean
+    @Order(Ordered.LOWEST_PRECEDENCE)
     public Camunda8TransactionAspect camunda8TransactionAspect() {
 
         return new Camunda8TransactionAspect(eventPublisher);

From 195d677965fb799f55fb54a6d345882a67332380 Mon Sep 17 00:00:00 2001
From: Stephan Pelikan <stephan.pelikan@phactum.at>
Date: Thu, 20 Jun 2024 09:04:06 +0200
Subject: [PATCH 15/15] Enforce rollback in case of testing for task fails

---
 .../service/Camunda8TransactionProcessor.java  | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
index cf19ec4..56b2321 100644
--- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
+++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java
@@ -12,8 +12,6 @@
 import org.springframework.context.ApplicationEvent;
 import org.springframework.transaction.event.TransactionPhase;
 import org.springframework.transaction.event.TransactionalEventListener;
-import org.springframework.transaction.interceptor.TransactionAspectSupport;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
 
 public class Camunda8TransactionProcessor {
 
@@ -109,17 +107,13 @@ public void processPreCommit(
             // if the task is completed or cancelled, then the tx is rolled back
             if ((e instanceof ClientStatusException clientStatusException)
                     && (clientStatusException.getStatus().getCode() == Status.NOT_FOUND.getCode())) {
-                logger.warn(
-                        "Will rollback because job was already completed/cancelled! Tested with command '{}‘ giving status 'NOT_FOUND'",
-                        event.description.get());
+                throw new RuntimeException(
+                        "Will rollback because job was already completed/cancelled! Test-command giving status 'NOT_FOUND':\n"
+                        + event.description.get());
             } else {
-                logger.warn(
-                        "Will rollback because testing for job '{}' failed!",
-                        event.description.get(),
-                        e);
-            }
-            if (TransactionSynchronizationManager.isActualTransactionActive()) {
-                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+                throw new RuntimeException(
+                        "Will rollback because testing for job '{}' failed! Test-command:\n"
+                                + event.description.get());
             }
         }