diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 884719d..f36ac67 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,7 +13,7 @@ jobs: - name: Java setup uses: actions/setup-java@e54a62b3df9364d4b4c1c29c7225e57fe605d7dd # pin@v1 with: - java-version: 11 + java-version: 17 - name: Cache uses: actions/cache@99d99cd262b87f5f8671407a1e5c1ddfa36ad5ba # pin@v1 with: diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 244cb5c..d3242a0 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -25,7 +25,7 @@ jobs: - name: Set up Java environment uses: actions/setup-java@e54a62b3df9364d4b4c1c29c7225e57fe605d7dd # pin@v1 with: - java-version: 11 + java-version: 17 gpg-private-key: ${{ secrets.MAVEN_CENTRAL_GPG_SIGNING_KEY_SEC }} gpg-passphrase: MAVEN_CENTRAL_GPG_PASSPHRASE - name: Deploy SNAPSHOT / Release diff --git a/core/src/main/kotlin/io/zeebe/bpmnspec/SpecRunner.kt b/core/src/main/kotlin/io/zeebe/bpmnspec/SpecRunner.kt index 93c9b3b..8d7177a 100644 --- a/core/src/main/kotlin/io/zeebe/bpmnspec/SpecRunner.kt +++ b/core/src/main/kotlin/io/zeebe/bpmnspec/SpecRunner.kt @@ -78,8 +78,9 @@ class SpecRunner( resources.joinToString() ) resources.forEach { resourceName -> - val resourceStream = resourceResolver.getResource(resourceName) - testRunner.deployProcess(resourceName, resourceStream) + resourceResolver.getResource(resourceName).use { + testRunner.deployProcess(resourceName, it) + } } logger.debug( diff --git a/pom.xml b/pom.xml index 769425c..5c4db8b 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,8 @@ 1.0.0 + 1.0.2 + 1.2.1 3.5.2 1.17.5 @@ -79,6 +81,18 @@ import + + org.camunda.community + eze + ${eze.version} + + + + io.zeebe.hazelcast + zeebe-hazelcast-exporter + ${zeebe-hazelcast-exporter.version} + + io.zeebe zeebe-test-container @@ -115,6 +129,12 @@ ${junit.version} + + io.github.microutils + kotlin-logging-jvm + 3.0.2 + + org.apache.logging.log4j log4j-slf4j-impl diff --git a/zeebe-test-runner/pom.xml b/zeebe-test-runner/pom.xml index e0e7764..5d34a0d 100644 --- a/zeebe-test-runner/pom.xml +++ b/zeebe-test-runner/pom.xml @@ -21,6 +21,16 @@ zeebe-client-java + + org.camunda.community + eze + + + + io.zeebe.hazelcast + zeebe-hazelcast-exporter + + io.zeebe zeebe-test-container @@ -41,6 +51,11 @@ jackson-module-kotlin + + io.github.microutils + kotlin-logging-jvm + + org.junit.jupiter junit-jupiter-api @@ -77,6 +92,11 @@ awaitility-kotlin test + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/AbstractTestRunner.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/AbstractTestRunner.kt new file mode 100644 index 0000000..f05ab2d --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/AbstractTestRunner.kt @@ -0,0 +1,124 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import io.zeebe.bpmnspec.api.ProcessInstanceContext +import io.zeebe.bpmnspec.api.runner.ElementInstance +import io.zeebe.bpmnspec.api.runner.Incident +import io.zeebe.bpmnspec.api.runner.ProcessInstanceState +import io.zeebe.bpmnspec.api.runner.ProcessInstanceVariable +import io.zeebe.bpmnspec.api.runner.TestRunner +import mu.KLogger +import java.io.InputStream + +abstract class AbstractTestRunner( + protected val environment: ZeebeEnvironment, + protected val reuseEnvironment: Boolean, + protected val beforeEachCallback: (ZeebeTestContext) -> Unit, + protected val afterEachCallback: (ZeebeTestContext) -> Unit, + protected val logger: KLogger +) : TestRunner { + override fun beforeAll() { + if (reuseEnvironment) { + environment.setup() + } + } + + override fun beforeEach() { + if (!reuseEnvironment || !environment.isRunning) { + environment.setup() + } + val testContext = ZeebeTestContext(zeebeClient = environment.zeebeClient) + beforeEachCallback(testContext) + } + + override fun afterEach() { + val testContext = ZeebeTestContext(zeebeClient = environment.zeebeClient) + afterEachCallback(testContext) + + if (!reuseEnvironment) { + environment.cleanUp() + } + } + + override fun afterAll() { + if (reuseEnvironment) { + environment.cleanUp() + } + } + + override fun deployProcess(name: String, bpmnXml: InputStream) { + logger.debug { "Deploying a BPMN. [name: $name]" } + + environment.zeebeService.deployProcess(name, bpmnXml) + } + + override fun createProcessInstance( + bpmnProcessId: String, + variables: String + ): ProcessInstanceContext { + logger.debug { "Creating a process instance. [BPMN-process-id: $bpmnProcessId, variables: $variables]" } + + val response = environment.zeebeService.createProcessInstance(bpmnProcessId, variables) + + return ZeebeProcessInstanceContext(processInstanceKey = response) + } + + override fun completeTask(jobType: String, variables: String) { + logger.debug { "Starting a job worker to complete jobs. [job-type: $jobType, variables: $variables]" } + + environment.zeebeService.completeTask(jobType, variables) + } + + override fun publishMessage(messageName: String, correlationKey: String, variables: String) { + logger.debug { + "Publishing a message. [name: $messageName, correlation-key: $correlationKey, variables: $variables]" + } + + environment.zeebeService.publishMessage(messageName, correlationKey, variables) + } + + override fun throwError(jobType: String, errorCode: String, errorMessage: String) { + logger.debug { + "Starting a job worker to throw errors. [job-type: $jobType, error-code: $errorCode, error-message: $errorMessage]" + } + + environment.zeebeService.throwError(jobType, errorCode, errorMessage) + } + + override fun cancelProcessInstance(context: ProcessInstanceContext) { + val wfContext = context as ZeebeProcessInstanceContext + + logger.debug { + "Cancelling a process instance. [key: ${wfContext.processInstanceKey}]" + } + + environment.zeebeService.cancelProcessInstance(wfContext.processInstanceKey) + } + + override fun getProcessInstanceContexts(): List { + return environment.zeebeEventRepository.getProcessInstanceKeys().map { ZeebeProcessInstanceContext(it) } + } + + override fun getProcessInstanceState(context: ProcessInstanceContext): ProcessInstanceState { + val wfContext = context as ZeebeProcessInstanceContext + + return environment.zeebeEventRepository.getProcessInstanceState(wfContext.processInstanceKey) + } + + override fun getElementInstances(context: ProcessInstanceContext): List { + val wfContext = context as ZeebeProcessInstanceContext + + return environment.zeebeEventRepository.getElementInstances(wfContext.processInstanceKey) + } + + override fun getProcessInstanceVariables(context: ProcessInstanceContext): List { + val wfContext = context as ZeebeProcessInstanceContext + + return environment.zeebeEventRepository.getProcessInstanceVariables(wfContext.processInstanceKey) + } + + override fun getIncidents(context: ProcessInstanceContext): List { + val wfContext = context as ZeebeProcessInstanceContext + + return environment.zeebeEventRepository.getIncidents(wfContext.processInstanceKey) + } +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/DefaultZeebeEnvironment.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/DefaultZeebeEnvironment.kt new file mode 100644 index 0000000..51252c4 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/DefaultZeebeEnvironment.kt @@ -0,0 +1,140 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import io.camunda.zeebe.client.ZeebeClient +import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsClient +import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsZeebeEventRepository +import io.zeebe.containers.ZeebeContainer +import org.slf4j.LoggerFactory +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.Network +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.utility.DockerImageName + +class DefaultZeebeEnvironment( + val zeebeImage: String = System.getProperty( + "zeebeImage", + "ghcr.io/camunda-community-hub/zeebe-with-hazelcast-exporter" + ), + val zeebeImageVersion: String = System.getProperty("zeebeImageVersion", "8.0.6"), + private val zeebeEventRepositoryProvider: (zeeqsClient: ZeeqsClient) -> ZeebeEventRepository = { + ZeeqsZeebeEventRepository( + it + ) + }, + private val zeebeServiceProvider: (zeebeClient: ZeebeClient) -> ZeebeService = { ZeebeClientZeebeService(it) } +) : ZeebeEnvironment { + + private val logger = LoggerFactory.getLogger(ZeebeTestRunner::class.java) + + val zeeqsImage = "ghcr.io/camunda-community-hub/zeeqs" + val zeeqsImageVersion: String = "2.4.0" + + private val zeebeHost = "zeebe" + private val hazelcastPort = 5701 + private val zeeqsGraphqlPort = 9000 + + private val closingSteps = mutableListOf() + + override lateinit var zeebeClient: ZeebeClient + override lateinit var zeeqsClient: ZeeqsClient + override lateinit var zeebeEventRepository: ZeebeEventRepository + override lateinit var zeebeService: ZeebeService + + override var isRunning = false + + override fun setup() { + val network = Network.newNetwork()!! + closingSteps.add(network) + + listOf(this::startZeebeContainer, this::startZeeqsContainer) + .parallelStream() + .forEach { it.invoke(network) } + + isRunning = true + } + + private fun startZeebeContainer(network: Network) { + val zeebeImageName = DockerImageName.parse("$zeebeImage:$zeebeImageVersion") + val zeebeContainer = ZeebeContainer(zeebeImageName) + .withAdditionalExposedPort(hazelcastPort) + .withNetwork(network) + .withNetworkAliases(zeebeHost) + + logger.debug("Starting the Zeebe container [image: {}]", zeebeContainer.dockerImageName) + try { + zeebeContainer.start() + } catch (e: Exception) { + logger.error("Failed to start the Zeebe container", e) + logger.debug("Zeebe container output: {}", zeebeContainer.logs) + + throw RuntimeException("Failed to start the Zeebe container", e) + } + + logger.debug("Started the Zeebe container") + closingSteps.add(zeebeContainer) + + val zeebeGatewayPort = zeebeContainer.externalGatewayAddress + + zeebeClient = ZeebeClient + .newClientBuilder() + .gatewayAddress(zeebeGatewayPort) + .usePlaintext() + .build() + + zeebeService = zeebeServiceProvider(zeebeClient) + + closingSteps.add(zeebeClient) + closingSteps.add(zeebeService) + + // verify that the client is connected + try { + zeebeClient.newTopologyRequest().send().join() + } catch (e: Exception) { + logger.error("Failed to connect the Zeebe client", e) + logger.debug("Zeebe container output: {}", zeebeContainer.logs) + + throw RuntimeException("Failed to connect the Zeebe client", e) + } + } + + private fun startZeeqsContainer(network: Network) { + val zeeqsContainer = ZeeqsContainer(zeeqsImage, zeeqsImageVersion) + .withEnv("zeebe.client.worker.hazelcast.connection", "$zeebeHost:$hazelcastPort") + .withExposedPorts(zeeqsGraphqlPort) + .waitingFor(Wait.forHttp("/actuator/health")) + .withNetwork(network) + .withNetworkAliases("zeeqs") + + logger.debug("Starting the ZeeQS container [image: {}]", zeeqsContainer.dockerImageName) + try { + zeeqsContainer.start() + } catch (e: Exception) { + logger.error("Failed to start the ZeeQS container", e) + logger.debug("ZeeQS container output: {}", zeeqsContainer.logs) + + throw RuntimeException("Failed to start the ZeeQS container", e) + } + + logger.debug("Started the ZeeQS container") + closingSteps.add(zeeqsContainer) + + val zeeqsContainerHost = zeeqsContainer.host + val zeeqsContainerPort = zeeqsContainer.getMappedPort(zeeqsGraphqlPort) + + zeeqsClient = ZeeqsClient(zeeqsEndpoint = "$zeeqsContainerHost:$zeeqsContainerPort/graphql") + zeebeEventRepository = zeebeEventRepositoryProvider(zeeqsClient) + } + + override fun cleanUp() { + logger.debug("Closing resources") + closingSteps.toList().reversed().forEach(AutoCloseable::close) + + logger.debug("Closed resources") + + isRunning = false + } + + class ZeeqsContainer(imageName: String, version: String) : + GenericContainer("$imageName:$version") + +} \ No newline at end of file diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/TestEnvironment.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/TestEnvironment.kt new file mode 100644 index 0000000..2411463 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/TestEnvironment.kt @@ -0,0 +1,17 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import io.camunda.zeebe.client.ZeebeClient +import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsClient + +interface TestEnvironment { + val zeebeClient: ZeebeClient + val zeeqsClient: ZeeqsClient + val zeebeEventRepository: ZeebeEventRepository + val zeebeService: ZeebeService + + val isRunning: Boolean + + fun setup() + + fun cleanUp() +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeClientZeebeService.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeClientZeebeService.kt new file mode 100644 index 0000000..a98a2f3 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeClientZeebeService.kt @@ -0,0 +1,89 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import io.camunda.zeebe.client.ZeebeClient +import io.camunda.zeebe.client.api.worker.JobWorker +import java.io.InputStream +import java.time.Duration +import java.util.concurrent.locks.ReentrantLock + +class ZeebeClientZeebeService( + private val zeebeClient: ZeebeClient +) : ZeebeService { + + private val lock: ReentrantLock = ReentrantLock() + private val workers: MutableList = object : ArrayList() { + override fun add(element: JobWorker): Boolean { + lock.lock() + try { + return super.add(element) + } finally { + lock.unlock() + } + } + } + + override fun deployProcess(name: String, bpmnXml: InputStream) { + zeebeClient.newDeployResourceCommand() + .addResourceStream(bpmnXml, name) + .send() + .join() + } + + override fun createProcessInstance(bpmnProcessId: String, variables: String): Long { + return zeebeClient.newCreateInstanceCommand() + .bpmnProcessId(bpmnProcessId) + .latestVersion().variables(variables) + .send() + .join() + .processInstanceKey + } + + override fun completeTask(jobType: String, variables: String) { + val jobWorker = zeebeClient.newWorker() + .jobType(jobType) + .handler { jobClient, job -> + jobClient.newCompleteCommand(job.key) + .variables(variables) + .send() + .join() + } + .timeout(Duration.ofSeconds(1)) + .open() + + workers.add(jobWorker) + } + + override fun publishMessage(messageName: String, correlationKey: String, variables: String) { + zeebeClient.newPublishMessageCommand() + .messageName(messageName) + .correlationKey(correlationKey) + .variables(variables) + .timeToLive(Duration.ofSeconds(10)) + .send() + .join() + } + + override fun throwError(jobType: String, errorCode: String, errorMessage: String) { + val jobWorker = zeebeClient.newWorker() + .jobType(jobType) + .handler { jobClient, job -> + jobClient.newThrowErrorCommand(job.key) + .errorCode(errorCode) + .errorMessage(errorMessage) + .send() + .join() + } + .timeout(Duration.ofSeconds(1)) + .open() + + workers.add(jobWorker) + } + + override fun cancelProcessInstance(processInstanceKey: Long) { + zeebeClient.newCancelInstanceCommand(processInstanceKey) + .send() + .join() + } + + override fun close() = workers.forEach { it.close() } +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEnvironment.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEnvironment.kt index b4091c5..46b4e34 100644 --- a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEnvironment.kt +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEnvironment.kt @@ -1,127 +1,3 @@ package io.zeebe.bpmnspec.runner.zeebe -import io.camunda.zeebe.client.ZeebeClient -import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsClient -import io.zeebe.containers.ZeebeContainer -import org.slf4j.LoggerFactory -import org.testcontainers.containers.GenericContainer -import org.testcontainers.containers.Network -import org.testcontainers.containers.wait.strategy.Wait -import org.testcontainers.utility.DockerImageName - -class ZeebeEnvironment( - val zeebeImage: String = System.getProperty( - "zeebeImage", - "ghcr.io/camunda-community-hub/zeebe-with-hazelcast-exporter" - ), - val zeebeImageVersion: String = System.getProperty("zeebeImageVersion", "1.0.0-1.0.0") -) { - - private val logger = LoggerFactory.getLogger(ZeebeTestRunner::class.java) - - val zeeqsImage = "ghcr.io/camunda-community-hub/zeeqs" - val zeeqsImageVersion: String = "2.0.0" - - private val zeebeHost = "zeebe" - private val hazelcastPort = 5701 - private val zeeqsGraphqlPort = 9000 - - private val closingSteps = mutableListOf() - - lateinit var zeebeClient: ZeebeClient - lateinit var zeeqsClient: ZeeqsClient - - var isRunning = false - - fun setup() { - val network = Network.newNetwork()!! - closingSteps.add(network) - - listOf(this::startZeebeContainer, this::startZeeqsContainer) - .parallelStream() - .forEach { it.invoke(network) } - - isRunning = true - } - - private fun startZeebeContainer(network: Network) { - val zeebeImageName = DockerImageName.parse("$zeebeImage:$zeebeImageVersion") - val zeebeContainer = ZeebeContainer(zeebeImageName) - .withAdditionalExposedPort(hazelcastPort) - .withNetwork(network) - .withNetworkAliases(zeebeHost) - - logger.debug("Starting the Zeebe container [image: {}]", zeebeContainer.dockerImageName) - try { - zeebeContainer.start() - } catch (e: Exception) { - logger.error("Failed to start the Zeebe container", e) - logger.debug("Zeebe container output: {}", zeebeContainer.logs) - - throw RuntimeException("Failed to start the Zeebe container", e) - } - - logger.debug("Started the Zeebe container") - closingSteps.add(zeebeContainer) - - val zeebeGatewayPort = zeebeContainer.externalGatewayAddress - - zeebeClient = ZeebeClient - .newClientBuilder() - .gatewayAddress(zeebeGatewayPort) - .usePlaintext() - .build() - - closingSteps.add(zeebeClient) - - // verify that the client is connected - try { - zeebeClient.newTopologyRequest().send().join() - } catch (e: Exception) { - logger.error("Failed to connect the Zeebe client", e) - logger.debug("Zeebe container output: {}", zeebeContainer.logs) - - throw RuntimeException("Failed to connect the Zeebe client", e) - } - } - - private fun startZeeqsContainer(network: Network) { - val zeeqsContainer = ZeeqsContainer(zeeqsImage, zeeqsImageVersion) - .withEnv("zeebe.client.worker.hazelcast.connection", "$zeebeHost:$hazelcastPort") - .withExposedPorts(zeeqsGraphqlPort) - .waitingFor(Wait.forHttp("/actuator/health")) - .withNetwork(network) - .withNetworkAliases("zeeqs") - - logger.debug("Starting the ZeeQS container [image: {}]", zeeqsContainer.dockerImageName) - try { - zeeqsContainer.start() - } catch (e: Exception) { - logger.error("Failed to start the ZeeQS container", e) - logger.debug("ZeeQS container output: {}", zeeqsContainer.logs) - - throw RuntimeException("Failed to start the ZeeQS container", e) - } - - logger.debug("Started the ZeeQS container") - closingSteps.add(zeeqsContainer) - - val zeeqsContainerHost = zeeqsContainer.host - val zeeqsContainerPort = zeeqsContainer.getMappedPort(zeeqsGraphqlPort) - - zeeqsClient = ZeeqsClient(zeeqsEndpoint = "$zeeqsContainerHost:$zeeqsContainerPort/graphql") - } - - fun cleanUp() { - logger.debug("Closing resources") - closingSteps.toList().reversed().forEach(AutoCloseable::close) - - logger.debug("Closed resources") - - isRunning = false - } - - class ZeeqsContainer(imageName: String, version: String) : - GenericContainer("$imageName:$version") - -} \ No newline at end of file +interface ZeebeEnvironment : TestEnvironment diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEventRepository.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEventRepository.kt new file mode 100644 index 0000000..148421a --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeEventRepository.kt @@ -0,0 +1,19 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import io.zeebe.bpmnspec.api.runner.ElementInstance +import io.zeebe.bpmnspec.api.runner.Incident +import io.zeebe.bpmnspec.api.runner.ProcessInstanceState +import io.zeebe.bpmnspec.api.runner.ProcessInstanceVariable + +interface ZeebeEventRepository { + + fun getProcessInstanceKeys(): List + + fun getProcessInstanceState(processInstanceKey: Long): ProcessInstanceState + + fun getElementInstances(processInstanceKey: Long): List + + fun getProcessInstanceVariables(processInstanceKey: Long): List + + fun getIncidents(processInstanceKey: Long): List +} \ No newline at end of file diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeService.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeService.kt new file mode 100644 index 0000000..9747d66 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeService.kt @@ -0,0 +1,17 @@ +package io.zeebe.bpmnspec.runner.zeebe + +import java.io.InputStream + +interface ZeebeService : AutoCloseable { + fun deployProcess(name: String, bpmnXml: InputStream) + + fun createProcessInstance(bpmnProcessId: String, variables: String): Long + + fun completeTask(jobType: String, variables: String) + + fun publishMessage(messageName: String, correlationKey: String, variables: String) + + fun throwError(jobType: String, errorCode: String, errorMessage: String) + + fun cancelProcessInstance(processInstanceKey: Long) +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeTestRunner.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeTestRunner.kt index 7e4d7af..94761a4 100644 --- a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeTestRunner.kt +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/ZeebeTestRunner.kt @@ -1,219 +1,14 @@ package io.zeebe.bpmnspec.runner.zeebe -import io.zeebe.bpmnspec.api.ProcessInstanceContext -import io.zeebe.bpmnspec.api.runner.* -import io.camunda.zeebe.client.api.worker.JobWorker -import org.slf4j.LoggerFactory -import java.io.InputStream -import java.time.Duration +import mu.KLogging class ZeebeTestRunner( - private val environment: ZeebeEnvironment = ZeebeEnvironment(), - private val reuseEnvironment: Boolean = false, - private val beforeEachCallback: (ZeebeTestContext) -> Unit = {}, - private val afterEachCallback: (ZeebeTestContext) -> Unit = {} -) : TestRunner { + environment: ZeebeEnvironment = DefaultZeebeEnvironment(), + reuseEnvironment: Boolean = false, + beforeEachCallback: (ZeebeTestContext) -> Unit = {}, + afterEachCallback: (ZeebeTestContext) -> Unit = {} +) : AbstractTestRunner(environment, reuseEnvironment, beforeEachCallback, afterEachCallback, logger) { - private val logger = LoggerFactory.getLogger(ZeebeTestRunner::class.java) + companion object: KLogging() - private val jobWorkers = mutableListOf() - - override fun beforeAll() { - if (reuseEnvironment) { - environment.setup() - } - } - - override fun beforeEach() { - if (!reuseEnvironment || !environment.isRunning) { - environment.setup() - } - val testContext = ZeebeTestContext(zeebeClient = environment.zeebeClient) - beforeEachCallback(testContext) - } - - override fun afterEach() { - jobWorkers.map(JobWorker::close) - - val testContext = ZeebeTestContext(zeebeClient = environment.zeebeClient) - afterEachCallback(testContext) - - if (!reuseEnvironment) { - environment.cleanUp() - } - } - - override fun afterAll() { - if (reuseEnvironment) { - environment.cleanUp() - } - } - - override fun deployProcess(name: String, bpmnXml: InputStream) { - logger.debug("Deploying a BPMN. [name: {}]", name) - - environment.zeebeClient.newDeployCommand() - .addResourceStream(bpmnXml, name) - .send() - .join() - } - - override fun createProcessInstance( - bpmnProcessId: String, - variables: String - ): ProcessInstanceContext { - logger.debug( - "Creating a process instance. [BPMN-process-id: {}, variables: {}]", - bpmnProcessId, - variables - ) - - val response = environment.zeebeClient.newCreateInstanceCommand() - .bpmnProcessId(bpmnProcessId) - .latestVersion().variables(variables) - .send() - .join() - - return ZeebeProcessInstanceContext( - processInstanceKey = response.processInstanceKey - ) - } - - override fun completeTask(jobType: String, variables: String) { - logger.debug( - "Starting a job worker to complete jobs. [job-type: {}, variables: {}]", - jobType, - variables - ) - - val jobWorker = environment.zeebeClient.newWorker() - .jobType(jobType) - .handler { jobClient, job -> - jobClient.newCompleteCommand(job.key) - .variables(variables) - .send() - .join() - } - .timeout(Duration.ofSeconds(1)) - .open() - - jobWorkers.add(jobWorker) - } - - override fun publishMessage(messageName: String, correlationKey: String, variables: String) { - logger.debug( - "Publishing a message. [name: {}, correlation-key: {}, variables: {}]", - messageName, correlationKey, variables - ) - - environment.zeebeClient.newPublishMessageCommand() - .messageName(messageName) - .correlationKey(correlationKey) - .variables(variables) - .timeToLive(Duration.ofSeconds(10)) - .send() - .join() - } - - override fun throwError(jobType: String, errorCode: String, errorMessage: String) { - logger.debug( - "Starting a job worker to throw errors. [job-type: {}, error-code: {}, error-message: {}]", - jobType, errorCode, errorMessage - ) - - val jobWorker = environment.zeebeClient.newWorker() - .jobType(jobType) - .handler { jobClient, job -> - jobClient.newThrowErrorCommand(job.key) - .errorCode(errorCode) - .errorMessage(errorMessage) - .send() - .join() - } - .timeout(Duration.ofSeconds(1)) - .open() - - jobWorkers.add(jobWorker) - } - - override fun cancelProcessInstance(context: ProcessInstanceContext) { - val wfContext = context as ZeebeProcessInstanceContext - - logger.debug("Cancelling a process instance. [key: {}]", wfContext.processInstanceKey) - - environment.zeebeClient.newCancelInstanceCommand(wfContext.processInstanceKey) - .send() - .join() - } - - override fun getProcessInstanceContexts(): List { - - return environment.zeeqsClient.getProcessInstanceKeys() - .map { ZeebeProcessInstanceContext(processInstanceKey = it) } - } - - override fun getProcessInstanceState(context: ProcessInstanceContext): ProcessInstanceState { - val wfContext = context as ZeebeProcessInstanceContext - - val state = environment.zeeqsClient.getProcessInstanceState(wfContext.processInstanceKey) - return when (state) { - "COMPLETED" -> ProcessInstanceState.COMPLETED - "TERMINATED" -> ProcessInstanceState.TERMINATED - "ACTIVATED" -> ProcessInstanceState.ACTIVATED - else -> ProcessInstanceState.UNKNOWN - } - } - - override fun getElementInstances(context: ProcessInstanceContext): List { - val wfContext = context as ZeebeProcessInstanceContext - - return environment.zeeqsClient.getElementInstances(processInstanceKey = wfContext.processInstanceKey) - .map { - ElementInstance( - elementId = it.elementId, - elementName = it.elementName, - state = when (it.state) { - "ACTIVATED" -> ElementInstanceState.ACTIVATED - "COMPLETED" -> ElementInstanceState.COMPLETED - "TERMINATED" -> ElementInstanceState.TERMINATED - "TAKEN" -> ElementInstanceState.TAKEN - else -> ElementInstanceState.UNKNOWN - } - ) - } - } - - override fun getProcessInstanceVariables(context: ProcessInstanceContext): List { - val wfContext = context as ZeebeProcessInstanceContext - - return environment.zeeqsClient.getProcessInstanceVariables(processInstanceKey = wfContext.processInstanceKey) - .map { - ProcessInstanceVariable( - variableName = it.name, - variableValue = it.value, - scopeElementId = it.scope?.elementId ?: "", - scopeElementName = it.scope?.elementName ?: "" - ) - } - } - - override fun getIncidents(context: ProcessInstanceContext): List { - val wfContext = context as ZeebeProcessInstanceContext - - return environment.zeeqsClient.getIncidents(processInstanceKey = wfContext.processInstanceKey) - .map { - Incident( - errorType = it.errorType, - errorMessage = it.errorMessage, - state = when (it.state) { - "CREATED" -> IncidentState.CREATED - "RESOLVED" -> IncidentState.RESOLVED - else -> IncidentState.UNKNOWN - }, - elementId = it.elementInstance?.elementId ?: "", - elementName = it.elementInstance?.elementName ?: "" - ) - } - } - -} \ No newline at end of file +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/DefaultEzeTestEnvironment.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/DefaultEzeTestEnvironment.kt new file mode 100644 index 0000000..f92c74c --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/DefaultEzeTestEnvironment.kt @@ -0,0 +1,61 @@ +package io.zeebe.bpmnspec.runner.zeebe.eze + +import io.camunda.zeebe.client.ZeebeClient +import io.zeebe.bpmnspec.runner.zeebe.TestEnvironment +import io.zeebe.bpmnspec.runner.zeebe.DefaultZeebeEnvironment +import io.zeebe.bpmnspec.runner.zeebe.ZeebeClientZeebeService +import io.zeebe.bpmnspec.runner.zeebe.ZeebeEventRepository +import io.zeebe.bpmnspec.runner.zeebe.ZeebeService +import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsClient +import io.zeebe.bpmnspec.runner.zeebe.zeeqs.ZeeqsZeebeEventRepository +import io.zeebe.hazelcast.exporter.HazelcastExporter +import org.camunda.community.eze.EngineFactory +import org.camunda.community.eze.ZeebeEngine +import org.testcontainers.Testcontainers +import org.testcontainers.containers.wait.strategy.Wait + +class DefaultEzeTestEnvironment( + private val hazelcastExporter: HazelcastExporter = HazelcastExporter(), + private val zeebeEngine: ZeebeEngine = EngineFactory.create(listOf(hazelcastExporter)), + private val zeeqsGraphqlPort: Int = 9000, + private val zeeqsImage: String = "ghcr.io/camunda-community-hub/zeeqs", + private val zeeqsImageVersion: String = "2.4.0", + private val zeeqsContainer: DefaultZeebeEnvironment.ZeeqsContainer = DefaultZeebeEnvironment + .ZeeqsContainer(zeeqsImage, zeeqsImageVersion) + .withEnv("zeebe.client.worker.hazelcast.connection", "host.testcontainers.internal:5701") + .withExposedPorts(zeeqsGraphqlPort) + .waitingFor(Wait.forHttp("/actuator/health")), + private val zeebeEventRepositoryProvider: (zeeqsClient: ZeeqsClient) -> ZeebeEventRepository = { + ZeeqsZeebeEventRepository( + it + ) + }, + private val zeebeServiceProvider: (zeebeClient: ZeebeClient) -> ZeebeService = { ZeebeClientZeebeService(it) } +) : EzeEnvironment { + override lateinit var zeebeClient: ZeebeClient + override lateinit var zeeqsClient: ZeeqsClient + override lateinit var zeebeEventRepository: ZeebeEventRepository + override lateinit var zeebeService: ZeebeService + override var isRunning: Boolean = false + private val closingSteps = mutableListOf() + + override fun setup() { + zeebeEngine.start() + Testcontainers.exposeHostPorts(5701) + zeebeClient = zeebeEngine.createClient() + zeebeService = zeebeServiceProvider(zeebeClient) + zeeqsContainer.start() + zeeqsClient = ZeeqsClient("localhost:${zeeqsContainer.getMappedPort(zeeqsGraphqlPort)}/graphql") + zeebeEventRepository = zeebeEventRepositoryProvider(zeeqsClient) + closingSteps.add(zeebeClient) + closingSteps.add(zeebeService) + closingSteps.add(AutoCloseable { zeebeEngine.stop() }) + closingSteps.add(zeeqsContainer) + isRunning = true + } + + override fun cleanUp() { + closingSteps.forEach(AutoCloseable::close) + isRunning = false + } +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeEnvironment.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeEnvironment.kt new file mode 100644 index 0000000..db715a0 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeEnvironment.kt @@ -0,0 +1,5 @@ +package io.zeebe.bpmnspec.runner.zeebe.eze + +import io.zeebe.bpmnspec.runner.zeebe.ZeebeEnvironment + +interface EzeEnvironment: ZeebeEnvironment diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunner.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunner.kt new file mode 100644 index 0000000..f3a7bae --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunner.kt @@ -0,0 +1,15 @@ +package io.zeebe.bpmnspec.runner.zeebe.eze + +import io.zeebe.bpmnspec.runner.zeebe.AbstractTestRunner +import io.zeebe.bpmnspec.runner.zeebe.ZeebeTestContext +import mu.KLogging + +class EzeTestRunner( + ezeEnvironment: EzeEnvironment = DefaultEzeTestEnvironment(), + reuseEnvironment: Boolean = false, + beforeEachCallback: (ZeebeTestContext) -> Unit = {}, + afterEachCallback: (ZeebeTestContext) -> Unit = {} +): AbstractTestRunner(environment = ezeEnvironment, reuseEnvironment, beforeEachCallback, afterEachCallback, logger) { + + companion object: KLogging() +} diff --git a/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/zeeqs/ZeeqsZeebeEventRepository.kt b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/zeeqs/ZeeqsZeebeEventRepository.kt new file mode 100644 index 0000000..71ce224 --- /dev/null +++ b/zeebe-test-runner/src/main/kotlin/io/zeebe/bpmnspec/runner/zeebe/zeeqs/ZeeqsZeebeEventRepository.kt @@ -0,0 +1,71 @@ +package io.zeebe.bpmnspec.runner.zeebe.zeeqs + +import io.zeebe.bpmnspec.api.runner.ElementInstance +import io.zeebe.bpmnspec.api.runner.ElementInstanceState +import io.zeebe.bpmnspec.api.runner.Incident +import io.zeebe.bpmnspec.api.runner.IncidentState +import io.zeebe.bpmnspec.api.runner.ProcessInstanceState +import io.zeebe.bpmnspec.api.runner.ProcessInstanceVariable +import io.zeebe.bpmnspec.runner.zeebe.ZeebeEventRepository + +class ZeeqsZeebeEventRepository( + private val zeeqsClient: ZeeqsClient +) : ZeebeEventRepository { + + override fun getProcessInstanceKeys() = zeeqsClient.getProcessInstanceKeys() + + override fun getProcessInstanceState( + processInstanceKey: Long + ) = when (zeeqsClient.getProcessInstanceState(processInstanceKey)) { + "COMPLETED" -> ProcessInstanceState.COMPLETED + "TERMINATED" -> ProcessInstanceState.TERMINATED + "ACTIVATED" -> ProcessInstanceState.ACTIVATED + else -> ProcessInstanceState.UNKNOWN + } + + override fun getElementInstances( + processInstanceKey: Long + ) = zeeqsClient.getElementInstances(processInstanceKey = processInstanceKey) + .map { + ElementInstance( + elementId = it.elementId, + elementName = it.elementName, + state = when (it.state) { + "ACTIVATED" -> ElementInstanceState.ACTIVATED + "COMPLETED" -> ElementInstanceState.COMPLETED + "TERMINATED" -> ElementInstanceState.TERMINATED + "TAKEN" -> ElementInstanceState.TAKEN + else -> ElementInstanceState.UNKNOWN + } + ) + } + + override fun getProcessInstanceVariables( + processInstanceKey: Long + ) = zeeqsClient.getProcessInstanceVariables(processInstanceKey = processInstanceKey) + .map { + ProcessInstanceVariable( + variableName = it.name, + variableValue = it.value, + scopeElementId = it.scope?.elementId ?: "", + scopeElementName = it.scope?.elementName ?: "" + ) + } + + override fun getIncidents( + processInstanceKey: Long + ) = zeeqsClient.getIncidents(processInstanceKey = processInstanceKey) + .map { + Incident( + errorType = it.errorType, + errorMessage = it.errorMessage, + state = when (it.state) { + "CREATED" -> IncidentState.CREATED + "RESOLVED" -> IncidentState.RESOLVED + else -> IncidentState.UNKNOWN + }, + elementId = it.elementInstance?.elementId ?: "", + elementName = it.elementInstance?.elementName ?: "" + ) + } +} \ No newline at end of file diff --git a/zeebe-test-runner/src/test/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunnerTest.kt b/zeebe-test-runner/src/test/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunnerTest.kt new file mode 100644 index 0000000..c397de0 --- /dev/null +++ b/zeebe-test-runner/src/test/kotlin/io/zeebe/bpmnspec/runner/zeebe/eze/EzeTestRunnerTest.kt @@ -0,0 +1,52 @@ +package io.zeebe.bpmnspec.runner.zeebe.eze + +import io.zeebe.bpmnspec.ClasspathResourceResolver +import io.zeebe.bpmnspec.SpecRunner +import io.zeebe.bpmnspec.api.runner.ProcessInstanceState +import io.zeebe.bpmnspec.runner.zeebe.ZeebeTestRunnerTest +import org.assertj.core.api.Assertions +import org.awaitility.kotlin.await +import org.junit.jupiter.api.Test + +internal class EzeTestRunnerTest { + + private val resourceResolver = + ClasspathResourceResolver(classLoader = EzeTestRunnerTest::class.java.classLoader) + private val specRunner = SpecRunner( + testRunner = EzeTestRunner(), + resourceResolver = resourceResolver + ) + + @Test + internal fun `EzeRunner should work standalone`() { + + val runner = EzeTestRunner() + + runner.beforeEach() + + val bpmnXml = EzeTestRunnerTest::class.java.getResourceAsStream("/demo.bpmn") + runner.deployProcess("demo.bpmn", bpmnXml) + + val wfContext = runner.createProcessInstance("demo", "{}") + + runner.completeTask("a", "{}") + runner.completeTask("b", "{}") + runner.completeTask("c", "{}") + + await.untilAsserted { + Assertions.assertThat(runner.getProcessInstanceState(wfContext)) + .isEqualTo(ProcessInstanceState.COMPLETED) + } + + runner.afterEach() + } + + @Test + internal fun `Runner with EzeTestRunner should run the YAML spec`() { + + val spec = ZeebeTestRunnerTest::class.java.getResourceAsStream("/demo.yaml") + val result = specRunner.runSpec(spec) + + Assertions.assertThat(result.testResults).hasSize(1) + } +}