From 56ab5a949c9dae4ae245b6e761a0373c1f134fe9 Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Fri, 21 Dec 2018 11:19:49 +0100 Subject: [PATCH] feat(worker): context is available for scripting * the context contains the current job and the Zeebe client * get rid of Spring Boot --- README.md | 13 +- pom.xml | 58 ++++++--- .../java/io/zeebe/script/ScriptEvaluator.java | 2 - .../io/zeebe/script/ScriptJobHandler.java | 24 ++-- .../io/zeebe/script/ZeebeScriptWorker.java | 49 +++++++ .../script/ZeebeScriptWorkerApplication.java | 41 +++--- src/main/resources/log4j2.xml | 19 +++ .../java/io/zeebe/script/WorkflowTest.java | 120 ++++++++++++++++++ 8 files changed, 268 insertions(+), 58 deletions(-) create mode 100644 src/main/java/io/zeebe/script/ZeebeScriptWorker.java create mode 100644 src/main/resources/log4j2.xml create mode 100644 src/test/java/io/zeebe/script/WorkflowTest.java diff --git a/README.md b/README.md index f3e0efd..895a63d 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,21 @@ # zeebe-script-worker -This is a Zeebe worker to evaluate scripts. Scripts are useful to create/modify the payload, to do (simple) calculations or for prototyping. +A Zeebe worker to evaluate scripts. Scripts are useful to create/modify the payload, to do (simple) calculations or for prototyping. * the worker is registered for the type `script` * required custom headers: * `language` (String) - the name of the script language * `script` (String) - the script to evaluate * output payload contains `result` - the result of the evaluation +* available context in script: + * `job` (ActivatedJob) - the current job + * `zeebeClient` (ZeebeClient) - the client of the worker Available script languages: * javascript (Oracle Nashorn) * [groovy](http://groovy-lang.org/) * [feel](https://github.com/camunda/feel-scala) -_This is a community project meant for playing around with Zeebe. It is not officially supported by the Zeebe Team (i.e. no gurantees). Everybody is invited to contribute!_ - ## Usage The service task: @@ -55,11 +56,9 @@ Execute the JAR file via ## How to configure -The worker can be configured via environment variables or a properties file `application.properties`. +You can set the following environment variables to configure the worker. -``` -zeebe.client.broker.contactPoint=127.0.0.1:26500 -``` +* `zeebe.client.broker.contactPoint`- default: `127.0.0.1:26500` ## Examples Some examples for common use cases: diff --git a/pom.xml b/pom.xml index 3f468b1..de8cd4a 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,6 @@ - + 4.0.0 Zeebe Script Worker @@ -38,13 +40,6 @@ import pom - - org.springframework.boot - spring-boot-dependencies - 2.0.3.RELEASE - pom - import - @@ -55,11 +50,6 @@ zeebe-client-java - - org.springframework.boot - spring-boot-starter - - org.codehaus.groovy @@ -73,11 +63,25 @@ ${version.feel} + + + org.apache.logging.log4j + log4j-core + 2.11.1 + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.11.1 + + junit junit test + 4.12 @@ -87,25 +91,43 @@ test + + io.zeebe + zeebe-test + test + + - org.springframework.boot - spring-boot-maven-plugin - 2.0.3.RELEASE + maven-assembly-plugin + + + jar-with-dependencies + + + + io.zeebe.script.ZeebeScriptWorkerApplication + + + ${project.artifactId}-${project.version} + false + + make-assembly + package - repackage + single - + zeebe diff --git a/src/main/java/io/zeebe/script/ScriptEvaluator.java b/src/main/java/io/zeebe/script/ScriptEvaluator.java index bf27c84..4b6540d 100644 --- a/src/main/java/io/zeebe/script/ScriptEvaluator.java +++ b/src/main/java/io/zeebe/script/ScriptEvaluator.java @@ -22,9 +22,7 @@ import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; -import org.springframework.stereotype.Component; -@Component public class ScriptEvaluator { private final ScriptEngineManager scriptEngineManager = new ScriptEngineManager(); diff --git a/src/main/java/io/zeebe/script/ScriptJobHandler.java b/src/main/java/io/zeebe/script/ScriptJobHandler.java index 68d212e..4b8e9fb 100644 --- a/src/main/java/io/zeebe/script/ScriptJobHandler.java +++ b/src/main/java/io/zeebe/script/ScriptJobHandler.java @@ -15,24 +15,28 @@ */ package io.zeebe.script; +import io.zeebe.client.ZeebeClient; import io.zeebe.client.api.clients.JobClient; import io.zeebe.client.api.response.ActivatedJob; import io.zeebe.client.api.subscription.JobHandler; import java.util.Collections; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -@Component public class ScriptJobHandler implements JobHandler { private static final String HEADER_LANGUAGE = "language"; private static final String HEADER_SCRIPT = "script"; - @Autowired private ScriptEvaluator scriptEvaluator; + private final ScriptEvaluator scriptEvaluator = new ScriptEvaluator(); + + private final ZeebeClient zeebeClient; + + public ScriptJobHandler(ZeebeClient zeebeClient) { + this.zeebeClient = zeebeClient; + } @Override - public void handle(JobClient client, ActivatedJob job) { + public void handle(JobClient jobClient, ActivatedJob job) { final Map customHeaders = job.getCustomHeaders(); final String language = getLanguage(customHeaders); @@ -40,9 +44,13 @@ public void handle(JobClient client, ActivatedJob job) { final Map payload = job.getPayloadAsMap(); + // add context + payload.put("job", job); + payload.put("zeebeClient", zeebeClient); + final Object result = scriptEvaluator.evaluate(language, script, payload); - client + jobClient .newCompleteCommand(job.getKey()) .payload(Collections.singletonMap("result", result)) .send(); @@ -52,7 +60,7 @@ private String getLanguage(Map customHeaders) { final Object language = customHeaders.get(HEADER_LANGUAGE); if (language == null) { throw new RuntimeException( - String.format("Missing required custom header '%'", HEADER_LANGUAGE)); + String.format("Missing required custom header '%s'", HEADER_LANGUAGE)); } else { return String.valueOf(language); } @@ -62,7 +70,7 @@ private String getScript(Map customHeaders) { final Object script = customHeaders.get(HEADER_SCRIPT); if (script == null) { throw new RuntimeException( - String.format("Missing required custom header '%'", HEADER_SCRIPT)); + String.format("Missing required custom header '%s'", HEADER_SCRIPT)); } else { return String.valueOf(script); } diff --git a/src/main/java/io/zeebe/script/ZeebeScriptWorker.java b/src/main/java/io/zeebe/script/ZeebeScriptWorker.java new file mode 100644 index 0000000..e6c08fb --- /dev/null +++ b/src/main/java/io/zeebe/script/ZeebeScriptWorker.java @@ -0,0 +1,49 @@ +/* + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.script; + +import io.zeebe.client.ZeebeClient; +import io.zeebe.client.api.subscription.JobWorker; +import java.time.Duration; + +public class ZeebeScriptWorker { + + private final String contactPoint; + + private JobWorker jobWorker; + + public ZeebeScriptWorker(String contactPoint) { + this.contactPoint = contactPoint; + } + + public void start() { + final ZeebeClient client = + ZeebeClient.newClientBuilder() + .brokerContactPoint(contactPoint) + .defaultJobWorkerName("script-worker") + .defaultJobTimeout(Duration.ofSeconds(10)) + .build(); + + final ScriptJobHandler jobHandler = new ScriptJobHandler(client); + jobWorker = client.jobClient().newWorker().jobType("script").handler(jobHandler).open(); + } + + public void stop() { + if (jobWorker != null) { + jobWorker.close(); + } + } +} diff --git a/src/main/java/io/zeebe/script/ZeebeScriptWorkerApplication.java b/src/main/java/io/zeebe/script/ZeebeScriptWorkerApplication.java index 091abae..581090f 100644 --- a/src/main/java/io/zeebe/script/ZeebeScriptWorkerApplication.java +++ b/src/main/java/io/zeebe/script/ZeebeScriptWorkerApplication.java @@ -15,36 +15,31 @@ */ package io.zeebe.script; -import io.zeebe.client.ZeebeClient; -import java.time.Duration; -import javax.annotation.PostConstruct; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ZeebeScriptWorkerApplication { - @Value("${zeebe.client.broker.contactPoint:127.0.0.1:26500}") - private String contactPoint; + public static final String ENV_CONTACT_POINT = "zeebe.client.broker.contactPoint"; + private static final String DEFAULT_CONTACT_POINT = "127.0.0.1:26500"; - @Autowired private ScriptJobHandler jobHandler; + private static Logger LOG = LoggerFactory.getLogger("zeebe-script-worker"); public static void main(String[] args) { - SpringApplication.run(ZeebeScriptWorkerApplication.class, args); - } - @PostConstruct - public void start() { + final String contactPoint = + Optional.ofNullable(System.getenv(ENV_CONTACT_POINT)).orElse(DEFAULT_CONTACT_POINT); + + LOG.info("Connecting worker to {}", contactPoint); - final ZeebeClient client = - ZeebeClient.newClientBuilder() - .brokerContactPoint(contactPoint) - .defaultJobWorkerName("script-worker") - .defaultJobTimeout(Duration.ofSeconds(10)) - .build(); + final ZeebeScriptWorker worker = new ZeebeScriptWorker(contactPoint); + worker.start(); - client.jobClient().newWorker().jobType("script").handler(jobHandler).open(); + try { + new CountDownLatch(1).await(); + } catch (InterruptedException e) { + } } } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..6256e04 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + + + diff --git a/src/test/java/io/zeebe/script/WorkflowTest.java b/src/test/java/io/zeebe/script/WorkflowTest.java new file mode 100644 index 0000000..8e578a3 --- /dev/null +++ b/src/test/java/io/zeebe/script/WorkflowTest.java @@ -0,0 +1,120 @@ +package io.zeebe.script; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.zeebe.client.ZeebeClient; +import io.zeebe.client.api.events.WorkflowInstanceEvent; +import io.zeebe.exporter.record.value.MessageRecordValue; +import io.zeebe.model.bpmn.Bpmn; +import io.zeebe.model.bpmn.BpmnModelInstance; +import io.zeebe.protocol.intent.MessageIntent; +import io.zeebe.test.ZeebeTestRule; +import io.zeebe.test.util.record.RecordingExporter; +import java.util.Collections; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowTest { + + @Rule public final ZeebeTestRule testRule = new ZeebeTestRule(); + + private ZeebeClient client; + private ZeebeScriptWorker worker; + + @Before + public void init() { + client = testRule.getClient(); + + worker = new ZeebeScriptWorker(client.getConfiguration().getBrokerContactPoint()); + worker.start(); + } + + @After + public void cleanUp() { + worker.stop(); + } + + @Test + public void shouldReturnResult() { + + final BpmnModelInstance workflow = + Bpmn.createExecutableProcess("process") + .startEvent() + .serviceTask( + "task", t -> t.zeebeTaskType("script").zeebeTaskHeader("language", "groovy")) + .zeebeTaskHeader("script", "x + 1") + .done(); + + final WorkflowInstanceEvent workflowInstance = + deployAndCreateInstance(workflow, Collections.singletonMap("x", 2)); + + ZeebeTestRule.assertThat(workflowInstance).isEnded().hasElementPayload("task", "result", 3); + } + + @Test + public void shouldGetCurrentJob() { + + final BpmnModelInstance workflow = + Bpmn.createExecutableProcess("process") + .startEvent() + .serviceTask( + "task", t -> t.zeebeTaskType("script").zeebeTaskHeader("language", "groovy")) + .zeebeTaskHeader("script", "job.headers.workflowInstanceKey") + .done(); + + final WorkflowInstanceEvent workflowInstance = + deployAndCreateInstance(workflow, Collections.emptyMap()); + + ZeebeTestRule.assertThat(workflowInstance) + .isEnded() + .hasElementPayload("task", "result", (int) workflowInstance.getWorkflowInstanceKey()); + } + + @Test + public void shouldUseZeebeClient() { + + final BpmnModelInstance workflow = + Bpmn.createExecutableProcess("process") + .startEvent() + .serviceTask( + "task", t -> t.zeebeTaskType("script").zeebeTaskHeader("language", "groovy")) + .zeebeTaskHeader( + "script", + "zeebeClient.workflowClient().newPublishMessageCommand().messageName('foo').correlationKey('bar').send().join()") + .done(); + + final WorkflowInstanceEvent workflowInstance = + deployAndCreateInstance(workflow, Collections.emptyMap()); + + ZeebeTestRule.assertThat(workflowInstance).isEnded(); + + final MessageRecordValue publishedMessage = + RecordingExporter.messageRecords(MessageIntent.PUBLISHED).getFirst().getValue(); + assertThat(publishedMessage.getName()).isEqualTo("foo"); + assertThat(publishedMessage.getCorrelationKey()).isEqualTo("bar"); + } + + private WorkflowInstanceEvent deployAndCreateInstance( + final BpmnModelInstance workflow, Map payload) { + client + .workflowClient() + .newDeployCommand() + .addWorkflowModel(workflow, "process.bpmn") + .send() + .join(); + + final WorkflowInstanceEvent workflowInstance = + client + .workflowClient() + .newCreateInstanceCommand() + .bpmnProcessId("process") + .latestVersion() + .payload(payload) + .send() + .join(); + return workflowInstance; + } +}