Skip to content

Commit

Permalink
Merge pull request #59 from strawhat5/hazelcast-configurable
Browse files Browse the repository at this point in the history
Ringbuffer and Exponential retry made user-configurable
  • Loading branch information
saig0 authored Sep 14, 2020
2 parents d57bceb + 5f8d93f commit 2fff90c
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 26 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,15 @@ By default, the port is set to `9000` and the database is only in-memory (i.e. n

```
zeebe:
hazelcast:
connection: localhost:5701
connectionTimeout: PT30S
client:
worker:
hazelcast:
connection: "localhost:5701"
connectionTimeout: "PT1M"
ringbuffer: "zeebe"
connectionInitialBackoff: "PT15S"
connectionBackoffMultiplier: 2.0
connectionMaxBackoff: "PT30S"
spring:
Expand Down
11 changes: 0 additions & 11 deletions app/src/main/kotlin/io/zeebe/zeeqs/HazelcastProperties.kt

This file was deleted.

12 changes: 5 additions & 7 deletions app/src/main/kotlin/io/zeebe/zeeqs/ZeeqlApplication.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.zeebe.zeeqs

import io.zeebe.zeeqs.importer.hazelcast.HazelcastImporter
import io.zeebe.zeeqs.importer.hazelcast.HazelcastProperties
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.cache.annotation.EnableCaching
import java.time.Duration
import javax.annotation.PostConstruct

@SpringBootApplication
Expand All @@ -20,14 +20,12 @@ class ZeeqlApplication(

@PostConstruct
fun init() {
val connection = hazelcastProperties.connection
val connectionTimeout = Duration.parse(hazelcastProperties.connectionTimeout)

logger.info("connect to Hazelcast: '$connection'")
hazelcastImporter.start(connection, connectionTimeout)
logger.info("Connecting to Hazelcast: '$hazelcastProperties'")
hazelcastImporter.start(hazelcastProperties)
logger.info("Connected to Hazelcast!")
}
}

fun main(args: Array<String>) {
runApplication<ZeeqlApplication>(*args)
}
}
11 changes: 11 additions & 0 deletions app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
server:
port: 9000

zeebe:
client:
worker:
hazelcast:
connection: "localhost:5701"
connectionTimeout: "PT1M"
ringbuffer: "zeebe"
connectionInitialBackoff: "PT15S"
connectionBackoffMultiplier: 2.0
connectionMaxBackoff: "PT30S"
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ class HazelcastImporter(

var zeebeHazelcast: ZeebeHazelcast? = null

fun start(hazelcastConnection: String, hazelcastConnectionTimeout: Duration) {
fun start(hazelcastProperties: HazelcastProperties) {

val hazelcastConnection = hazelcastProperties.connection
val hazelcastConnectionTimeout = Duration.parse(hazelcastProperties.connectionTimeout)
val hazelcastRingbuffer = hazelcastProperties.ringbuffer
val hazelcastConnectionInitialBackoff = Duration.parse(hazelcastProperties.connectionInitialBackoff)
val hazelcastConnectionBackoffMultiplier = hazelcastProperties.connectionBackoffMultiplier
val hazelcastConnectionMaxBackoff = Duration.parse(hazelcastProperties.connectionMaxBackoff)

val hazelcastConfig = hazelcastConfigRepository.findById(hazelcastConnection)
.orElse(HazelcastConfig(
Expand All @@ -46,10 +53,14 @@ class HazelcastImporter(

val connectionRetryConfig = clientConfig.connectionStrategyConfig.connectionRetryConfig
connectionRetryConfig.clusterConnectTimeoutMillis = hazelcastConnectionTimeout.toMillis()
// These retry configs can be user-configured in application.yml
connectionRetryConfig.initialBackoffMillis = hazelcastConnectionInitialBackoff.toMillis().toInt()
connectionRetryConfig.multiplier = hazelcastConnectionBackoffMultiplier
connectionRetryConfig.maxBackoffMillis = hazelcastConnectionMaxBackoff.toMillis().toInt()

val hazelcast = HazelcastClient.newHazelcastClient(clientConfig)

val builder = ZeebeHazelcast.newBuilder(hazelcast)
val builder = ZeebeHazelcast.newBuilder(hazelcast).name(hazelcastRingbuffer)
.addDeploymentListener { it.takeIf { it.metadata.key > 0 }?.let(this::importDeploymentRecord) }
.addWorkflowInstanceListener { it.takeIf { it.metadata.key > 0 }?.let(this::importWorkflowInstanceRecord) }
.addVariableListener { it.takeIf { it.metadata.key > 0 }?.let(this::importVariableRecord) }
Expand Down Expand Up @@ -490,4 +501,4 @@ class HazelcastImporter(

messageCorrelationRepository.save(entity)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.zeebe.zeeqs.importer.hazelcast

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding

@ConstructorBinding
@ConfigurationProperties("zeebe.client.worker.hazelcast")
data class HazelcastProperties(
val connection: String = "localhost:5701",
val connectionTimeout: String = "PT1M",
val ringbuffer: String = "zeebe",
val connectionInitialBackoff: String = "PT15S",
val connectionBackoffMultiplier: Double = 2.0,
val connectionMaxBackoff: String = "PT30S"
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.zeebe.containers.ZeebePort
import io.zeebe.model.bpmn.Bpmn
import io.zeebe.zeeqs.data.repository.WorkflowRepository
import io.zeebe.zeeqs.importer.hazelcast.HazelcastImporter
import io.zeebe.zeeqs.importer.hazelcast.HazelcastProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -46,7 +47,9 @@ class HazelcastImporterTest(
fun `should import workflow`() {
// given
val port = zeebe.getMappedPort(hazelcastPort)
importer.start("localhost:$port", Duration.ofSeconds(10))
val hazelcastProperties = HazelcastProperties(
"localhost:$port", "PT10S", "zeebe")
importer.start(hazelcastProperties)

val client = ZeebeClient.newClientBuilder()
.brokerContactPoint(zeebe.getExternalAddress(ZeebePort.GATEWAY))
Expand Down Expand Up @@ -89,4 +92,3 @@ class HazelcastImporterTest(
class TestConfiguration
}


0 comments on commit 2fff90c

Please sign in to comment.