Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][adapter]: Use execution listener #280

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ target
*.factorypath
.DS_Store
dist

test-example-worker/src/main/resources/application.yml
jangalinski marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class CamundaPlatform7AdapterConfig {
public ExternalTaskWorkerRegistration workerRegistration() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setBaseUrl("http://localhost");

return new ExternalTaskWorkerRegistration(configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.camunda.community.migration.adapter.execution;

import java.util.Collections;
import java.util.Map;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.ProcessEngineServices;
Expand Down Expand Up @@ -27,10 +28,10 @@ public abstract class AbstractDelegateExecution extends SimpleVariableScope
protected RepositoryService repositoryService;

public AbstractDelegateExecution() {
super();
this(Collections.emptyMap());
}

public AbstractDelegateExecution(Map<String, ? extends Object> variables) {
public AbstractDelegateExecution(Map<String, Object> variables) {
super(variables);
}

Expand All @@ -41,7 +42,7 @@ public String getCurrentActivityName() {

@Override
public FlowElement getBpmnModelElementInstance() {
return getBpmnModelInstance().getModelElementById(getCurrentActivityId());
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,28 @@
import java.util.List;
import java.util.Map;
import org.camunda.bpm.engine.impl.core.variable.CoreVariableInstance;
import org.camunda.bpm.engine.impl.core.variable.scope.AbstractVariableScope;
import org.camunda.bpm.engine.impl.core.variable.scope.SimpleVariableInstance;
import org.camunda.bpm.engine.impl.core.variable.scope.SimpleVariableInstance.SimpleVariableInstanceFactory;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableInstanceFactory;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableInstanceLifecycleListener;
import org.camunda.bpm.engine.impl.core.variable.scope.VariableStore;
import org.camunda.bpm.engine.impl.core.variable.scope.*;

/**
* Simple VariableScope implementation that can be initialized with a Map and provides all variable
* methods required for implementing a DelegateExecution.
*
* @author Falko Menge (Camunda)
*/
public abstract class SimpleVariableScope extends AbstractVariableScope {
public class SimpleVariableScope extends AbstractVariableScope {

private static final long serialVersionUID = 1L;

protected VariableInstanceFactory<CoreVariableInstance> variableInstanceFactory =
jonathanlukas marked this conversation as resolved.
Show resolved Hide resolved
(name, value, isTransient) -> new SimpleVariableInstance(name, value);
protected VariableStore<SimpleVariableInstance> variableStore =
new VariableStore<SimpleVariableInstance>();

public SimpleVariableScope() {
super();
this(Collections.emptyMap());
}

public SimpleVariableScope(Map<String, ? extends Object> variables) {
public SimpleVariableScope(Map<String, ?> variables) {
super();
setVariables(variables);
}
Expand All @@ -39,7 +36,7 @@ protected VariableStore<CoreVariableInstance> getVariableStore() {

@Override
protected VariableInstanceFactory<CoreVariableInstance> getVariableInstanceFactory() {
return (VariableInstanceFactory) SimpleVariableInstanceFactory.INSTANCE;
return variableInstanceFactory;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.camunda.community.migration.adapter.execution;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import java.util.HashMap;

/**
* DelegateExecution implementation that can be initialized with an {@link ActivatedJob} and
Expand All @@ -12,10 +13,10 @@ public class ZeebeJobDelegateExecution extends AbstractDelegateExecution {

private static final long serialVersionUID = 1L;

private ActivatedJob job;
private final ActivatedJob job;

public ZeebeJobDelegateExecution(ActivatedJob job) {
super(job.getVariablesAsMap());
super(new HashMap<>(job.getVariablesAsMap()));
jonathanlukas marked this conversation as resolved.
Show resolved Hide resolved
this.job = job;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.camunda.community.migration.adapter.juel;

import org.camunda.bpm.engine.ArtifactFactory;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.springframework.stereotype.Component;

/**
* This wraps the access to {@link ClassLoader} and {@link ArtifactFactory} for loading of {@link
* org.camunda.bpm.engine.delegate.ExecutionListener} and {@link
* org.camunda.bpm.engine.delegate.JavaDelegate} by FQN String.
*/
@Component
public class ClassResolver {

private final ArtifactFactory artifactFactory;

public ClassResolver(ArtifactFactory artifactFactory) {
this.artifactFactory = artifactFactory;
}

public JavaDelegate loadJavaDelegate(String delegateName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends JavaDelegate> clazz =
(Class<? extends JavaDelegate>) contextClassLoader.loadClass(delegateName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load delegation class '" + delegateName + "': " + e.getMessage(), e);
}
}

public ExecutionListener loadExecutionListener(String listenerName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends ExecutionListener> clazz =
(Class<? extends ExecutionListener>) contextClassLoader.loadClass(listenerName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load listener class '" + listenerName + "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.camunda.community.migration.adapter.juel;

import org.camunda.bpm.engine.ProcessEngineException;
import org.camunda.bpm.engine.delegate.BaseDelegateExecution;
import org.camunda.bpm.engine.delegate.VariableScope;
import org.camunda.bpm.engine.impl.el.JuelExpression;
import org.camunda.bpm.engine.impl.el.JuelExpressionManager;
Expand All @@ -21,8 +20,7 @@ public EnginelessJuelExpression(
}

@Override
public Object getValue(VariableScope variableScope, BaseDelegateExecution contextExecution) {
variableScope.setVariable("execution", contextExecution);
public Object getValue(VariableScope variableScope) {
ELContext elContext = expressionManager.getElContext(variableScope);
try {
return valueExpression.getValue(elContext);
Expand All @@ -47,8 +45,6 @@ public Object getValue(VariableScope variableScope, BaseDelegateExecution contex
} catch (Exception e) {
throw new ProcessEngineException(
"Error while evaluating expression: " + expressionText + ". Cause: " + e.getMessage(), e);
} finally {
variableScope.removeVariable("execution");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.camunda.bpm.engine.impl.javax.el.ELContext;
import org.camunda.bpm.engine.impl.javax.el.ExpressionFactory;
import org.camunda.bpm.engine.impl.javax.el.ValueExpression;
import org.camunda.community.migration.adapter.execution.SimpleVariableScope;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -24,11 +25,15 @@ public JuelExpressionResolver(
this.expressionFactory = expressionFactory;
}

public Object evaluate(
String expressionString, VariableScope variableScope, DelegateExecution execution) {
public Object evaluate(String expressionString, DelegateExecution execution) {
ValueExpression valueExpression =
expressionFactory.createValueExpression(elContext, expressionString, Object.class);

// required because (in C7) we can use juel like `${execution.xxx()}`
VariableScope variableScope = new SimpleVariableScope(execution.getVariables());
variableScope.setVariable("execution", execution);

return new EnginelessJuelExpression(valueExpression, expressionManager, expressionString)
.getValue(variableScope, execution);
.getValue(variableScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.spring.client.annotation.JobWorker;
import io.camunda.zeebe.spring.client.exception.ZeebeBpmnError;
import java.util.HashMap;
import java.util.Map;
import org.camunda.bpm.engine.ArtifactFactory;
import org.camunda.bpm.engine.delegate.*;
import org.camunda.bpm.engine.delegate.BpmnError;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.engine.delegate.VariableScope;
import org.camunda.community.migration.adapter.execution.ZeebeJobDelegateExecution;
import org.camunda.community.migration.adapter.juel.ClassResolver;
import org.camunda.community.migration.adapter.juel.JuelExpressionResolver;
import org.springframework.stereotype.Component;

@Component
public class CamundaPlatform7DelegationWorker {

private final JuelExpressionResolver expressionResolver;
private final ArtifactFactory artifactFactory;
private final ClassResolver classResolver;

public CamundaPlatform7DelegationWorker(
JuelExpressionResolver expressionResolver, ArtifactFactory artifactFactory) {
JuelExpressionResolver expressionResolver, ClassResolver classResolver) {
this.expressionResolver = expressionResolver;
this.artifactFactory = artifactFactory;
this.classResolver = classResolver;
}

@JobWorker(type = "camunda-7-adapter", autoComplete = false)
Expand All @@ -36,58 +35,57 @@ public void delegateToCamundaPlatformCode(final JobClient client, final Activate
String delegateExpression = job.getCustomHeaders().get("delegateExpression");
String expression = job.getCustomHeaders().get("expression");
String resultVariable = job.getCustomHeaders().get("resultVariable");

String startListener = job.getCustomHeaders().get("executionListener.start");
String endListener = job.getCustomHeaders().get("executionListener.end");
// and delegate depending on exact way of implementation
Map<String, Object> resultPayload = null;
DelegateExecution execution = wrapDelegateExecution(job);
// this is required as we add the execution to the variables scope for expression evaluation
VariableScope variableScope = wrapDelegateExecution(job);

final DelegateExecution execution = new ZeebeJobDelegateExecution(job);

try {
if (startListener == null
&& endListener == null
&& delegateClass == null
&& delegateExpression == null
&& expression == null) {
throw new RuntimeException(
"Either 'executionListener.start' or 'class' or 'delegateExpression' or 'expression' or executionListener.end must be specified in task headers for job :"
+ job);
}

if (startListener != null) {
ExecutionListener executionListener =
(ExecutionListener) expressionResolver.evaluate(startListener, execution);

// modifies variables of execution
jangalinski marked this conversation as resolved.
Show resolved Hide resolved
executionListener.notify(execution);
}

if (delegateClass != null) {
JavaDelegate javaDelegate = loadJavaDelegate(delegateClass);
JavaDelegate javaDelegate = classResolver.loadJavaDelegate(delegateClass);
javaDelegate.execute(execution);
resultPayload = execution.getVariables();
} else if (delegateExpression != null) {
JavaDelegate javaDelegate =
(JavaDelegate)
expressionResolver.evaluate(delegateExpression, variableScope, execution);
(JavaDelegate) expressionResolver.evaluate(delegateExpression, execution);
javaDelegate.execute(execution);
resultPayload = execution.getVariables();
} else if (expression != null) {
Object result = expressionResolver.evaluate(expression, variableScope, execution);
Object result = expressionResolver.evaluate(expression, execution);

if (resultVariable != null) {
resultPayload = new HashMap<>();
resultPayload.put(resultVariable, result);
execution.setVariable(resultVariable, result);
}
} else {
throw new RuntimeException(
"Either 'class' or 'delegateExpression' or 'expression' must be specified in task headers for job :"
+ job);
}

CompleteJobCommandStep1 completeCommand = client.newCompleteCommand(job.getKey());
if (resultPayload != null) {
completeCommand.variables(resultPayload);
if (endListener != null) {
ExecutionListener executionListener =
(ExecutionListener) expressionResolver.evaluate(endListener, execution);
executionListener.notify(execution);
}

CompleteJobCommandStep1 completeCommand = client.newCompleteCommand(job.getKey());
completeCommand.variables(execution.getVariables());
completeCommand.send().join();
} catch (BpmnError e) {
throw new ZeebeBpmnError(e.getErrorCode(), e.getMessage() == null ? "" : e.getMessage());
}
}

private DelegateExecution wrapDelegateExecution(ActivatedJob job) {
return new ZeebeJobDelegateExecution(job);
}

private JavaDelegate loadJavaDelegate(String delegateName) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Class<? extends JavaDelegate> clazz =
(Class<? extends JavaDelegate>) contextClassLoader.loadClass(delegateName);
return artifactFactory.getArtifact(clazz);
} catch (Exception e) {
throw new RuntimeException(
"Could not load delegation class '" + delegateName + "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.camunda.community.migration.adapter.execution;
jangalinski marked this conversation as resolved.
Show resolved Hide resolved

import io.camunda.zeebe.client.api.response.ActivatedJob;
import org.mockito.Mockito;

class ZeebeJobDelegateExecutionTest {

private final ActivatedJob job = Mockito.mock(ActivatedJob.class);
}
Loading