Skip to content

Commit

Permalink
add worker template
Browse files Browse the repository at this point in the history
  • Loading branch information
pihme committed Jan 5, 2022
1 parent 31aad87 commit 421ac20
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
/.idea/
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,34 @@
# zeebe-worker-java-template
Minimal template for a Zeebe Java worker
# Zeebe Worker Java Template

Minimal template for a [Zeebe](https://github.com/camunda-cloud/zeebe)
Java [worker](https://docs.camunda.io/docs/components/concepts/job-workers/). This template adds only the bare minimum
of dependencies.

If you want something more convenient, have a look
at [Spring Zeebe](https://github.com/camunda-community-hub/spring-zeebe).

## How to Use

The main method is in `Worker.java`. It requires a couple of environment variables to run.

### Connection Setup

#### Connect to Cluster Camunda Cloud

1. Follow the [Getting Started Guid](https://docs.camunda.io/docs/guides/getting-started/) to create an account, a
cluster and client credentials
2. Use the client credentials to fill the following environment variables:
* `ZEEBE_ADDRESS`: Address where your cluster can be reached.
* `ZEEBE_CLIENT_ID` and `ZEEBE_CLIENT_SECRET`: Credentials to request a new access token.
* `ZEEBE_AUTHORIZATION_SERVER_URL`: A new token can be requested at this address, using the credentials.
3. Run `Worker`

#### Connect to local Installation

For a local installation (without authentication) you only need to set `ZEEBE_ADDRESS`

### Workflow

Either you deploy `process.bpmn` or you design your own process with a service task with the `greet` job type.


44 changes: 44 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.camunda</groupId>
<artifactId>zeebe-worker-java-template</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<zeebe.version>1.2.9</zeebe.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>
</properties>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-client-java</artifactId>
<version>${zeebe.version}</version>
</dependency>
<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-worker-java-testutils</artifactId>
<version>${zeebe.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
47 changes: 47 additions & 0 deletions process.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1shoozx" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.11.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="1.1.0">
<bpmn:process id="greet-process" name="Greet" isExecutable="true">
<bpmn:startEvent id="start-event" name="start">
<bpmn:outgoing>Flow_16yhcf2</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_16yhcf2" sourceRef="start-event" targetRef="greet-task" />
<bpmn:serviceTask id="greet-task" name="Greet Task">
<bpmn:extensionElements>
<zeebe:taskDefinition type="greet" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_16yhcf2</bpmn:incoming>
<bpmn:outgoing>Flow_1b2mrjt</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:endEvent id="end-event" name="end">
<bpmn:incoming>Flow_1b2mrjt</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1b2mrjt" sourceRef="greet-task" targetRef="end-event" />
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="greet-process">
<bpmndi:BPMNEdge id="Flow_16yhcf2_di" bpmnElement="Flow_16yhcf2">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1b2mrjt_di" bpmnElement="Flow_1b2mrjt">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="start-event">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="186" y="142" width="22" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1gst4kg_di" bpmnElement="greet-task">
<dc:Bounds x="270" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0ryqdq5_di" bpmnElement="end-event">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="441" y="142" width="19" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
90 changes: 90 additions & 0 deletions src/main/java/Worker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.worker.JobWorker;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
* Example application that connects to a cluster on Camunda Cloud, or a locally deployed cluster.
*
* <p>When connecting to a cluster in Camunda Cloud, this application assumes that the following
* environment variables are set:
*
* <ul>
* <li>ZEEBE_ADDRESS
* <li>ZEEBE_CLIENT_ID (implicitly required by {@code ZeebeClient} if authorization is enabled)
* <li>ZEEBE_CLIENT_SECRET (implicitly required by {@code ZeebeClient} if authorization is enabled)
* <li>ZEEBE_AUTHORIZATION_SERVER_URL (implicitly required by {@code ZeebeClient} if authorization is enabled)
* </ul>
*
* <p><strong>Hint:</strong> When you create client credentials in Camunda Cloud you have the option
* to download a file with above lines filled out for you.
*
* <p>When connecting to a local cluster, you only need to set {@code ZEEBE_ADDRESS}.
* This application also assumes that authentication is disabled for a locally deployed clusterL
*/
public class Worker {

private static final String JOB_TYPE = "greet";

public static void main(String[] args) throws InterruptedException {
System.out.println("Starting worker...");

final String zeebeAddress = getEnvironmentVariable("ZEEBE_ADDRESS");

System.out.println("Connecting to " + zeebeAddress);
ZeebeClient client = createZeebeClient(zeebeAddress);

System.out.println("Registering worker for jobType:" + JOB_TYPE);
final JobWorker jobWorker = client.newWorker().jobType(JOB_TYPE).handler(new WorkerJobHandler()).open();

final CountDownLatch countDownLatch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(
new Thread(() -> {

System.out.println("Closing worker for jobType:" + JOB_TYPE);
jobWorker.close();

System.out.println("Closing client connected to " + zeebeAddress);
client.close();

System.out.println("Worker Shutdown Complete");
countDownLatch.countDown();
})
);

countDownLatch.await();
}

private static ZeebeClient createZeebeClient(String gatewayAddress) {
if (gatewayAddress.contains("zeebe.camunda.io")) {
checkEnvVars("ZEEBE_CLIENT_ID", "ZEEBE_CLIENT_SECRET", "ZEEBE_AUTHORIZATION_SERVER_URL");
/* Connect to Camunda Cloud Cluster, assumes that credentials are set in environment variables.
* See JavaDoc on class level for details
*/
return ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).build();
} else {
// connect to local deployment; assumes that authentication is disabled
return ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build();
}
}

private static String getEnvironmentVariable(final String key) {
checkEnvVars(key);

final Map<String, String> envVars = System.getenv();

return envVars.get(key);
}

private static void checkEnvVars(String... keys) {
final Map<String, String> envVars = System.getenv();

for (String key : keys) {
if (!envVars.containsKey(key)) {
throw new IllegalStateException("Unable to find mandatory environment variable " + key);
}
}
}
}
18 changes: 18 additions & 0 deletions src/main/java/WorkerJobHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.client.api.worker.JobHandler;

import java.util.Collections;

public class WorkerJobHandler implements JobHandler {

@Override
public void handle(JobClient client, ActivatedJob job) {
final String greeting = job.getCustomHeaders().getOrDefault("greeting", "Hello");
final String name = (String) job.getVariablesAsMap().getOrDefault("name", "Zeebe user");

final String message = String.format("%s %s!", greeting, name);

client.newCompleteCommand(job.getKey()).variables(Collections.singletonMap("message", message)).send().join();
}
}
42 changes: 42 additions & 0 deletions src/test/java/WorkerJobHandlerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import org.camunda.community.zeebe.testutils.stubs.ActivatedJobStub;
import org.camunda.community.zeebe.testutils.stubs.JobClientStub;
import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.assertj.core.data.MapEntry.entry;
import static org.camunda.community.zeebe.testutils.ZeebeWorkerAssertions.assertThat;

class WorkerJobHandlerTest {

private final WorkerJobHandler sutJubHandler = new WorkerJobHandler();

@Test
public void testDefaultBehavior() {
// given
final JobClientStub jobClient = new JobClientStub();
final ActivatedJobStub activatedJob = jobClient.createActivatedJob();

// when
sutJubHandler.handle(jobClient, activatedJob);

// then
assertThat(activatedJob).completed().extractingOutput().containsOnly(entry("message", "Hello Zeebe user!"));
}

@Test
public void testMessageGeneration() {
// given
final JobClientStub jobClient = new JobClientStub();
final ActivatedJobStub activatedJob = jobClient.createActivatedJob();

activatedJob.setCustomHeaders(Collections.singletonMap("greeting", "Howdy"));
activatedJob.setInputVariables(Collections.singletonMap("name", "ladies and gentlemen"));

// when
sutJubHandler.handle(jobClient, activatedJob);

// then
assertThat(activatedJob).completed().extractingOutput().containsOnly(entry("message", "Howdy ladies and gentlemen!"));
}
}

0 comments on commit 421ac20

Please sign in to comment.