diff --git a/c8-1-instance-migration/README.md b/c8-1-instance-migration/README.md new file mode 100644 index 00000000..fbd99f6e --- /dev/null +++ b/c8-1-instance-migration/README.md @@ -0,0 +1,34 @@ +# Instance Migration Workaround for Camunda 8 + +Currently, Camunda 8 does not support instance migration. +This prototype shows how we can use the Zeebe and Operate API to move process instances from one model version to another: +1. The process instances must have entered a wait state, such as a user task or a service task for which the external task worker has been deactivated. +2. For each instance of the old version, an instance in the new version is created. + * Variables are copied + * The instance is modified to start at a predefined point (i.e., the wait state) +3. The old instance is canceled. + +This prototype considers the two latest versions of a model. Older versions are ignored. +This prototype is heavily inspired by the [Camunda 7 to 8 migration tooling](https://github.com/camunda-community-hub/camunda-7-to-8-migration). +## Configuration + +Everything is preconfigured for local testing (i.e., localhost addresses without encryption). +You can configure the connection to Zeebe as described [here](https://github.com/camunda-community-hub/spring-zeebe). +You can furthermore use the following properties to configure the connection to Operate: +``` +operate: + url: https://bru-2.operate.camunda.io/757dbc30-5127-4bed-XXXX-XXXXXXXXXXXX + # for self-managed setup configure + keycloak: + realm: camunda-platform + url: https://mykeycloak.example.com +``` + +## Version Information + +This prototype has been build for Camunda 8.1.8. +It is compatible with both self-managed and SaaS deployments. + +Camunda 8.2 provides additional API endpoints, which can be used to improve this prototype: +We can get the element ID of currently enabled flow nodes. +With this information, activity "wait until all processes reached waiting state" becomes obsolete. \ No newline at end of file diff --git a/c8-1-instance-migration/pom.xml b/c8-1-instance-migration/pom.xml new file mode 100644 index 00000000..2d914f43 --- /dev/null +++ b/c8-1-instance-migration/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + com.camunda.consulting + c8instancemigration + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + io.camunda + spring-zeebe-starter + 8.1.13 + + + io.camunda + camunda-operate-client-java + 8.1.7.0 + + + org.springframework.boot + spring-boot-starter-test + test + 2.7.7 + + + \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/java/com/camunda/consulting/Main.java b/c8-1-instance-migration/src/main/java/com/camunda/consulting/Main.java new file mode 100644 index 00000000..00a498e1 --- /dev/null +++ b/c8-1-instance-migration/src/main/java/com/camunda/consulting/Main.java @@ -0,0 +1,97 @@ +package com.camunda.consulting; + +import io.camunda.operate.CamundaOperateClient; +import io.camunda.operate.dto.ProcessDefinition; +import io.camunda.operate.dto.ProcessInstance; +import io.camunda.operate.dto.ProcessInstanceState; +import io.camunda.operate.exception.OperateException; +import io.camunda.operate.search.ProcessDefinitionFilter; +import io.camunda.operate.search.ProcessInstanceFilter; +import io.camunda.operate.search.SearchQuery; +import io.camunda.operate.search.Sort; +import io.camunda.operate.search.SortOrder; +import io.camunda.operate.search.VariableFilter; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.spring.client.EnableZeebeClient; +import io.camunda.zeebe.spring.client.annotation.JobWorker; +import io.camunda.zeebe.spring.client.annotation.Variable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@SpringBootApplication +@EnableZeebeClient +public class Main { + public static void main(String[] args) { + SpringApplication.run(Main.class); + } + + private static Logger LOG = LoggerFactory.getLogger(Main.class); + + @Autowired + ZeebeClient client; + @Autowired + CamundaOperateClient operate; + + @JobWorker + public Map fetchInstances(@Variable String bpmnProcessId) throws OperateException { + List processInstanceIds = new ArrayList<>(); + ProcessDefinitionFilter filter = new ProcessDefinitionFilter.Builder().bpmnProcessId(bpmnProcessId).build(); + Sort sort = new Sort("version", SortOrder.DESC); + SearchQuery query = new SearchQuery.Builder().filter(filter).sort(sort).build(); + List definitions = operate.searchProcessDefinitions(query); + LOG.info("Found {} versions of process {}", definitions.size(), bpmnProcessId); + if (definitions.size() >= 2) { + ProcessInstanceFilter instanceFilter = new ProcessInstanceFilter.Builder().bpmnProcessId(bpmnProcessId).processVersion(definitions.get(1).getVersion()).state( + ProcessInstanceState.ACTIVE).build(); + processInstanceIds = operate.searchProcessInstances(new SearchQuery.Builder().filter(instanceFilter).build()).stream() + .map(ProcessInstance::getKey).toList(); + } + LOG.info("{} process instances will be migrated to latest version", processInstanceIds.size()); + return Map.of("processInstanceKeys", processInstanceIds); + } + + @JobWorker + public Map startInstance(@Variable Long processInstanceKey, @Variable String bpmnProcessId, @Variable String startBeforeElement) + throws OperateException { + Map variables = new HashMap<>(); + VariableFilter filter = new VariableFilter.Builder().processInstanceKey(processInstanceKey).build(); + List vars = operate.searchVariables(new SearchQuery.Builder().filter(filter).build()); + vars.forEach(var -> { + if (var.getTruncated()) { + try { + io.camunda.operate.dto.Variable fullVar = operate.getVariable(var.getKey()); + variables.put(fullVar.getName(), fullVar.getValue()); + } catch (OperateException e) { + throw new RuntimeException(e); + } + } else { + variables.put(var.getName(), var.getValue()); + } + }); + variables.put("oldInstanceKey", processInstanceKey); + ProcessInstanceEvent newInstance = client.newCreateInstanceCommand() + .bpmnProcessId(bpmnProcessId) + .latestVersion() + .startBeforeElement(startBeforeElement) + .variables(variables) + .send() + .join(); + LOG.info("Migration of instance {} completed. New instance ID is {}.", processInstanceKey, newInstance.getProcessInstanceKey()); + return Map.of("newInstanceKey", newInstance.getProcessInstanceKey()); + } + + @JobWorker + public void deleteOldInstance(@Variable Long processInstanceKey) { + client.newCancelInstanceCommand(processInstanceKey).send().join(); + LOG.info("Instance {} has been cancelled, because it has been migrated to a new process model version.", processInstanceKey); + } +} \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/java/com/camunda/consulting/OperateClientConfiguration.java b/c8-1-instance-migration/src/main/java/com/camunda/consulting/OperateClientConfiguration.java new file mode 100644 index 00000000..7ffe7f24 --- /dev/null +++ b/c8-1-instance-migration/src/main/java/com/camunda/consulting/OperateClientConfiguration.java @@ -0,0 +1,72 @@ +package com.camunda.consulting; + +import io.camunda.operate.CamundaOperateClient; +import io.camunda.operate.auth.AuthInterface; +import io.camunda.operate.auth.SaasAuthentication; +import io.camunda.operate.auth.SelfManagedAuthentication; +import io.camunda.operate.auth.SimpleAuthentication; +import io.camunda.operate.exception.OperateException; +import io.camunda.zeebe.spring.client.annotation.Deployment; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Objects; + +@Configuration +@Deployment(resources = {"classpath:c8-to-c8-instance_migration.bpmn"}) +public class OperateClientConfiguration { + @Value("${operate.keycloak.url:#{null}}") + String keycloakUrl; + @Value("${zeebe.client.cloud.client-id:#{null}}") + String clientId; + + @Value("${zeebe.client.cloud.client-secret:#{null}}") + String clientSecret; + + @Value("${operate.baseUrl:operate.camunda.io}") + String baseUrl; + + @Value("${operate.authUrl:https://login.cloud.camunda.io/oauth/token}") + String authUrl; + + @Value("${operate.url:http://localhost:8081}") + String operateUrl; + + @Value("${operate.user:demo}") + String operateUsr; + + @Value("${operate.password:demo}") + String password; + + @Value("${operate.keycloak.realm:camunda-platform}") + String keycloakRealm; + + @Bean + public CamundaOperateClient operate() { + AuthInterface auth = null; + if (Objects.nonNull(clientSecret) && Objects.nonNull(clientId)) { + if (Objects.nonNull(keycloakUrl)) { + auth = new SelfManagedAuthentication() + .clientId(clientId) + .clientSecret(clientSecret) + .keycloakRealm(keycloakRealm) + .keycloakUrl(keycloakUrl); + } else { + auth = new SaasAuthentication(authUrl, baseUrl, clientId, clientSecret); + } + } else { + auth = new SimpleAuthentication(operateUsr, password, operateUrl); + } + CamundaOperateClient client = null; + try { + client = new CamundaOperateClient.Builder().operateUrl(operateUrl) + .authentication(auth) + .build(); + } catch (OperateException e) { + throw new RuntimeException(e); + } + return client; + } +} diff --git a/c8-1-instance-migration/src/main/resources/application.yml b/c8-1-instance-migration/src/main/resources/application.yml new file mode 100644 index 00000000..f4ba17b0 --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/application.yml @@ -0,0 +1,3 @@ +zeebe.client: + broker.gateway-address: 127.0.0.1:26500 + security.plaintext: true \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/c8-to-c8-instance_migration.bpmn b/c8-1-instance-migration/src/main/resources/c8-to-c8-instance_migration.bpmn new file mode 100644 index 00000000..dbfc00ae --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/c8-to-c8-instance_migration.bpmn @@ -0,0 +1,245 @@ + + + + + { + "components": [ + { + "text": "# Suspend Job Worker\nTo be migrated, all process instances must reach a waiting state, i.e., a user task or a service task whose job worker has been suspended. **If you do not want to wait for a user task, please suspend your jobworker now**.\n\nAfterwards, file the following form and complete the task.", + "type": "text", + "id": "Field_0zas7yp" + }, + { + "label": "BPMN Process ID", + "type": "textfield", + "id": "Field_0tmv9vc", + "key": "bpmnProcessId", + "description": "Specify the ID of the process that should be migrated to the latest version" + }, + { + "label": "Start before element", + "type": "textfield", + "id": "Field_0d9efd2", + "key": "startBeforeElement", + "description": "Element ID of the user task or suspended service task." + } + ], + "type": "default", + "id": "Form_0iz9juy", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} + { + "components": [ + { + "text": "# Wait until waiting state\nPlease wait until all process instances have reached the desired waiting state. Afterward, complete the task to start the instance migration.", + "type": "text", + "id": "Field_0xdo7kr" + } + ], + "type": "default", + "id": "Form_064so1a", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} + { + "components": [ + { + "text": "# Migration has been completed\nIf you suspended a job worker during the first step of the process, start it again.", + "type": "text", + "id": "Field_0vjvjyt" + } + ], + "type": "default", + "id": "Form_0ux0b2i", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} + + + Flow_061o5iy + + + + Manually stop the job worker at which all instances should wait. If this can be done automatically, this task may be changed to a service task. + + + + Flow_061o5iy + Flow_0ehdnyx + + + + Restart the worker to continue the process execution + + + + Flow_11evvbe + Flow_0m3h3k8 + + + Flow_0m3h3k8 + + + + + + + + Flow_0ehdnyx + Flow_1hy8myh + + + Use Operate API to fetch all process instances of a specific process model version: https://docs.camunda.io/docs/apis-clients/operate-api/#process-instance +Use operate API to filter those instances that wait at the predefined task: +https://docs.camunda.io/docs/apis-clients/operate-api/#flow-node-instance + + + + Flow_1hy8myh + Flow_0s4tfom + + + Flow_0s4tfom + Flow_11evvbe + + + + + + + Flow_0kzdm7p + + + Create a new process instance via Zeebe API: +https://docs.camunda.io/docs/apis-clients/grpc/#createprocessinstancewithresult-rpc + +Use the operate API to get all variables and set the values appropriately. + + + + Flow_0kzdm7p + Flow_14ohcyf + + + Flow_0zvy8vi + + + Delete the old process instance: +https://docs.camunda.io/docs/apis-clients/operate-api/ + + + + Flow_14ohcyf + Flow_0zvy8vi + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/start_job_worker.form b/c8-1-instance-migration/src/main/resources/start_job_worker.form new file mode 100644 index 00000000..6163439c --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/start_job_worker.form @@ -0,0 +1,18 @@ +{ + "components": [ + { + "text": "# Migration has been completed\nIf you suspended a job worker during the first step of the process, start it again.", + "type": "text", + "id": "Field_0vjvjyt" + } + ], + "type": "default", + "id": "Form_0ux0b2i", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/suspend_job_worker.form b/c8-1-instance-migration/src/main/resources/suspend_job_worker.form new file mode 100644 index 00000000..0ca6e10e --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/suspend_job_worker.form @@ -0,0 +1,32 @@ +{ + "components": [ + { + "text": "# Suspend Job Worker\nTo be migrated, all process instances must reach a waiting state, i.e., a user task or a service task whose job worker has been suspended. **If you do not want to wait for a user task, please suspend your jobworker now**.\n\nAfterwards, file the following form and complete the task.", + "type": "text", + "id": "Field_0zas7yp" + }, + { + "label": "BPMN Process ID", + "type": "textfield", + "id": "Field_0tmv9vc", + "key": "bpmnProcessId", + "description": "Specify the ID of the process that should be migrated to the latest version" + }, + { + "label": "Start before element", + "type": "textfield", + "id": "Field_0d9efd2", + "key": "startBeforeElement", + "description": "Element ID of the user task or suspended service task." + } + ], + "type": "default", + "id": "Form_0iz9juy", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/test_v1.bpmn b/c8-1-instance-migration/src/main/resources/test_v1.bpmn new file mode 100644 index 00000000..e0a514b6 --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/test_v1.bpmn @@ -0,0 +1,39 @@ + + + + + Flow_0eow0rn + + + + Flow_0eow0rn + Flow_0jij4oq + + + Flow_0jij4oq + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/test_v2.bpmn b/c8-1-instance-migration/src/main/resources/test_v2.bpmn new file mode 100644 index 00000000..bd5cd281 --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/test_v2.bpmn @@ -0,0 +1,51 @@ + + + + + Flow_0eow0rn + + + + Flow_0eow0rn + Flow_0jij4oq + + + Flow_1em5bx3 + + + + + Flow_0jij4oq + Flow_1em5bx3 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/c8-1-instance-migration/src/main/resources/wait_for_instances.form b/c8-1-instance-migration/src/main/resources/wait_for_instances.form new file mode 100644 index 00000000..c3050dcc --- /dev/null +++ b/c8-1-instance-migration/src/main/resources/wait_for_instances.form @@ -0,0 +1,18 @@ +{ + "components": [ + { + "text": "# Wait until waiting state\nPlease wait until all process instances have reached the desired waiting state. Afterward, complete the task to start the instance migration.", + "type": "text", + "id": "Field_0xdo7kr" + } + ], + "type": "default", + "id": "Form_064so1a", + "executionPlatform": "Camunda Cloud", + "executionPlatformVersion": "8.1.0", + "exporter": { + "name": "Camunda Modeler", + "version": "5.8.0" + }, + "schemaVersion": 7 +} \ No newline at end of file diff --git a/c8-1-instance-migration/src/test/java/com/camunda/consulting/GenericTests.java b/c8-1-instance-migration/src/test/java/com/camunda/consulting/GenericTests.java new file mode 100644 index 00000000..8a8a4bb6 --- /dev/null +++ b/c8-1-instance-migration/src/test/java/com/camunda/consulting/GenericTests.java @@ -0,0 +1,39 @@ +package com.camunda.consulting; + +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.model.bpmn.instance.zeebe.ZeebeTaskDefinition; +import io.camunda.zeebe.spring.client.annotation.JobWorker; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +public class GenericTests { + + @ParameterizedTest + @ValueSource(strings = {"/c8-to-c8-instance_migration.bpmn"}) + public void thereIsAWorkerForEachTaskTypeInModel(String bpmnFileName) { + Class workerClass = Main.class; + Set jobTypesInModel = getJobTypesFromModel(bpmnFileName); + assert(Arrays.stream(workerClass.getDeclaredMethods()) + .map(method -> { + if (method.isAnnotationPresent(JobWorker.class)) { + String type = method.getAnnotation(JobWorker.class).type(); + return type.equals("") ? method.getName() : type; + } + return null; + }).collect(Collectors.toSet()) + .containsAll(jobTypesInModel)); + } + + private Set getJobTypesFromModel(String bpmnFile) { + BpmnModelInstance bpmn = Bpmn.readModelFromStream(this.getClass().getResourceAsStream(bpmnFile)); + return bpmn.getModelElementsByType(ZeebeTaskDefinition.class).stream() + .map(ZeebeTaskDefinition::getType) + .collect(Collectors.toSet()); + } + +}