From 1ab8f5e22c37cd3c69804150acadf04122e48e53 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Tue, 16 Apr 2024 07:16:48 +0200 Subject: [PATCH 01/15] Introduce transaction interceptor --- spring-boot/pom.xml | 8 +- .../Camunda8AdapterConfiguration.java | 42 +++- .../service/Camunda8ProcessService.java | 208 +++++++++++------ .../Camunda8TransactionInterceptor.java | 106 +++++++++ .../service/Camunda8TransactionProcessor.java | 147 ++++++++++++ .../camunda8/wiring/Camunda8TaskHandler.java | 221 +++++++++++++----- .../camunda8/wiring/Camunda8TaskWiring.java | 10 +- .../wiring/Camunda8UserTaskHandler.java | 2 +- 8 files changed, 594 insertions(+), 150 deletions(-) create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index 94d4aa6..9b58fa0 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -13,7 +13,7 @@ UTF-8 - 8.3.4.2 + 8.5.0-rc2 @@ -38,14 +38,14 @@ 1.1.1 - io.camunda - spring-zeebe-starter + io.camunda.spring + spring-boot-starter-camunda ${spring.zeebe.version} org.springframework spring-tx - 5.3.23 + 6.1.3 jakarta.persistence diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java index 43b280c..a1a89c4 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java @@ -1,12 +1,13 @@ package io.vanillabp.camunda8; import io.camunda.zeebe.spring.client.CamundaAutoConfiguration; -import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy; import io.vanillabp.camunda8.deployment.Camunda8DeploymentAdapter; import io.vanillabp.camunda8.deployment.DeploymentRepository; import io.vanillabp.camunda8.deployment.DeploymentResourceRepository; import io.vanillabp.camunda8.deployment.DeploymentService; import io.vanillabp.camunda8.service.Camunda8ProcessService; +import io.vanillabp.camunda8.service.Camunda8TransactionInterceptor; +import io.vanillabp.camunda8.service.Camunda8TransactionProcessor; import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type; import io.vanillabp.camunda8.wiring.Camunda8TaskHandler; import io.vanillabp.camunda8.wiring.Camunda8TaskWiring; @@ -23,15 +24,19 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigurationPackage; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; import org.springframework.data.repository.CrudRepository; +import org.springframework.transaction.interceptor.TransactionInterceptor; import java.lang.reflect.Method; import java.util.List; @@ -57,9 +62,6 @@ public class Camunda8AdapterConfiguration extends AdapterConfigurationBase Camunda8ProcessService newProcessServiceImplementation( final var result = new Camunda8ProcessService( camunda8Properties, + eventPublisher, workflowAggregateRepository, - workflowAggregate -> springDataUtil.getId(workflowAggregate), + springDataUtil::getId, workflowAggregateClass); putConnectableService(workflowAggregateClass, result); @@ -185,4 +190,29 @@ public SpringBeanUtil vanillabpSpringBeanUtil( } + /* + * https://www.tirasa.net/en/blog/dynamic-spring-s-transactional-2020-edition + */ + @Bean + public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() { + + return beanFactory -> { + String[] names = beanFactory.getBeanNamesForType(TransactionInterceptor.class); + for (String name : names) { + BeanDefinition bd = beanFactory.getBeanDefinition(name); + bd.setBeanClassName(Camunda8TransactionInterceptor.class.getName()); + bd.setFactoryBeanName(null); + bd.setFactoryMethodName(null); + } + }; + + } + + @Bean + public Camunda8TransactionProcessor camunda8TransactionProcessor() { + + return new Camunda8TransactionProcessor(); + + } + } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java index f058308..80fd1a1 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java @@ -7,12 +7,16 @@ import io.vanillabp.springboot.adapter.ProcessServiceImplementation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.repository.CrudRepository; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; @Transactional(propagation = Propagation.MANDATORY) @@ -29,18 +33,22 @@ public class Camunda8ProcessService private final Camunda8VanillaBpProperties camunda8Properties; + private final ApplicationEventPublisher publisher; + private AdapterAwareProcessService parent; private ZeebeClient client; - + public Camunda8ProcessService( final Camunda8VanillaBpProperties camunda8Properties, + final ApplicationEventPublisher publisher, final CrudRepository workflowAggregateRepository, final Function getWorkflowAggregateId, final Class workflowAggregateClass) { super(); this.camunda8Properties = camunda8Properties; + this.publisher = publisher; this.workflowAggregateRepository = workflowAggregateRepository; this.workflowAggregateClass = workflowAggregateClass; this.getWorkflowAggregateId = getWorkflowAggregateId; @@ -98,31 +106,33 @@ public CrudRepository getWorkflowAggregateRepository() { @Override public DE startWorkflow( final DE workflowAggregate) throws Exception { - - // persist to get ID in case of @Id @GeneratedValue - // or force optimistic locking exceptions before running - // the workflow if aggregate was already persisted before - final var attachedAggregate = workflowAggregateRepository - .save(workflowAggregate); - - final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId()); - final var command = client - .newCreateInstanceCommand() - .bpmnProcessId(parent.getPrimaryBpmnProcessId()) - .latestVersion() - .variables(attachedAggregate); - (tenantId == null - ? command - : command.tenantId(tenantId)) - .send() - .get(10, TimeUnit.SECONDS); + return runInTransaction( + workflowAggregate, + attachedAggregate -> { + final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId()); + final var command = client + .newCreateInstanceCommand() + .bpmnProcessId(parent.getPrimaryBpmnProcessId()) + .latestVersion() + .variables(attachedAggregate); - try { - return attachedAggregate; - } catch (RuntimeException exception) { - throw exception; - } + try { + (tenantId == null + ? command + : command.tenantId(tenantId)) + .send() + .get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException( + "Starting workflow '" + + parent.getPrimaryBpmnProcessId() + + "‘ for aggregate '" + + attachedAggregate + + "' failed!", + e); + } + }); } @@ -130,19 +140,19 @@ public DE startWorkflow( public DE correlateMessage( final DE workflowAggregate, final String messageName) { - - final var attachedAggregate = workflowAggregateRepository - .save(workflowAggregate); - final var correlationId = getWorkflowAggregateId - .apply(workflowAggregate); - - correlateMessage( + + return runInTransaction( workflowAggregate, - messageName, - correlationId.toString()); - - return attachedAggregate; - + attachedAggregate -> { + final var correlationId = getWorkflowAggregateId + .apply(workflowAggregate); + + doCorrelateMessage( + workflowAggregate, + messageName, + correlationId.toString()); + }); + } @Override @@ -161,12 +171,20 @@ public DE correlateMessage( final DE workflowAggregate, final String messageName, final String correlationId) { - - // persist to get ID in case of @Id @GeneratedValue - // and force optimistic locking exceptions before running - // the workflow if aggregate was already persisted before - final var attachedAggregate = workflowAggregateRepository - .save(workflowAggregate); + + return runInTransaction( + workflowAggregate, + attachedAggregate -> doCorrelateMessage( + attachedAggregate, + messageName, + correlationId)); + + } + + private void doCorrelateMessage( + final DE attachedAggregate, + final String messageName, + final String correlationId) { final var tenantId = camunda8Properties.getTenantId(parent.getWorkflowModuleId()); final var command = client @@ -188,8 +206,6 @@ public DE correlateMessage( parent.getPrimaryBpmnProcessId(), messageKey); - return attachedAggregate; - } @Override @@ -209,23 +225,22 @@ public DE correlateMessage( public DE completeTask( final DE workflowAggregate, final String taskId) { - - // force optimistic locking exceptions before running the workflow - final var attachedAggregate = workflowAggregateRepository - .save(workflowAggregate); - - client - .newCompleteCommand(Long.parseLong(taskId, 16)) - .variables(attachedAggregate) - .send() - .join(); - logger.trace("Complete usertask '{}' for process '{}'", + return runInTransaction( + workflowAggregate, taskId, - parent.getPrimaryBpmnProcessId()); - - return attachedAggregate; - + attachedAggregate -> { + client + .newCompleteCommand(Long.parseLong(taskId, 16)) + .variables(attachedAggregate) + .send() + .join(); + + logger.trace("Complete task '{}' of process '{}'", + taskId, + parent.getPrimaryBpmnProcessId()); + }); + } @Override @@ -243,22 +258,21 @@ public DE cancelTask( final String taskId, final String errorCode) { - // force optimistic locking exceptions before running the workflow - final var attachedAggregate = workflowAggregateRepository - .save(workflowAggregate); - - client - .newThrowErrorCommand(Long.parseLong(taskId)) - .errorCode(errorCode) - .send() - .join(); - - logger.trace("Complete usertask '{}' for process '{}'", + return runInTransaction( + workflowAggregate, taskId, - parent.getPrimaryBpmnProcessId()); - - return attachedAggregate; - + attachedAggregate -> { + client + .newThrowErrorCommand(Long.parseLong(taskId)) + .errorCode(errorCode) + .send() + .join(); + + logger.trace("Complete task '{}' of process '{}'", + taskId, + parent.getPrimaryBpmnProcessId()); + }); + } @Override @@ -271,4 +285,50 @@ public DE cancelUserTask( } + private DE runInTransaction( + final DE workflowAggregate, + final Consumer runnable) { + + return runInTransaction( + workflowAggregate, + null, + runnable); + + } + + private DE runInTransaction( + final DE workflowAggregate, + final String taskIdToTestForAlreadyCompletedOrCancelled, + final Consumer runnable) { + + // persist to get ID in case of @Id @GeneratedValue + // or force optimistic locking exceptions before running + // the workflow if aggregate was already persisted before + final var attachedAggregate = workflowAggregateRepository + .save(workflowAggregate); + + if (TransactionSynchronizationManager.isActualTransactionActive()) { + if (taskIdToTestForAlreadyCompletedOrCancelled != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + Camunda8TransactionInterceptor.class, + () -> client + .newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16)) + .timeout(Duration.ofMinutes(10)) + .send() + .join(5, TimeUnit.MINUTES), // needs to run synchronously + () -> "update timeout (BPMN: " + parent.getPrimaryBpmnProcessId() + ")")); } + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + Camunda8TransactionInterceptor.class, + () -> runnable.accept(attachedAggregate), + () -> "complete command (BPMN: " + parent.getPrimaryBpmnProcessId() + ")")); + } else { + runnable.accept(attachedAggregate); + } + + return attachedAggregate; + + } + } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java new file mode 100644 index 0000000..d4ff928 --- /dev/null +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java @@ -0,0 +1,106 @@ +package io.vanillabp.camunda8.service; + +import io.vanillabp.spi.service.TaskException; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.transaction.TransactionManager; +import org.springframework.transaction.interceptor.TransactionAttributeSource; +import org.springframework.transaction.interceptor.TransactionInterceptor; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.lang.reflect.Method; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public class Camunda8TransactionInterceptor extends TransactionInterceptor { + + private final ApplicationEventPublisher publisher; + + public static final ThreadLocal actions = ThreadLocal.withInitial(TaskHandlerActions::new); + + public Camunda8TransactionInterceptor( + final TransactionManager ptm, + final TransactionAttributeSource tas, + final ApplicationEventPublisher publisher) { + super(ptm, tas); + this.publisher = publisher; + } + + public static class TaskHandlerActions { + public Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand; + public Map.Entry, Function> bpmnErrorCommand; + public Map.Entry, Function> handlerFailedCommand; + public Map.Entry> handlerCompletedCommand; + } + + @Override + protected Object invokeWithinTransaction( + final Method method, + final Class targetClass, + final InvocationCallback invocation) throws Throwable { + + return super.invokeWithinTransaction(method, targetClass, () -> { + if (!TransactionSynchronizationManager.isActualTransactionActive()) { + return invocation.proceedWithInvocation(); + } + try { + final var result = invocation.proceedWithInvocation(); + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + Camunda8TransactionInterceptor.class, + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); + } + if (actions.get().handlerCompletedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + Camunda8TransactionInterceptor.class, + actions.get().handlerCompletedCommand.getKey(), + actions.get().handlerCompletedCommand.getValue())); + } + return result; + } catch (TaskException taskError) { + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + Camunda8TransactionInterceptor.class, + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); + } + if (actions.get().bpmnErrorCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + Camunda8TransactionInterceptor.class, + () -> actions.get().bpmnErrorCommand.getKey().accept(taskError), + () -> actions.get().bpmnErrorCommand.getValue().apply(taskError))); + } + return null; + } catch (Exception e) { + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + Camunda8TransactionInterceptor.class, + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), + actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); + } + if (actions.get().handlerFailedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + Camunda8TransactionInterceptor.class, + () -> actions.get().handlerFailedCommand.getKey().accept(e), + () -> actions.get().handlerFailedCommand.getValue().apply(e))); + } + throw e; + } finally { + actions.get().bpmnErrorCommand = null; + actions.get().handlerCompletedCommand = null; + actions.get().handlerFailedCommand = null; + actions.get().testForTaskAlreadyCompletedOrCancelledCommand = null; + } + }); + + } + +} diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java new file mode 100644 index 0000000..7475240 --- /dev/null +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -0,0 +1,147 @@ +package io.vanillabp.camunda8.service; + +import io.camunda.zeebe.client.api.command.ClientStatusException; +import io.grpc.Status; +import io.vanillabp.spi.service.TaskException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; +import org.springframework.transaction.interceptor.TransactionAspectSupport; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public class Camunda8TransactionProcessor { + + private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class); + + public static void registerCallbacks( + final Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand, + final Map.Entry, Function> bpmnErrorCommand, + final Map.Entry, Function> handlerFailedCommand, + final Map.Entry> handlerCompletedCommand) { + + final var actions = Camunda8TransactionInterceptor.actions.get(); + actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand; + actions.bpmnErrorCommand = bpmnErrorCommand; + actions.handlerFailedCommand = handlerFailedCommand; + actions.handlerCompletedCommand = handlerCompletedCommand; + + } + + public static Map.Entry, Function> bpmnErrorCommandCallback() { + + return Camunda8TransactionInterceptor + .actions + .get() + .bpmnErrorCommand; + + } + + public static Map.Entry, Function> handlerFailedCommandCallback() { + + return Camunda8TransactionInterceptor + .actions + .get() + .handlerFailedCommand; + + } + + public static Map.Entry> handlerCompletedCommandCallback() { + + return Camunda8TransactionInterceptor + .actions + .get() + .handlerCompletedCommand; + + } + + public static void unregisterCallbacks() { + + final var actions = Camunda8TransactionInterceptor.actions.get(); + actions.testForTaskAlreadyCompletedOrCancelledCommand = null; + actions.bpmnErrorCommand = null; + actions.handlerFailedCommand = null; + actions.handlerCompletedCommand = null; + + } + + public static class Camunda8CommandAfterTx extends ApplicationEvent { + final Supplier description; + final Runnable runnable; + public Camunda8CommandAfterTx( + final Object source, + final Runnable runnable, + final Supplier description) { + super(source); + this.runnable = runnable; + this.description = description; + } + } + + public static class Camunda8TestForTaskAlreadyCompletedOrCancelled extends ApplicationEvent { + final Supplier description; + final Runnable runnable; + public Camunda8TestForTaskAlreadyCompletedOrCancelled( + final Object source, + final Runnable runnable, + final Supplier description) { + super(source); + this.runnable = runnable; + this.description = description; + } + } + + @TransactionalEventListener( + phase = TransactionPhase.BEFORE_COMMIT, + fallbackExecution = true) + public void processPreCommit( + final Camunda8TestForTaskAlreadyCompletedOrCancelled event) { + + try { + // this runnable will test whether the task still exists + event.runnable.run(); + } catch (Exception e) { + // if the task is completed or cancelled, then the tx is rolled back + if ((e instanceof ClientStatusException clientStatusException) + && (clientStatusException.getStatus().getCode() == Status.NOT_FOUND.getCode())) { + logger.warn( + "Will rollback because job was already completed/cancelled! Tested with command '{}‘ giving status 'NOT_FOUND'", + event.description.get()); + } else { + logger.warn( + "Will rollback because testing for job '{}' failed!", + event.description.get(), + e); + } + if (TransactionSynchronizationManager.isActualTransactionActive()) { + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + } + + } + + @TransactionalEventListener( + phase = TransactionPhase.AFTER_COMMIT, + fallbackExecution = true) + public void processPostCommit( + final Camunda8CommandAfterTx event) { + + try { + // this runnable will instruct Zeebe + event.runnable.run(); + } catch (Exception e) { + logger.error( + "Could not execute '{}'! Manual action required!", + event.description.get(), + e); + } + + } + +} diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java index 3d5d39f..848d27f 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java @@ -1,15 +1,14 @@ package io.vanillabp.camunda8.wiring; -import io.camunda.zeebe.client.api.command.FinalCommandStep; +import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.client.api.response.ActivatedJob; import io.camunda.zeebe.client.api.worker.JobClient; import io.camunda.zeebe.client.api.worker.JobHandler; -import io.camunda.zeebe.spring.client.jobhandling.CommandWrapper; -import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy; +import io.vanillabp.camunda8.service.Camunda8TransactionProcessor; import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceIndexMethodParameter; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceTotalMethodParameter; -import io.vanillabp.spi.service.TaskEvent.Event; +import io.vanillabp.spi.service.TaskEvent; import io.vanillabp.spi.service.TaskException; import io.vanillabp.springboot.adapter.MultiInstance; import io.vanillabp.springboot.adapter.TaskHandlerBase; @@ -18,26 +17,29 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.repository.CrudRepository; -import org.springframework.transaction.annotation.Transactional; import java.lang.reflect.Method; +import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; -public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler { +public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer { private static final Logger logger = LoggerFactory.getLogger(Camunda8TaskHandler.class); - private final DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy; - private final Type taskType; private final String idPropertyName; + private ZeebeClient zeebeClient; + public Camunda8TaskHandler( final Type taskType, - final DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy, final CrudRepository workflowAggregateRepository, final Object bean, final Method method, @@ -46,11 +48,18 @@ public Camunda8TaskHandler( super(workflowAggregateRepository, bean, method, parameters); this.taskType = taskType; - this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy; this.idPropertyName = idPropertyName; } - + + @Override + public void accept( + final ZeebeClient zeebeClient) { + + this.zeebeClient = zeebeClient; + + } + @Override protected Logger getLogger() { @@ -60,28 +69,34 @@ protected Logger getLogger() { @SuppressWarnings("unchecked") @Override - @Transactional public void handle( final JobClient client, - final ActivatedJob job) throws Exception { + final ActivatedJob job) { - CommandWrapper command = null; + Runnable jobPostAction = null; + Supplier description = null; try { final var businessKey = getVariable(job, idPropertyName); - - logger.trace("Will handle task '{}' of workflow '{}' ('{}') as job '{}'", + + logger.trace("Will handle task '{}' (task-definition '{}‘) of workflow '{}' (instance-id '{}') as job '{}'", job.getElementId(), + job.getType(), + job.getBpmnProcessId(), job.getProcessInstanceKey(), - job.getProcessDefinitionKey(), job.getKey()); - + final var taskIdRetrieved = new AtomicBoolean(false); - + final var workflowAggregateCache = new WorkflowAggregateCache(); + + Camunda8TransactionProcessor.registerCallbacks( + doTestForTaskWasCompletedOrCancelled(job), + doThrowError(client, job, workflowAggregateCache), + doFailed(client, job), + doComplete(client, job, workflowAggregateCache)); + final Function multiInstanceSupplier = multiInstanceVariable -> getVariable(job, multiInstanceVariable); - - final var workflowAggregateCache = new WorkflowAggregateCache(); - + super.execute( workflowAggregateCache, businessKey, @@ -100,7 +115,7 @@ public void handle( (args, param) -> processTaskEventParameter( args, param, - () -> Event.CREATED), + () -> TaskEvent.Event.CREATED), (args, param) -> processMultiInstanceIndexParameter( args, param, @@ -127,21 +142,48 @@ public void handle( if ((taskType != Type.USERTASK) && !taskIdRetrieved.get()) { - command = createCompleteCommand(client, job, workflowAggregateCache.workflowAggregate); + final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); + if (callback != null) { + jobPostAction = callback.getKey(); + description = callback.getValue(); + } } } catch (TaskException bpmnError) { - command = createThrowErrorCommand(client, job, bpmnError); + final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback(); + if (callback != null) { + jobPostAction = () -> callback.getKey().accept(bpmnError); + description = () -> callback.getValue().apply(bpmnError); + } } catch (Exception e) { - logger.error("Failed to execute job '{}'", job.getKey(), e); - command = createFailedCommand(client, job, e); + final var callback = Camunda8TransactionProcessor.handlerFailedCommandCallback(); + if (callback != null) { + logger.error("Failed to execute job '{}'", job.getKey(), e); + jobPostAction = () -> callback.getKey().accept(e); + description = () -> callback.getValue().apply(e); + } + } finally { + Camunda8TransactionProcessor.unregisterCallbacks(); } - if (command != null) { - command.executeAsync(); + if (jobPostAction != null) { + try { + jobPostAction.run(); + } catch (Exception e) { + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.get(), + e); + } else { + logger.error( + "Manual action required due to:", + e); + } + } } } - + @Override protected Object getMultiInstanceElement( final String name, @@ -195,54 +237,105 @@ private Object getVariable( } @SuppressWarnings("unchecked") - public CommandWrapper createCompleteCommand( + public Map.Entry> doTestForTaskWasCompletedOrCancelled( + final ActivatedJob job) { + + return Map.entry( + () -> zeebeClient + .newUpdateTimeoutCommand(job) + .timeout(Duration.ofMinutes(10)) + .send() + .join(5, TimeUnit.MINUTES), // needs to run synchronously + () -> "update timeout (BPMN: " + job.getBpmnProcessId() + + "; Element: " + job.getElementId() + + "; Task-Definition: " + job.getType() + + "; Process-Instance: " + job.getProcessInstanceKey() + + "; Job: " + job.getKey() + + ")"); + + } + + @SuppressWarnings("unchecked") + public Map.Entry> doComplete( final JobClient jobClient, final ActivatedJob job, - final Object workflowAggregateId) { + final WorkflowAggregateCache workflowAggregateCache) { - var completeCommand = jobClient - .newCompleteCommand(job.getKey()); - - if (workflowAggregateId != null) { - completeCommand = completeCommand.variables(workflowAggregateId); - } - - return new CommandWrapper( - (FinalCommandStep) ((FinalCommandStep) completeCommand), - job, - commandExceptionHandlingStrategy); + return Map.entry( + () -> { + var completeCommand = jobClient + .newCompleteCommand(job.getKey()); + + if (workflowAggregateCache.workflowAggregate != null) { + completeCommand = completeCommand.variables(workflowAggregateCache.workflowAggregate); + } + + completeCommand + .send() + .exceptionally(t -> { + throw new RuntimeException("error", t); + }); + }, + () -> "complete command (BPMN: " + job.getBpmnProcessId() + + "; Element: " + job.getElementId() + + "; Task-Definition: " + job.getType() + + "; Process-Instance: " + job.getProcessInstanceKey() + + "; Job: " + job.getKey() + + ")"); } - private CommandWrapper createThrowErrorCommand( + private Map.Entry, Function> doThrowError( final JobClient jobClient, final ActivatedJob job, - final TaskException bpmnError) { + final WorkflowAggregateCache workflowAggregateCache) { - return new CommandWrapper( - jobClient - .newThrowErrorCommand(job.getKey()) - .errorCode(bpmnError.getErrorCode()) - .errorMessage(bpmnError.getErrorName()), - job, - commandExceptionHandlingStrategy); + return Map.entry( + taskException -> { + var throwErrorCommand = jobClient + .newThrowErrorCommand(job.getKey()) + .errorCode(taskException.getErrorCode()) + .errorMessage(taskException.getErrorName()); + if (workflowAggregateCache.workflowAggregate != null) { + throwErrorCommand = throwErrorCommand.variables(workflowAggregateCache.workflowAggregate); + } + + throwErrorCommand + .send() + .exceptionally(t -> { throw new RuntimeException("error", t); }); + }, + taskException -> "throw error command (BPMN: " + job.getBpmnProcessId() + + "; Element: " + job.getElementId() + + "; Task-Definition: " + job.getType() + + "; Process-Instance: " + job.getProcessInstanceKey() + + "; Job: " + job.getKey() + + ")"); } @SuppressWarnings("unchecked") - private CommandWrapper createFailedCommand( + private Map.Entry, Function> doFailed( final JobClient jobClient, - final ActivatedJob job, - final Exception e) { - - return new CommandWrapper( - (FinalCommandStep) ((FinalCommandStep) jobClient - .newFailCommand(job) - .retries(0) - .errorMessage(e.getMessage())), - job, - commandExceptionHandlingStrategy); - + final ActivatedJob job) { + + return Map.entry( + exception -> { + jobClient + .newFailCommand(job) + .retries(0) + .errorMessage(exception.getMessage()) + .send() + .exceptionally(t -> { + throw new RuntimeException("error", t); + }); + }, + taskException -> "fail command (BPMN: " + job.getBpmnProcessId() + + "; Element: " + job.getElementId() + + "; Task-Definition: " + job.getType() + + "; Process-Instance: " + job.getProcessInstanceKey() + + "; Job: " + job.getKey() + + ")"); + } } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java index c3da080..c346cee 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java @@ -55,6 +55,8 @@ public class Camunda8TaskWiring extends TaskWiringBase workers = new LinkedList<>(); + private List handlers = new LinkedList<>(); + private Set userTaskTenantIds = new HashSet<>(); private final Camunda8VanillaBpProperties camunda8Properties; @@ -96,6 +98,7 @@ public void accept( final ZeebeClient client) { this.client = client; + handlers.forEach(handler -> handler.accept(client)); } @@ -239,6 +242,11 @@ protected void connectToBpms( method, parameters, idPropertyName); + if (this.client != null) { + taskHandler.accept(this.client); + } else { + handlers.add(taskHandler); + } if (connectable.getType() == Type.USERTASK) { @@ -251,7 +259,7 @@ protected void connectToBpms( return; } - + final var variablesToFetch = getVariablesToFetch(idPropertyName, parameters); final var worker = client .newWorker() diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java index 7daa7ec..c75b2a9 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8UserTaskHandler.java @@ -18,7 +18,7 @@ public class Camunda8UserTaskHandler implements JobHandler { private final Map taskHandlers = new HashMap<>(); - private String workerId; + private final String workerId; public Camunda8UserTaskHandler( final String workerId) { From eb899ecb0bd24d5adcde647209da845dcf3025f3 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Fri, 19 Apr 2024 14:40:59 +0200 Subject: [PATCH 02/15] Improve README.md --- spring-boot/README.md | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/spring-boot/README.md b/spring-boot/README.md index b2dfcf8..20ed3cc 100644 --- a/spring-boot/README.md +++ b/spring-boot/README.md @@ -190,15 +190,51 @@ Since Camunda 8 is an external system to your services one has to deal with even 1. Camunda 8 is not reachable / cannot process the confirmation of a completed task 1. The task to be completed was cancelled meanwhile e.g. due to boundary events -If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. To achieve both one should mark the service bean like this: +In order to activate this behavior one has to mark methods accessing VanillaBP-APIs as `@Transactional`, either by +using the method-level annotation: ```java @Service @WorkflowService(workflowAggregateClass = Ride.class) -@Transactional(noRollbackFor = TaskException.class) public class TaxiRide { + @Autowired + private ProcessService processService; + + @Transactional + public void receivePayment(...) { + ... + processService.startWorkflow(ride); + ... + } + + @Transactional + @WorkflowTask + public void chargeCreditCard(final Ride ride) { + ... + } + + @Transactional + public void paymentReceived(final Ride ride) { + ... + processService.correlateMessage(ride, 'PaymentReceived'); + ... + } +} +``` + +or the class-level annotation: + +```java +@Service +@WorkflowService(workflowAggregateClass = Ride.class) +@Transactional +public class TaxiRide { + ... +} ``` +If there is an exception in your business code and you have to roll back the transaction then Camunda's task retry-mechanism should retry as configured. Additionally, the `TaskException` is used for expected business errors handled by BPMN error boundary events which must not cause a rollback. This is handled by the adapter, one does not need to take care about it. + ## Workflow aggregate serialization On using C7 one can use workflow aggregates having relations and calculated values: From 8c4b4683ebb7cdbb811a00bc5fb43475bad8be87 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Fri, 19 Apr 2024 14:41:33 +0200 Subject: [PATCH 03/15] Upgade version due to TX incompatibility --- pom.xml | 2 +- spring-boot/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 462198a..35f60d6 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ camunda8-adapter VanillaBP SPI adapter for Camunda 8.x - 1.3.1-SNAPSHOT + 1.4.0-SNAPSHOT pom diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index 9b58fa0..4f239c5 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -5,7 +5,7 @@ org.camunda.community.vanillabp camunda8-adapter - 1.3.1-SNAPSHOT + 1.4.0-SNAPSHOT camunda8-spring-boot-adapter From 5597ab3348f91bb4051f31621036a24fcb10460c Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Mon, 6 May 2024 14:34:28 +0200 Subject: [PATCH 04/15] Fix wrong column type --- .../io/vanillabp/camunda8/liquibase/initial_setup.yaml | 2 +- .../resources/io/vanillabp/camunda8/liquibase/issue_26.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml index ef56bbe..c98fd6b 100644 --- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml +++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/initial_setup.yaml @@ -2,7 +2,7 @@ databaseChangeLog: - changeSet: id: initial_setup.yaml author: stephanpelikan - dbms: "!oracle" + dbms: "h2" changes: - createTable: tableName: CAMUNDA8_RESOURCES diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml index f846ca2..abfa18e 100644 --- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml +++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml @@ -2,7 +2,7 @@ databaseChangeLog: - changeSet: id: issue_26.yaml#initial_setup author: stephanpelikan - dbms: oracle + dbms: "!h2" changes: - createTable: tableName: CAMUNDA8_RESOURCES @@ -108,7 +108,7 @@ databaseChangeLog: - changeSet: id: issue_26.yaml#rename_columns author: stephanpelikan - dbms: "!oracle" + dbms: h2 changes: - renameColumn: tableName: CAMUNDA8_RESOURCES From 7ffc561b6d4358cf7602781ed1b7e332aedccdc2 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Mon, 6 May 2024 14:34:57 +0200 Subject: [PATCH 05/15] Upgrade to Camunda Spring Zeebe 8.5.2 --- spring-boot/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index 4f239c5..8a00392 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -13,7 +13,7 @@ UTF-8 - 8.5.0-rc2 + 8.5.2 From 5a05238f5d7506aa68e684ef0bc7a89ebb53d9a2 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Wed, 8 May 2024 11:35:29 +0200 Subject: [PATCH 06/15] Change column type to BIGINT --- .../io/vanillabp/camunda8/liquibase/issue_26.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml index abfa18e..dd7498f 100644 --- a/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml +++ b/spring-boot/src/main/resources/io/vanillabp/camunda8/liquibase/issue_26.yaml @@ -162,3 +162,11 @@ databaseChangeLog: tableName: CAMUNDA8_DEPLOYMENTS oldColumnName: BPMN_PROCESS_ID newColumnName: C8D_BPMN_PROCESS_ID + - changeSet: + id: issue_26.yaml#change_type_of_definition_key + author: stephanpelikan + changes: + - modifyDataType: + columnName: C8D_DEFINITION_KEY + newDataType: bigint + tableName: CAMUNDA8_DEPLOYMENTS From 31872bee17084b0a7f22f1fd27d9768e3a188d71 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Thu, 23 May 2024 12:59:31 +0200 Subject: [PATCH 07/15] Upgrade Spring Zeebe to 8.5.3 --- spring-boot/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index 8a00392..e40afdf 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -13,7 +13,7 @@ UTF-8 - 8.5.2 + 8.5.3 From 5993aec80234627a6d23dea2d631321c315b1cf1 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Tue, 4 Jun 2024 13:35:49 +0200 Subject: [PATCH 08/15] Stop unintentionally auto-completing user-tasks in transactions --- spring-boot/pom.xml | 5 + .../Camunda8AdapterConfiguration.java | 19 ++- .../service/Camunda8ProcessService.java | 62 +++++--- .../service/Camunda8TransactionAspect.java | 133 ++++++++++++++++++ .../service/Camunda8TransactionProcessor.java | 34 +++-- ...otUsedCamunda8TransactionInterceptor.java} | 42 +++--- .../camunda8/wiring/Camunda8TaskHandler.java | 40 ++++-- 7 files changed, 262 insertions(+), 73 deletions(-) create mode 100644 spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java rename spring-boot/src/main/java/io/vanillabp/camunda8/service/{Camunda8TransactionInterceptor.java => NotUsedCamunda8TransactionInterceptor.java} (78%) diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index e40afdf..10a003a 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -47,6 +47,11 @@ spring-tx 6.1.3 + + org.springframework.boot + spring-boot-starter-aop + 3.2.5 + jakarta.persistence jakarta.persistence-api diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java index a1a89c4..033ed65 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java @@ -6,7 +6,7 @@ import io.vanillabp.camunda8.deployment.DeploymentResourceRepository; import io.vanillabp.camunda8.deployment.DeploymentService; import io.vanillabp.camunda8.service.Camunda8ProcessService; -import io.vanillabp.camunda8.service.Camunda8TransactionInterceptor; +import io.vanillabp.camunda8.service.Camunda8TransactionAspect; import io.vanillabp.camunda8.service.Camunda8TransactionProcessor; import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type; import io.vanillabp.camunda8.wiring.Camunda8TaskHandler; @@ -18,14 +18,14 @@ import io.vanillabp.springboot.adapter.VanillaBpProperties; import io.vanillabp.springboot.parameters.MethodParameter; import jakarta.annotation.PostConstruct; +import java.lang.reflect.Method; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigurationPackage; import org.springframework.boot.autoconfigure.AutoConfigureBefore; @@ -36,10 +36,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; import org.springframework.data.repository.CrudRepository; -import org.springframework.transaction.interceptor.TransactionInterceptor; - -import java.lang.reflect.Method; -import java.util.List; @AutoConfigurationPackage(basePackageClasses = Camunda8AdapterConfiguration.class) @AutoConfigureBefore(CamundaAutoConfiguration.class) @@ -82,6 +78,13 @@ public void init() { } + @Bean + public Camunda8TransactionAspect camunda8TransactionAspect() { + + return new Camunda8TransactionAspect(eventPublisher); + + } + @Override public String getAdapterId() { @@ -193,6 +196,7 @@ public SpringBeanUtil vanillabpSpringBeanUtil( /* * https://www.tirasa.net/en/blog/dynamic-spring-s-transactional-2020-edition */ + /* @Bean public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() { @@ -207,6 +211,7 @@ public static BeanFactoryPostProcessor camunda8TransactionInterceptorInjector() }; } + */ @Bean public Camunda8TransactionProcessor camunda8TransactionProcessor() { diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java index 80fd1a1..0c7eda7 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8ProcessService.java @@ -5,6 +5,11 @@ import io.vanillabp.camunda8.Camunda8VanillaBpProperties; import io.vanillabp.springboot.adapter.AdapterAwareProcessService; import io.vanillabp.springboot.adapter.ProcessServiceImplementation; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; @@ -13,12 +18,6 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronizationManager; -import java.time.Duration; -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; - @Transactional(propagation = Propagation.MANDATORY) public class Camunda8ProcessService implements ProcessServiceImplementation { @@ -36,7 +35,7 @@ public class Camunda8ProcessService private final ApplicationEventPublisher publisher; private AdapterAwareProcessService parent; - + private ZeebeClient client; public Camunda8ProcessService( @@ -132,7 +131,8 @@ public DE startWorkflow( + "' failed!", e); } - }); + }, + "startWorkflow"); } @@ -151,7 +151,8 @@ public DE correlateMessage( workflowAggregate, messageName, correlationId.toString()); - }); + }, + "correlateMessage"); } @@ -177,7 +178,8 @@ public DE correlateMessage( attachedAggregate -> doCorrelateMessage( attachedAggregate, messageName, - correlationId)); + correlationId), + "correlateMessage-by-correlationId"); } @@ -239,7 +241,8 @@ public DE completeTask( logger.trace("Complete task '{}' of process '{}'", taskId, parent.getPrimaryBpmnProcessId()); - }); + }, + "completeTask"); } @@ -248,7 +251,21 @@ public DE completeUserTask( final DE workflowAggregate, final String taskId) { - return completeTask(workflowAggregate, taskId); + return runInTransaction( + workflowAggregate, + taskId, + attachedAggregate -> { + client + .newCompleteCommand(Long.parseLong(taskId, 16)) + .variables(attachedAggregate) + .send() + .join(); + + logger.trace("Complete user task '{}' of process '{}'", + taskId, + parent.getPrimaryBpmnProcessId()); + }, + "completeUserTask"); } @@ -271,7 +288,8 @@ public DE cancelTask( logger.trace("Complete task '{}' of process '{}'", taskId, parent.getPrimaryBpmnProcessId()); - }); + }, + "cancelTask"); } @@ -287,19 +305,22 @@ public DE cancelUserTask( private DE runInTransaction( final DE workflowAggregate, - final Consumer runnable) { + final Consumer runnable, + final String methodSignature) { return runInTransaction( workflowAggregate, null, - runnable); + runnable, + methodSignature); } private DE runInTransaction( final DE workflowAggregate, final String taskIdToTestForAlreadyCompletedOrCancelled, - final Consumer runnable) { + final Consumer runnable, + final String methodSignature) { // persist to get ID in case of @Id @GeneratedValue // or force optimistic locking exceptions before running @@ -311,18 +332,19 @@ private DE runInTransaction( if (taskIdToTestForAlreadyCompletedOrCancelled != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( - Camunda8TransactionInterceptor.class, + methodSignature, () -> client .newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16)) .timeout(Duration.ofMinutes(10)) .send() .join(5, TimeUnit.MINUTES), // needs to run synchronously - () -> "update timeout (BPMN: " + parent.getPrimaryBpmnProcessId() + ")")); } + () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId())); + } publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - Camunda8TransactionInterceptor.class, + methodSignature, () -> runnable.accept(attachedAggregate), - () -> "complete command (BPMN: " + parent.getPrimaryBpmnProcessId() + ")")); + () -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId())); } else { runnable.accept(attachedAggregate); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java new file mode 100644 index 0000000..ee072d6 --- /dev/null +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java @@ -0,0 +1,133 @@ +package io.vanillabp.camunda8.service; + +import io.vanillabp.spi.service.TaskException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +@Aspect +public class Camunda8TransactionAspect { + + private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionAspect.class); + + public static class TaskHandlerActions { + public Supplier>> testForTaskAlreadyCompletedOrCancelledCommand; + public Map.Entry, Function> bpmnErrorCommand; + public Map.Entry, Function> handlerFailedCommand; + public Supplier>> handlerCompletedCommand; + } + + public static final ThreadLocal actions = ThreadLocal.withInitial(TaskHandlerActions::new); + + private final ApplicationEventPublisher publisher; + + public Camunda8TransactionAspect( + final ApplicationEventPublisher publisher) { + + this.publisher = publisher; + + } + + @Around("@annotation(io.vanillabp.spi.service.WorkflowTask)") + private Object checkForTransaction( + final ProceedingJoinPoint pjp) throws Throwable { + + final var methodSignature = pjp.getSignature().toLongString(); + + if (!TransactionSynchronizationManager.isActualTransactionActive()) { + clearCallbacks(); + logger.trace("Disable TX callbacks for {}: No TX active", methodSignature); + } + try { + + final var value = pjp.proceed(); + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); + if (handlerTestCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + methodSignature, + handlerTestCommand.getKey(), + handlerTestCommand.getValue())); + } + } + if (actions.get().handlerCompletedCommand != null) { + final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get(); + if (handlerCompletedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + handlerCompletedCommand.getKey(), + handlerCompletedCommand.getValue())); + } + } + return value; + + } catch (TaskException taskError) { + + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); + if (handlerTestCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + methodSignature, + handlerTestCommand.getKey(), + handlerTestCommand.getValue())); + } + } + if (actions.get().bpmnErrorCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> actions.get().bpmnErrorCommand.getKey().accept(taskError), + () -> actions.get().bpmnErrorCommand.getValue().apply(taskError))); + } + return null; + + } catch (Exception e) { + + if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); + if (handlerTestCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( + methodSignature, + handlerTestCommand.getKey(), + handlerTestCommand.getValue())); + } + } + if (actions.get().handlerFailedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> actions.get().handlerFailedCommand.getKey().accept(e), + () -> actions.get().handlerFailedCommand.getValue().apply(e))); + } + throw e; + + } finally { + + clearCallbacks(); + + } + + } + + public static void clearCallbacks() { + + actions.get().bpmnErrorCommand = null; + actions.get().handlerCompletedCommand = null; + actions.get().handlerFailedCommand = null; + actions.get().testForTaskAlreadyCompletedOrCancelledCommand = null; + + } + +} diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java index 7475240..82a6b27 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -3,6 +3,10 @@ import io.camunda.zeebe.client.api.command.ClientStatusException; import io.grpc.Status; import io.vanillabp.spi.service.TaskException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEvent; @@ -11,22 +15,17 @@ import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.transaction.support.TransactionSynchronizationManager; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - public class Camunda8TransactionProcessor { private static final Logger logger = LoggerFactory.getLogger(Camunda8TransactionProcessor.class); public static void registerCallbacks( - final Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand, + final Supplier>> testForTaskAlreadyCompletedOrCancelledCommand, final Map.Entry, Function> bpmnErrorCommand, final Map.Entry, Function> handlerFailedCommand, - final Map.Entry> handlerCompletedCommand) { + final Supplier>> handlerCompletedCommand) { - final var actions = Camunda8TransactionInterceptor.actions.get(); + final var actions = Camunda8TransactionAspect.actions.get(); actions.testForTaskAlreadyCompletedOrCancelledCommand = testForTaskAlreadyCompletedOrCancelledCommand; actions.bpmnErrorCommand = bpmnErrorCommand; actions.handlerFailedCommand = handlerFailedCommand; @@ -36,7 +35,7 @@ public static void registerCallbacks( public static Map.Entry, Function> bpmnErrorCommandCallback() { - return Camunda8TransactionInterceptor + return Camunda8TransactionAspect .actions .get() .bpmnErrorCommand; @@ -45,7 +44,7 @@ public static Map.Entry, Function public static Map.Entry, Function> handlerFailedCommandCallback() { - return Camunda8TransactionInterceptor + return Camunda8TransactionAspect .actions .get() .handlerFailedCommand; @@ -54,16 +53,17 @@ public static Map.Entry, Function> handle public static Map.Entry> handlerCompletedCommandCallback() { - return Camunda8TransactionInterceptor + return Camunda8TransactionAspect .actions .get() - .handlerCompletedCommand; + .handlerCompletedCommand + .get(); } public static void unregisterCallbacks() { - final var actions = Camunda8TransactionInterceptor.actions.get(); + final var actions = Camunda8TransactionAspect.actions.get(); actions.testForTaskAlreadyCompletedOrCancelledCommand = null; actions.bpmnErrorCommand = null; actions.handlerFailedCommand = null; @@ -104,6 +104,9 @@ public void processPreCommit( final Camunda8TestForTaskAlreadyCompletedOrCancelled event) { try { + logger.trace("Will test for existence of task '{}' initiated by: {}", + event.description.get(), + event.getSource()); // this runnable will test whether the task still exists event.runnable.run(); } catch (Exception e) { @@ -133,11 +136,14 @@ public void processPostCommit( final Camunda8CommandAfterTx event) { try { + logger.trace("Will execute Camunda command for '{}' initiated by: {}", + event.description.get(), + event.getSource()); // this runnable will instruct Zeebe event.runnable.run(); } catch (Exception e) { logger.error( - "Could not execute '{}'! Manual action required!", + "Could not execute camunda command for '{}'! Manual action required!", event.description.get(), e); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java similarity index 78% rename from spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java rename to spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java index d4ff928..e5be06a 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionInterceptor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/NotUsedCamunda8TransactionInterceptor.java @@ -1,25 +1,24 @@ package io.vanillabp.camunda8.service; import io.vanillabp.spi.service.TaskException; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.transaction.TransactionManager; -import org.springframework.transaction.interceptor.TransactionAttributeSource; -import org.springframework.transaction.interceptor.TransactionInterceptor; -import org.springframework.transaction.support.TransactionSynchronizationManager; - import java.lang.reflect.Method; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.transaction.TransactionManager; +import org.springframework.transaction.interceptor.TransactionAttributeSource; +import org.springframework.transaction.interceptor.TransactionInterceptor; +import org.springframework.transaction.support.TransactionSynchronizationManager; -public class Camunda8TransactionInterceptor extends TransactionInterceptor { +public class NotUsedCamunda8TransactionInterceptor extends TransactionInterceptor { private final ApplicationEventPublisher publisher; public static final ThreadLocal actions = ThreadLocal.withInitial(TaskHandlerActions::new); - public Camunda8TransactionInterceptor( + public NotUsedCamunda8TransactionInterceptor( final TransactionManager ptm, final TransactionAttributeSource tas, final ApplicationEventPublisher publisher) { @@ -31,7 +30,7 @@ public static class TaskHandlerActions { public Map.Entry> testForTaskAlreadyCompletedOrCancelledCommand; public Map.Entry, Function> bpmnErrorCommand; public Map.Entry, Function> handlerFailedCommand; - public Map.Entry> handlerCompletedCommand; + public Supplier>> handlerCompletedCommand; } @Override @@ -45,34 +44,39 @@ protected Object invokeWithinTransaction( return invocation.proceedWithInvocation(); } try { + logger.info("Before TX"); final var result = invocation.proceedWithInvocation(); + logger.info("After TX"); if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( - Camunda8TransactionInterceptor.class, + NotUsedCamunda8TransactionInterceptor.class, actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); } if (actions.get().handlerCompletedCommand != null) { - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - Camunda8TransactionInterceptor.class, - actions.get().handlerCompletedCommand.getKey(), - actions.get().handlerCompletedCommand.getValue())); + final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get(); + if (handlerCompletedCommand != null) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + NotUsedCamunda8TransactionInterceptor.class, + handlerCompletedCommand.getKey(), + handlerCompletedCommand.getValue())); + } } return result; } catch (TaskException taskError) { if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( - Camunda8TransactionInterceptor.class, + NotUsedCamunda8TransactionInterceptor.class, actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); } if (actions.get().bpmnErrorCommand != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - Camunda8TransactionInterceptor.class, + NotUsedCamunda8TransactionInterceptor.class, () -> actions.get().bpmnErrorCommand.getKey().accept(taskError), () -> actions.get().bpmnErrorCommand.getValue().apply(taskError))); } @@ -81,14 +85,14 @@ protected Object invokeWithinTransaction( if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled( - Camunda8TransactionInterceptor.class, + NotUsedCamunda8TransactionInterceptor.class, actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getKey(), actions.get().testForTaskAlreadyCompletedOrCancelledCommand.getValue())); } if (actions.get().handlerFailedCommand != null) { publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - Camunda8TransactionInterceptor.class, + NotUsedCamunda8TransactionInterceptor.class, () -> actions.get().handlerFailedCommand.getKey().accept(e), () -> actions.get().handlerFailedCommand.getValue().apply(e))); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java index 848d27f..1904f16 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java @@ -14,10 +14,6 @@ import io.vanillabp.springboot.adapter.TaskHandlerBase; import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache; import io.vanillabp.springboot.parameters.MethodParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.repository.CrudRepository; - import java.lang.reflect.Method; import java.time.Duration; import java.util.List; @@ -27,6 +23,9 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.repository.CrudRepository; public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler, Consumer { @@ -88,11 +87,29 @@ public void handle( final var taskIdRetrieved = new AtomicBoolean(false); final var workflowAggregateCache = new WorkflowAggregateCache(); + // Any callback used in this method is executed in case of no active transaction. + // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor. Camunda8TransactionProcessor.registerCallbacks( - doTestForTaskWasCompletedOrCancelled(job), + () -> { + if (taskType == Type.USERTASK) { + return null; + } + if (taskIdRetrieved.get()) { // async processing of service-task + return null; + } + return doTestForTaskWasCompletedOrCancelled(job); + }, doThrowError(client, job, workflowAggregateCache), doFailed(client, job), - doComplete(client, job, workflowAggregateCache)); + () -> { + if (taskType == Type.USERTASK) { + return null; + } + if (taskIdRetrieved.get()) { // async processing of service-task + return null; + } + return doComplete(client, job, workflowAggregateCache); + }); final Function multiInstanceSupplier = multiInstanceVariable -> getVariable(job, multiInstanceVariable); @@ -140,13 +157,10 @@ public void handle( return workflowAggregateCache.workflowAggregate; }, multiInstanceSupplier)); - if ((taskType != Type.USERTASK) - && !taskIdRetrieved.get()) { - final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); - if (callback != null) { - jobPostAction = callback.getKey(); - description = callback.getValue(); - } + final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); + if (callback != null) { + jobPostAction = callback.getKey(); + description = callback.getValue(); } } catch (TaskException bpmnError) { final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback(); From 950a7d17dda9b16f5aba54555d7dc02822c5ff4d Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Fri, 7 Jun 2024 10:23:32 +0200 Subject: [PATCH 09/15] Upgrade Spring Zeebe to 8.5.4 --- spring-boot/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-boot/pom.xml b/spring-boot/pom.xml index 10a003a..a990186 100644 --- a/spring-boot/pom.xml +++ b/spring-boot/pom.xml @@ -13,7 +13,7 @@ UTF-8 - 8.5.3 + 8.5.4 From f933a217891ce6c2625b84cf1f05316bdddf9c84 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Fri, 7 Jun 2024 16:18:54 +0200 Subject: [PATCH 10/15] Fix wrong definition of tenant-id property --- .../camunda8/Camunda8VanillaBpProperties.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java index 26e459b..e5b03de 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java @@ -39,8 +39,8 @@ public String getTenantId( if (!configuration.isUseTenants()) { return null; } - if (StringUtils.hasText(configuration.getTenant())) { - return configuration.getTenant(); + if (StringUtils.hasText(configuration.getTenantId())) { + return configuration.getTenantId(); } return workflowModuleId; @@ -101,7 +101,7 @@ public static class AdapterConfiguration extends WorkerProperties { private boolean useTenants = true; - private String tenant; + private String tenantId; public boolean isUseTenants() { return useTenants; @@ -111,12 +111,12 @@ public void setUseTenants(boolean useTenants) { this.useTenants = useTenants; } - public String getTenant() { - return tenant; + public String getTenantId() { + return tenantId; } - public void setTenant(String tenant) { - this.tenant = tenant; + public void setTenantId(String tenantId) { + this.tenantId = tenantId; } } From 378ef1866bdae53a09f9c9638ce888f6203b8525 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Mon, 10 Jun 2024 15:33:39 +0200 Subject: [PATCH 11/15] Fix NPE due to different execution times --- .../camunda8/service/Camunda8TransactionAspect.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java index ee072d6..5a25645 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java @@ -84,11 +84,13 @@ private Object checkForTransaction( } } if (actions.get().bpmnErrorCommand != null) { + final var runnable = actions.get().handlerFailedCommand.getKey(); + final var description = actions.get().handlerFailedCommand.getValue(); publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( methodSignature, - () -> actions.get().bpmnErrorCommand.getKey().accept(taskError), - () -> actions.get().bpmnErrorCommand.getValue().apply(taskError))); + () -> runnable.accept(taskError), + () -> description.apply(taskError))); } return null; @@ -105,11 +107,13 @@ private Object checkForTransaction( } } if (actions.get().handlerFailedCommand != null) { + final var runnable = actions.get().handlerFailedCommand.getKey(); + final var description = actions.get().handlerFailedCommand.getValue(); publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( methodSignature, - () -> actions.get().handlerFailedCommand.getKey().accept(e), - () -> actions.get().handlerFailedCommand.getValue().apply(e))); + () -> runnable.accept(e), + () -> description.apply(e))); } throw e; From 7bd651caaf7d265538c6580d4cfe4e9dbfe477a8 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Fri, 14 Jun 2024 08:14:17 +0200 Subject: [PATCH 12/15] Fix wrong TX command --- .../vanillabp/camunda8/service/Camunda8TransactionAspect.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java index 5a25645..4ba5a73 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java @@ -84,8 +84,8 @@ private Object checkForTransaction( } } if (actions.get().bpmnErrorCommand != null) { - final var runnable = actions.get().handlerFailedCommand.getKey(); - final var description = actions.get().handlerFailedCommand.getValue(); + final var runnable = actions.get().bpmnErrorCommand.getKey(); + final var description = actions.get().bpmnErrorCommand.getValue(); publisher.publishEvent( new Camunda8TransactionProcessor.Camunda8CommandAfterTx( methodSignature, From da0cbe5a496c6e40e4d81460c0bea566b2ed320f Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Tue, 18 Jun 2024 14:47:21 +0200 Subject: [PATCH 13/15] Load and save aggregate as part of an active transaction --- .../service/Camunda8TransactionAspect.java | 160 +++++++++++++--- .../service/Camunda8TransactionProcessor.java | 6 +- .../camunda8/wiring/Camunda8TaskHandler.java | 173 ++++++++++++------ 3 files changed, 246 insertions(+), 93 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java index 4ba5a73..b5ffa45 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java @@ -25,8 +25,17 @@ public static class TaskHandlerActions { public Supplier>> handlerCompletedCommand; } + public static class RunDeferredInTransaction { + public RunDeferredInTransactionSupplier[] argsSupplier; + public Runnable saveAggregateAfterWorkflowTask; + } + + public interface RunDeferredInTransactionSupplier extends Supplier { } + public static final ThreadLocal actions = ThreadLocal.withInitial(TaskHandlerActions::new); + public static final ThreadLocal runDeferredInTransaction = ThreadLocal.withInitial(RunDeferredInTransaction::new); + private final ApplicationEventPublisher publisher; public Camunda8TransactionAspect( @@ -36,20 +45,44 @@ public Camunda8TransactionAspect( } + public static void registerDeferredInTransaction( + final RunDeferredInTransactionSupplier[] argsSupplier, + final Runnable saveAggregateAfterWorkflowTask) { + + runDeferredInTransaction.get().argsSupplier = argsSupplier; + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = saveAggregateAfterWorkflowTask; + + } + + public static void unregisterDeferredInTransaction() { + + runDeferredInTransaction.get().argsSupplier = null; + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = null; + + } + + private void saveWorkflowAggregate() { + + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask.run(); + + } + @Around("@annotation(io.vanillabp.spi.service.WorkflowTask)") private Object checkForTransaction( final ProceedingJoinPoint pjp) throws Throwable { final var methodSignature = pjp.getSignature().toLongString(); - - if (!TransactionSynchronizationManager.isActualTransactionActive()) { - clearCallbacks(); - logger.trace("Disable TX callbacks for {}: No TX active", methodSignature); - } + + final var isTxActive = TransactionSynchronizationManager.isActualTransactionActive(); + try { - final var value = pjp.proceed(); - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + final var newArgs = runDeferredInTransactionArgsSupplier(pjp.getArgs()); + final var value = pjp.proceed(newArgs); // run @WorkflowTask annotated method + saveWorkflowAggregate(); + + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -62,18 +95,37 @@ private Object checkForTransaction( if (actions.get().handlerCompletedCommand != null) { final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get(); if (handlerCompletedCommand != null) { - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - handlerCompletedCommand.getKey(), - handlerCompletedCommand.getValue())); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + handlerCompletedCommand.getKey(), + handlerCompletedCommand.getValue())); + } else { + try { + handlerCompletedCommand.getKey().run(); + } catch (Exception e) { + final var description = handlerCompletedCommand.getValue(); + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.get(), + e); + } else { + logger.error( + "Manual action required due to:", + e); + } + } + } } } return value; } catch (TaskException taskError) { - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -86,17 +138,35 @@ private Object checkForTransaction( if (actions.get().bpmnErrorCommand != null) { final var runnable = actions.get().bpmnErrorCommand.getKey(); final var description = actions.get().bpmnErrorCommand.getValue(); - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - () -> runnable.accept(taskError), - () -> description.apply(taskError))); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> runnable.accept(taskError), + () -> description.apply(taskError))); + } else { + try { + runnable.accept(taskError); + } catch (Exception e) { + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.apply(taskError), + e); + } else { + logger.error( + "Manual action required due to:", + e); + } + } + } } return null; } catch (Exception e) { - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -109,18 +179,31 @@ private Object checkForTransaction( if (actions.get().handlerFailedCommand != null) { final var runnable = actions.get().handlerFailedCommand.getKey(); final var description = actions.get().handlerFailedCommand.getValue(); - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - () -> runnable.accept(e), - () -> description.apply(e))); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> runnable.accept(e), + () -> description.apply(e))); + } else { + try { + runnable.accept(e); + } catch (Exception ie) { + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.apply(e), + ie); + } else { + logger.error( + "Manual action required due to:", + ie); + } + } + } } throw e; - } finally { - - clearCallbacks(); - } } @@ -134,4 +217,25 @@ public static void clearCallbacks() { } + private Object[] runDeferredInTransactionArgsSupplier( + final Object[] originalArgs) { + + if (originalArgs == null) { + return null; + } + + final var newArgs = new Object[ originalArgs.length ]; + for (var i = 0; i < originalArgs.length; ++i) { + final var supplier = runDeferredInTransaction.get().argsSupplier[i]; + if (supplier != null) { + newArgs[i] = supplier.get(); + } else { + newArgs[i] = originalArgs[i]; + } + } + + return newArgs; + + } + } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java index 82a6b27..cf19ec4 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -63,11 +63,7 @@ public static Map.Entry> handlerCompletedCommandCallb public static void unregisterCallbacks() { - final var actions = Camunda8TransactionAspect.actions.get(); - actions.testForTaskAlreadyCompletedOrCancelledCommand = null; - actions.bpmnErrorCommand = null; - actions.handlerFailedCommand = null; - actions.handlerCompletedCommand = null; + Camunda8TransactionAspect.clearCallbacks(); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java index 1904f16..57f7b1c 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java @@ -4,18 +4,23 @@ import io.camunda.zeebe.client.api.response.ActivatedJob; import io.camunda.zeebe.client.api.worker.JobClient; import io.camunda.zeebe.client.api.worker.JobHandler; +import io.vanillabp.camunda8.service.Camunda8TransactionAspect; import io.vanillabp.camunda8.service.Camunda8TransactionProcessor; import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceIndexMethodParameter; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceTotalMethodParameter; +import io.vanillabp.spi.service.MultiInstanceElementResolver; import io.vanillabp.spi.service.TaskEvent; import io.vanillabp.spi.service.TaskException; import io.vanillabp.springboot.adapter.MultiInstance; import io.vanillabp.springboot.adapter.TaskHandlerBase; import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache; import io.vanillabp.springboot.parameters.MethodParameter; +import io.vanillabp.springboot.parameters.ResolverBasedMultiInstanceMethodParameter; +import io.vanillabp.springboot.parameters.WorkflowAggregateMethodParameter; import java.lang.reflect.Method; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -61,19 +66,17 @@ public void accept( @Override protected Logger getLogger() { - + return logger; - + } @SuppressWarnings("unchecked") @Override public void handle( final JobClient client, - final ActivatedJob job) { + final ActivatedJob job) throws Exception { - Runnable jobPostAction = null; - Supplier description = null; try { final var businessKey = getVariable(job, idPropertyName); @@ -87,6 +90,10 @@ public void handle( final var taskIdRetrieved = new AtomicBoolean(false); final var workflowAggregateCache = new WorkflowAggregateCache(); + Camunda8TransactionAspect.registerDeferredInTransaction( + new Camunda8TransactionAspect.RunDeferredInTransactionSupplier[parameters.size()], + saveAggregateAfterWorkflowTask(workflowAggregateCache)); + // Any callback used in this method is executed in case of no active transaction. // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor. Camunda8TransactionProcessor.registerCallbacks( @@ -97,7 +104,7 @@ public void handle( if (taskIdRetrieved.get()) { // async processing of service-task return null; } - return doTestForTaskWasCompletedOrCancelled(job); + return testForTaskWasCompletedOrCancelled(job); }, doThrowError(client, job, workflowAggregateCache), doFailed(client, job), @@ -117,7 +124,7 @@ public void handle( super.execute( workflowAggregateCache, businessKey, - true, + false, // will be done within transaction boundaries (args, param) -> processTaskParameter( args, param, @@ -157,43 +164,9 @@ public void handle( return workflowAggregateCache.workflowAggregate; }, multiInstanceSupplier)); - final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); - if (callback != null) { - jobPostAction = callback.getKey(); - description = callback.getValue(); - } - } catch (TaskException bpmnError) { - final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback(); - if (callback != null) { - jobPostAction = () -> callback.getKey().accept(bpmnError); - description = () -> callback.getValue().apply(bpmnError); - } - } catch (Exception e) { - final var callback = Camunda8TransactionProcessor.handlerFailedCommandCallback(); - if (callback != null) { - logger.error("Failed to execute job '{}'", job.getKey(), e); - jobPostAction = () -> callback.getKey().accept(e); - description = () -> callback.getValue().apply(e); - } } finally { Camunda8TransactionProcessor.unregisterCallbacks(); - } - - if (jobPostAction != null) { - try { - jobPostAction.run(); - } catch (Exception e) { - if (description != null) { - logger.error( - "Could not execute '{}'! Manual action required!", - description.get(), - e); - } else { - logger.error( - "Manual action required due to:", - e); - } - } + Camunda8TransactionAspect.unregisterDeferredInTransaction(); } } @@ -205,53 +178,64 @@ protected Object getMultiInstanceElement( return multiInstanceSupplier .apply(name); - + } - + @Override protected Integer getMultiInstanceIndex( final String name, final Function multiInstanceSupplier) { - + return (Integer) multiInstanceSupplier .apply(name + Camunda8MultiInstanceIndexMethodParameter.SUFFIX) - 1; - + } - + @Override protected Integer getMultiInstanceTotal( final String name, final Function multiInstanceSupplier) { - + return (Integer) multiInstanceSupplier .apply(name + Camunda8MultiInstanceTotalMethodParameter.SUFFIX); - + } - + @Override protected MultiInstance getMultiInstance( final String name, final Function multiInstanceSupplier) { - + return new MultiInstance( getMultiInstanceElement(name, multiInstanceSupplier), getMultiInstanceTotal(name, multiInstanceSupplier), getMultiInstanceIndex(name, multiInstanceSupplier)); - + } - + private Object getVariable( final ActivatedJob job, final String name) { - + return job .getVariablesAsMap() .get(name); - + } - @SuppressWarnings("unchecked") - public Map.Entry> doTestForTaskWasCompletedOrCancelled( + public Runnable saveAggregateAfterWorkflowTask( + final WorkflowAggregateCache aggregateCache) { + + return () -> { + if (aggregateCache.workflowAggregate != null) { + workflowAggregateRepository.save(aggregateCache.workflowAggregate); + } + }; + + } + + @SuppressWarnings("unchecked") + public Map.Entry> testForTaskWasCompletedOrCancelled( final ActivatedJob job) { return Map.entry( @@ -259,7 +243,8 @@ public Map.Entry> doTestForTaskWasCompletedOrCancelle .newUpdateTimeoutCommand(job) .timeout(Duration.ofMinutes(10)) .send() - .join(5, TimeUnit.MINUTES), // needs to run synchronously + .join(5, TimeUnit.MINUTES) + , // needs to run synchronously () -> "update timeout (BPMN: " + job.getBpmnProcessId() + "; Element: " + job.getElementId() + "; Task-Definition: " + job.getType() @@ -317,7 +302,9 @@ private Map.Entry, Function> doTh throwErrorCommand .send() - .exceptionally(t -> { throw new RuntimeException("error", t); }); + .exceptionally(t -> { + throw new RuntimeException("error", t); + }); }, taskException -> "throw error command (BPMN: " + job.getBpmnProcessId() + "; Element: " + job.getElementId() @@ -326,7 +313,7 @@ private Map.Entry, Function> doTh + "; Job: " + job.getKey() + ")"); } - + @SuppressWarnings("unchecked") private Map.Entry, Function> doFailed( final JobClient jobClient, @@ -352,4 +339,70 @@ private Map.Entry, Function> doFailed( } + protected boolean processWorkflowAggregateParameter( + final Object[] args, + final MethodParameter param, + final WorkflowAggregateCache workflowAggregateCache, + final Object workflowAggregateId) { + + if (!(param instanceof WorkflowAggregateMethodParameter)) { + return true; + } + + Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] = () -> { + // Using findById is required to get an object instead of a Hibernate proxy. + // Otherwise for e.g. Camunda8 connector JSON serialization of the + // workflow aggregate is not possible. + workflowAggregateCache.workflowAggregate = workflowAggregateRepository + .findById(workflowAggregateId) + .orElse(null); + return workflowAggregateCache.workflowAggregate; + }; + + args[param.getIndex()] = null; // will be set by deferred execution of supplier + + return false; + + } + + protected boolean processMultiInstanceResolverParameter( + final Object[] args, + final MethodParameter param, + final Supplier workflowAggregate, + final Function multiInstanceSupplier) { + + if (!(param instanceof ResolverBasedMultiInstanceMethodParameter)) { + return true; + } + + @SuppressWarnings("unchecked") + final var resolver = + (MultiInstanceElementResolver) + ((ResolverBasedMultiInstanceMethodParameter) param).getResolverBean(); + + final var multiInstances = new HashMap>(); + + resolver + .getNames() + .forEach(name -> multiInstances.put(name, getMultiInstance(name, multiInstanceSupplier))); + + Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] = () -> { + try { + return resolver.resolve(workflowAggregate.get(), multiInstances); + } catch (Exception e) { + throw new RuntimeException( + "Failed processing MultiInstanceElementResolver for parameter '" + + param.getParameter() + + "' of method '" + + method + + "'", e); + } + }; + + args[param.getIndex()] = null; // will be set by deferred execution of supplier + + return false; + + } + } From cba5dea40473ee0f087c9e98d32b16361156024a Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Wed, 19 Jun 2024 14:00:30 +0200 Subject: [PATCH 14/15] Try to run C8 aspect within TX aspect --- .../io/vanillabp/camunda8/Camunda8AdapterConfiguration.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java index 033ed65..c947d8d 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8AdapterConfiguration.java @@ -35,6 +35,8 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.data.repository.CrudRepository; @AutoConfigurationPackage(basePackageClasses = Camunda8AdapterConfiguration.class) @@ -79,6 +81,7 @@ public void init() { } @Bean + @Order(Ordered.LOWEST_PRECEDENCE) public Camunda8TransactionAspect camunda8TransactionAspect() { return new Camunda8TransactionAspect(eventPublisher); From 195d677965fb799f55fb54a6d345882a67332380 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Thu, 20 Jun 2024 09:04:06 +0200 Subject: [PATCH 15/15] Enforce rollback in case of testing for task fails --- .../service/Camunda8TransactionProcessor.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java index cf19ec4..56b2321 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -12,8 +12,6 @@ import org.springframework.context.ApplicationEvent; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; -import org.springframework.transaction.interceptor.TransactionAspectSupport; -import org.springframework.transaction.support.TransactionSynchronizationManager; public class Camunda8TransactionProcessor { @@ -109,17 +107,13 @@ public void processPreCommit( // if the task is completed or cancelled, then the tx is rolled back if ((e instanceof ClientStatusException clientStatusException) && (clientStatusException.getStatus().getCode() == Status.NOT_FOUND.getCode())) { - logger.warn( - "Will rollback because job was already completed/cancelled! Tested with command '{}‘ giving status 'NOT_FOUND'", - event.description.get()); + throw new RuntimeException( + "Will rollback because job was already completed/cancelled! Test-command giving status 'NOT_FOUND':\n" + + event.description.get()); } else { - logger.warn( - "Will rollback because testing for job '{}' failed!", - event.description.get(), - e); - } - if (TransactionSynchronizationManager.isActualTransactionActive()) { - TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + throw new RuntimeException( + "Will rollback because testing for job '{}' failed! Test-command:\n" + + event.description.get()); } }