Skip to content

Commit

Permalink
[feature][adapter]: Use execution listener (#280)
Browse files Browse the repository at this point in the history
* Camunda Summit 2023: CamundaPlatform7DelegationWorker extended with support for executionListener.start and  executionListener.end header fields, which will trigger calls to the specified beans, use to simulate the execution listener support of C7.

* prepare worker example

* prepare worker example

* Camunda Summit 2023: Repositioned the RuntimeException and extended its trigger to exclude situations where only one or both listeners are defined.

* working, reducer of result needs fixing

* finalize and provide tests

* run spotless

* implement required changes

* Remove the cast on the variableStore

* listeners do not count as implementations

* append docs

---------

Co-authored-by: Martin Karsten <[email protected]>
Co-authored-by: Jonathan Lukas <[email protected]>
  • Loading branch information
3 people authored Jul 10, 2023
1 parent f79df84 commit cd3da69
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 69 deletions.
19 changes: 19 additions & 0 deletions camunda-7-adapter/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ expression, e.g.:
</bpmn:serviceTask>
```

If you are using an expression, you might want to write the result to a process variable. This can be achieved by setting an additional `resultVariable` header:

```xml

<bpmn:serviceTask id="task3" name="Expression">
<bpmn:extensionElements>
<zeebe:taskDefinition type="camunda-7-adapter"/>
<zeebe:taskHeaders>
<zeebe:header key="expression" value="${someBean.awesomeMethod(execution, someVar)}"/>
<zeebe:header key="resultVariable" value="awesomeMethodResult"/>
</zeebe:taskHeaders>
</bpmn:extensionElements>
</bpmn:serviceTask>
Expand All @@ -102,6 +105,22 @@ The external task workers can be mapped by using the `taskType` and insert the
</bpmn:serviceTask>
```

## Using Execution listener

To use execution listeners, use the header `executionListener.start` or `executionListener.end`. The value should be of type `delegateExpression` which returns a bean of type `ExecutionListener`:

```xml

<bpmn:serviceTask id="task3" name="Expression">
<bpmn:extensionElements>
<zeebe:taskHeaders>
<zeebe:header key="executionListener.start" value="${startExecutionListenerBean}"/>
<zeebe:header key="executionListener.end" value="${endExecutionListenerBean}"/>
</zeebe:taskHeaders>
</bpmn:extensionElements>
</bpmn:serviceTask>
```

## Example

Check out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class CamundaPlatform7AdapterConfig {
public ExternalTaskWorkerRegistration workerRegistration() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setBaseUrl("http://localhost");

return new ExternalTaskWorkerRegistration(configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.camunda.community.migration.adapter.execution;

import java.util.Collections;
import java.util.Map;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.ProcessEngineServices;
Expand Down Expand Up @@ -27,10 +28,10 @@ public abstract class AbstractDelegateExecution extends SimpleVariableScope
protected RepositoryService repositoryService;

public AbstractDelegateExecution() {
super();
this(Collections.emptyMap());
}

public AbstractDelegateExecution(Map<String, ? extends Object> variables) {
public AbstractDelegateExecution(Map<String, Object> variables) {
super(variables);
}

Expand All @@ -41,7 +42,7 @@ public String getCurrentActivityName() {

@Override
public FlowElement getBpmnModelElementInstance() {
return getBpmnModelInstance().getModelElementById(getCurrentActivityId());
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,38 @@
import java.util.List;
import java.util.Map;
import org.camunda.bpm.engine.impl.core.variable.CoreVariableInstance;
import org.camunda.bpm.engine.impl.core.variable.scope.AbstractVariableScope;
import org.camunda.bpm.engine.impl.core.variable.scope.SimpleVariableInstance;
import org.camunda.bpm.engine.impl.core.variable.scope.SimpleVariableInstance.SimpleVariableInstanceFactory;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableInstanceFactory;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableInstanceLifecycleListener;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableStore;
import org.camunda.bpm.engine.impl.core.variable.scope.*;

/**
* Simple VariableScope implementation that can be initialized with a Map and provides all variable
* methods required for implementing a DelegateExecution.
*
* @author Falko Menge (Camunda)
*/
public abstract class SimpleVariableScope extends AbstractVariableScope {
public class SimpleVariableScope extends AbstractVariableScope {

private static final long serialVersionUID = 1L;

protected VariableStore<SimpleVariableInstance> variableStore =
new VariableStore<SimpleVariableInstance>();
protected VariableInstanceFactory<CoreVariableInstance> variableInstanceFactory =
(name, value, isTransient) -> new SimpleVariableInstance(name, value);
protected VariableStore<CoreVariableInstance> variableStore = new VariableStore<>();

public SimpleVariableScope() {
super();
this(Collections.emptyMap());
}

public SimpleVariableScope(Map<String, ? extends Object> variables) {
public SimpleVariableScope(Map<String, ?> variables) {
super();
setVariables(variables);
}

protected VariableStore<CoreVariableInstance> getVariableStore() {
return (VariableStore) variableStore;
return variableStore;
}

@Override
protected VariableInstanceFactory<CoreVariableInstance> getVariableInstanceFactory() {
return (VariableInstanceFactory) SimpleVariableInstanceFactory.INSTANCE;
return variableInstanceFactory;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.camunda.community.migration.adapter.execution;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import java.util.HashMap;

/**
* DelegateExecution implementation that can be initialized with an {@link ActivatedJob} and
Expand All @@ -12,10 +13,10 @@ public class ZeebeJobDelegateExecution extends AbstractDelegateExecution {

private static final long serialVersionUID = 1L;

private ActivatedJob job;
private final ActivatedJob job;

public ZeebeJobDelegateExecution(ActivatedJob job) {
super(job.getVariablesAsMap());
super(new HashMap<>(job.getVariablesAsMap()));
this.job = job;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.camunda.community.migration.adapter.juel;

import org.camunda.bpm.engine.ArtifactFactory;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.springframework.stereotype.Component;

/**
* This wraps the access to {@link ClassLoader} and {@link ArtifactFactory} for loading of {@link
* org.camunda.bpm.engine.delegate.ExecutionListener} and {@link
* org.camunda.bpm.engine.delegate.JavaDelegate} by FQN String.
*/
@Component
public class ClassResolver {

private final ArtifactFactory artifactFactory;

public ClassResolver(ArtifactFactory artifactFactory) {
this.artifactFactory = artifactFactory;
}

public JavaDelegate loadJavaDelegate(String delegateName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends JavaDelegate> clazz =
(Class<? extends JavaDelegate>) contextClassLoader.loadClass(delegateName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load delegation class '" + delegateName + "': " + e.getMessage(), e);
}
}

public ExecutionListener loadExecutionListener(String listenerName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends ExecutionListener> clazz =
(Class<? extends ExecutionListener>) contextClassLoader.loadClass(listenerName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load listener class '" + listenerName + "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.camunda.community.migration.adapter.juel;

import org.camunda.bpm.engine.ProcessEngineException;
import org.camunda.bpm.engine.delegate.BaseDelegateExecution;
import org.camunda.bpm.engine.delegate.VariableScope;
import org.camunda.bpm.engine.impl.el.JuelExpression;
import org.camunda.bpm.engine.impl.el.JuelExpressionManager;
Expand All @@ -21,8 +20,7 @@ public EnginelessJuelExpression(
}

@Override
public Object getValue(VariableScope variableScope, BaseDelegateExecution contextExecution) {
variableScope.setVariable("execution", contextExecution);
public Object getValue(VariableScope variableScope) {
ELContext elContext = expressionManager.getElContext(variableScope);
try {
return valueExpression.getValue(elContext);
Expand All @@ -47,8 +45,6 @@ public Object getValue(VariableScope variableScope, BaseDelegateExecution contex
} catch (Exception e) {
throw new ProcessEngineException(
"Error while evaluating expression: " + expressionText + ". Cause: " + e.getMessage(), e);
} finally {
variableScope.removeVariable("execution");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.camunda.bpm.engine.impl.javax.el.ELContext;
import org.camunda.bpm.engine.impl.javax.el.ExpressionFactory;
import org.camunda.bpm.engine.impl.javax.el.ValueExpression;
import org.camunda.community.migration.adapter.execution.SimpleVariableScope;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -24,11 +25,15 @@ public JuelExpressionResolver(
this.expressionFactory = expressionFactory;
}

public Object evaluate(
String expressionString, VariableScope variableScope, DelegateExecution execution) {
public Object evaluate(String expressionString, DelegateExecution execution) {
ValueExpression valueExpression =
expressionFactory.createValueExpression(elContext, expressionString, Object.class);

// required because (in C7) we can use juel like `${execution.xxx()}`
VariableScope variableScope = new SimpleVariableScope(execution.getVariables());
variableScope.setVariable("execution", execution);

return new EnginelessJuelExpression(valueExpression, expressionManager, expressionString)
.getValue(variableScope, execution);
.getValue(variableScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.spring.client.annotation.JobWorker;
import io.camunda.zeebe.spring.client.exception.ZeebeBpmnError;
import java.util.HashMap;
import java.util.Map;
import org.camunda.bpm.engine.ArtifactFactory;
import org.camunda.bpm.engine.delegate.*;
import org.camunda.bpm.engine.delegate.BpmnError;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.engine.delegate.VariableScope;
import org.camunda.community.migration.adapter.execution.ZeebeJobDelegateExecution;
import org.camunda.community.migration.adapter.juel.ClassResolver;
import org.camunda.community.migration.adapter.juel.JuelExpressionResolver;
import org.springframework.stereotype.Component;

@Component
public class CamundaPlatform7DelegationWorker {

private final JuelExpressionResolver expressionResolver;
private final ArtifactFactory artifactFactory;
private final ClassResolver classResolver;

public CamundaPlatform7DelegationWorker(
JuelExpressionResolver expressionResolver, ArtifactFactory artifactFactory) {
JuelExpressionResolver expressionResolver, ClassResolver classResolver) {
this.expressionResolver = expressionResolver;
this.artifactFactory = artifactFactory;
this.classResolver = classResolver;
}

@JobWorker(type = "camunda-7-adapter", autoComplete = false)
Expand All @@ -36,58 +35,52 @@ public void delegateToCamundaPlatformCode(final JobClient client, final Activate
String delegateExpression = job.getCustomHeaders().get("delegateExpression");
String expression = job.getCustomHeaders().get("expression");
String resultVariable = job.getCustomHeaders().get("resultVariable");

String startListener = job.getCustomHeaders().get("executionListener.start");
String endListener = job.getCustomHeaders().get("executionListener.end");
// and delegate depending on exact way of implementation
Map<String, Object> resultPayload = null;
DelegateExecution execution = wrapDelegateExecution(job);
// this is required as we add the execution to the variables scope for expression evaluation
VariableScope variableScope = wrapDelegateExecution(job);

final DelegateExecution execution = new ZeebeJobDelegateExecution(job);

try {
if (delegateClass == null && delegateExpression == null && expression == null) {
throw new RuntimeException(
"Either 'class' or 'delegateExpression' or 'expression' must be specified in task headers for job :"
+ job);
}

if (startListener != null) {
ExecutionListener executionListener =
(ExecutionListener) expressionResolver.evaluate(startListener, execution);

executionListener.notify(execution);
}

if (delegateClass != null) {
JavaDelegate javaDelegate = loadJavaDelegate(delegateClass);
JavaDelegate javaDelegate = classResolver.loadJavaDelegate(delegateClass);
javaDelegate.execute(execution);
resultPayload = execution.getVariables();
} else if (delegateExpression != null) {
JavaDelegate javaDelegate =
(JavaDelegate)
expressionResolver.evaluate(delegateExpression, variableScope, execution);
(JavaDelegate) expressionResolver.evaluate(delegateExpression, execution);
javaDelegate.execute(execution);
resultPayload = execution.getVariables();
} else if (expression != null) {
Object result = expressionResolver.evaluate(expression, variableScope, execution);
Object result = expressionResolver.evaluate(expression, execution);

if (resultVariable != null) {
resultPayload = new HashMap<>();
resultPayload.put(resultVariable, result);
execution.setVariable(resultVariable, result);
}
} else {
throw new RuntimeException(
"Either 'class' or 'delegateExpression' or 'expression' must be specified in task headers for job :"
+ job);
}

CompleteJobCommandStep1 completeCommand = client.newCompleteCommand(job.getKey());
if (resultPayload != null) {
completeCommand.variables(resultPayload);
if (endListener != null) {
ExecutionListener executionListener =
(ExecutionListener) expressionResolver.evaluate(endListener, execution);
executionListener.notify(execution);
}

CompleteJobCommandStep1 completeCommand = client.newCompleteCommand(job.getKey());
completeCommand.variables(execution.getVariables());
completeCommand.send().join();
} catch (BpmnError e) {
throw new ZeebeBpmnError(e.getErrorCode(), e.getMessage() == null ? "" : e.getMessage());
}
}

private DelegateExecution wrapDelegateExecution(ActivatedJob job) {
return new ZeebeJobDelegateExecution(job);
}

private JavaDelegate loadJavaDelegate(String delegateName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends JavaDelegate> clazz =
(Class<? extends JavaDelegate>) contextClassLoader.loadClass(delegateName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load delegation class '" + delegateName + "': " + e.getMessage(), e);
}
}
}
Loading

0 comments on commit cd3da69

Please sign in to comment.