Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrated example #315

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions custom-tasklist-exporter-based/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Custom Tasklist for User Tasks (Camunda 7 and 8)

## Components

![Architecture](./documentation/architecture/architecture-overview.drawio.png)

### C8
#### Exporter
[Exporter Module](./c8-exporter/)
Exporting User Tasks and Deployment Events to Kafka.
#### Job Worker
[Job Worker Module](./c8-jobworker/)
Job Worker for topic "io.camunda.zeebe:userTask".
Includes Kafka Listeners to complete UserTasks, once message is received.

### C7
#### Engine Plugin
[C7 Exporter](./c7-exporter/)

### Tasklist
#### Backend
[Tasklist Module](./tasklist/)
Spring Boot Application with GraphQL API

#### Frontend
[Frontend](./tasklist/client/)
Written in React


## How to run this project
``
mvn clean install
``

``
docker compose up
``

Exporter and Engine Plugin jars need to be mounted to the respective Engine.
For the Tasklist & Tasklist Worker Docker Images are available:
- [Tasklist](https://hub.docker.com/repository/docker/camundomanu/tasklist/general)
- [Tasklist Worker](https://hub.docker.com/repository/docker/camundomanu/tasklist-worker/general)

The Docker Compose File also includes Components that are not required, but helped me a lot during development.
- mongo-express
- provectuslabs/kafka-ui
152 changes: 152 additions & 0 deletions custom-tasklist-exporter-based/c7-exporter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.camunda.consulting</groupId>
<artifactId>tasklist-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<groupId>com.camunda.consulting</groupId>
<artifactId>c7-exporter</artifactId>

<name>c7-exporter</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>

<dependency>
<groupId>com.camunda.consulting</groupId>
<artifactId>model</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

<!-- process engine, needs to be provided -->
<groupId>org.camunda.bpm</groupId>
<artifactId>camunda-engine</artifactId>
<version>${c7.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter</artifactId>
<version>${c7.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.10</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.1</version>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.camunda.bpm</groupId>
<artifactId>camunda-bpm-junit5</artifactId>
<version>7.17.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.8.10</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.9.2</version>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.springframework.kafka:spring-kafka</include>
<include>org.springframework:spring-messaging</include>
<include>org.apache.kafka:kafka-clients</include>
<include>org.springframework.retry:spring-retry</include>
<include>com.camunda.consulting:model</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.camunda.bpm.run;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.TaskService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.cfg.ProcessEnginePlugin;
import org.camunda.bpm.engine.impl.persistence.deploy.Deployer;
import org.camunda.bpm.run.consumer.ActivityHandler;
import org.camunda.bpm.run.listener.BpmnDeployListener;
import org.camunda.bpm.run.producer.UserTaskProducer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class EventExporter extends ExporterConfiguration implements ProcessEnginePlugin {

private static KafkaConsumer consumer;

private static UserTaskProducer userTaskProducer;


@Override
public void preInit(ProcessEngineConfigurationImpl processEngineConfiguration) {
List<Deployer> customPostDeployers = processEngineConfiguration.getCustomPostDeployers();

if (customPostDeployers == null) {
customPostDeployers = new ArrayList<>();
}

customPostDeployers.add(new BpmnDeployListener());
processEngineConfiguration.setCustomPostDeployers(customPostDeployers);

createConsumer();
createProducer();
}

@Override
public void postInit(ProcessEngineConfigurationImpl processEngineConfiguration) {

}

@Override
public void postProcessEngineBuild(ProcessEngine processEngine) {
TaskService taskService = processEngine.getTaskService();
ActivityHandler activityHandler = new ActivityHandler(taskService, consumer, this.sourceId);
Thread thread = new Thread(activityHandler);
thread.start();

}

private void createConsumer() {
Properties config = new Properties();
config.put("bootstrap.servers", bootstrapServer);
config.put("client.id", clientId);
config.put("group.id", groupId);
config.put("key.deserializer", consumer_key_deserializer);
config.put("value.deserializer", consumer_value_deserializer);
config.put("spring.json.use.type.headers", useTypeHeaders);
config.put("spring.json.value.default.type", defaultType);
consumer = new KafkaConsumer<String, Object>(config);
}

private void createProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer_key_serializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer_value_serializer);
KafkaTemplate template = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
userTaskProducer = new UserTaskProducer(sourceId, userTaskTopic, deployTopic, template);
}

public static UserTaskProducer getUserTaskProducer() {
return userTaskProducer;
}

public static KafkaConsumer getConsumer() {
return consumer;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.camunda.bpm.run;


import lombok.Data;

@Data
public class ExporterConfiguration {

protected String bootstrapServer = "";
protected String userTaskTopic = "usertask-info";
protected String deployTopic = "deploy-info";
protected String sourceId = "";
protected String clientId = "";
protected String groupId = "";
protected String producer_value_serializer = "org.springframework.kafka.support.serializer.JsonSerializer";
protected String producer_key_serializer = "org.apache.kafka.common.serialization.StringSerializer";
protected String consumer_value_deserializer = "org.springframework.kafka.support.serializer.JsonDeserializer";
protected String consumer_key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
protected boolean useTypeHeaders = false;
protected String defaultType = "com.camunda.consulting.impl.CompletedTaskMessage";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.camunda.bpm.run.consumer;

import static java.util.Collections.singletonList;

import com.camunda.consulting.BpmnErrorAction;
import com.camunda.consulting.CompleteAction;
import com.camunda.consulting.MessageAction;
import com.camunda.consulting.TaskHandler;
import com.camunda.consulting.impl.CompletedTaskMessage;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.camunda.bpm.engine.TaskService;

public class ActivityHandler implements TaskHandler, Runnable {

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final TaskService taskService;

private final KafkaConsumer consumer;
private final String sourceId;

private final Logger LOGGER = Logger.getLogger(ActivityHandler.class.getName());

public ActivityHandler(TaskService taskService, KafkaConsumer consumer, String sourceId) {
this.taskService = taskService;
this.consumer = consumer;
this.sourceId = sourceId;
}

@Override
public void run() {
try {
this.consumer.subscribe(singletonList(this.sourceId));
while (!shutdown.get()) {
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
CompletedTaskMessage message = (CompletedTaskMessage) record.value();
completeTask(message);
});
}
} finally {
consumer.close();
shutdownLatch.countDown();
}
}

@Override
public void completeTask(CompleteAction completeAction) {
try {
taskService.complete(completeAction.getId(), completeAction.getVariables());
} catch (Exception e) {
LOGGER.severe("Error completing user task: " + completeAction.getId() + " " + e.getMessage());
e.printStackTrace();
}
}

@Override
public void throwError(BpmnErrorAction bpmnErrorAction) {

}

@Override
public void correlateMessage(MessageAction messageAction) {

}
}
Loading