From 45655e43f117b03ea78351cacd2fe5c8f47efa4c Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Mon, 3 Apr 2023 09:51:21 +0200 Subject: [PATCH] feat: add signals + subscriptions to GraphQL API Expose signals and signal subscriptions via GraphQL API. --- .../io/zeebe/zeeqs/data/entity/Signal.kt | 13 ++ .../zeeqs/data/entity/SignalSubscription.kt | 17 ++ .../data/entity/SignalSubscriptionState.kt | 6 + .../zeebe/zeeqs/data/entity/SignalVariable.kt | 15 ++ .../zeeqs/data/repository/SignalRepository.kt | 10 + .../SignalSubscriptionRepository.kt | 17 ++ .../repository/SignalVariableRepository.kt | 12 ++ .../zeeqs/data/service/BpmnElementMetadata.kt | 17 +- .../zeeqs/data/service/ProcessService.kt | 181 ++++++++++-------- .../resolvers/connection/SignalConnection.kt | 8 + .../connection/SignalConnectionResolver.kt | 20 ++ .../resolvers/query/SignalQueryResolver.kt | 37 ++++ .../graphql/resolvers/type/ProcessResolver.kt | 80 +++++--- .../graphql/resolvers/type/SignalResolver.kt | 28 +++ .../type/SignalSubscriptionResolver.kt | 37 ++++ .../main/resources/graphql/Element.graphqls | 2 + .../main/resources/graphql/Process.graphqls | 2 + .../main/resources/graphql/Signal.graphqls | 35 ++++ .../graphql/SignalSubscription.graphqls | 23 +++ .../importer/hazelcast/HazelcastImporter.kt | 35 +--- .../hazelcast/HazelcastSignalImporter.kt | 83 ++++++++ .../importer/hazelcast/ProtobufTransformer.kt | 31 +++ 22 files changed, 561 insertions(+), 148 deletions(-) create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Signal.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscription.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscriptionState.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalVariable.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalRepository.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalSubscriptionRepository.kt create mode 100644 data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalVariableRepository.kt create mode 100644 graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnection.kt create mode 100644 graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnectionResolver.kt create mode 100644 graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/SignalQueryResolver.kt create mode 100644 graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalResolver.kt create mode 100644 graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalSubscriptionResolver.kt create mode 100644 graphql-api/src/main/resources/graphql/Signal.graphqls create mode 100644 graphql-api/src/main/resources/graphql/SignalSubscription.graphqls create mode 100644 hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastSignalImporter.kt create mode 100644 hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/ProtobufTransformer.kt diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Signal.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Signal.kt new file mode 100644 index 00000000..138d8662 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Signal.kt @@ -0,0 +1,13 @@ +package io.zeebe.zeeqs.data.entity + +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id + +@Entity +class Signal( + @Id @Column(name = "key_") val key: Long, + val position: Long, + val name: String, + var timestamp: Long +) \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscription.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscription.kt new file mode 100644 index 00000000..0a9bc0bb --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscription.kt @@ -0,0 +1,17 @@ +package io.zeebe.zeeqs.data.entity + +import javax.persistence.* + +@Entity +class SignalSubscription( + @Id @Column(name = "key_") val key: Long, + val position: Long, + val signalName: String, + val processDefinitionKey: Long, + val elementId: String +) { + + @Enumerated(EnumType.STRING) + var state: SignalSubscriptionState = SignalSubscriptionState.CREATED + var timestamp: Long = -1 +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscriptionState.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscriptionState.kt new file mode 100644 index 00000000..8e6125f9 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalSubscriptionState.kt @@ -0,0 +1,6 @@ +package io.zeebe.zeeqs.data.entity + +enum class SignalSubscriptionState { + CREATED, + DELETED +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalVariable.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalVariable.kt new file mode 100644 index 00000000..4d7f8b1d --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalVariable.kt @@ -0,0 +1,15 @@ +package io.zeebe.zeeqs.data.entity + +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.Lob + +@Entity +class SignalVariable( + @Id val id: String, + val name: String, + @Lob @Column(name = "value_") val value: String, + val signalKey: Long, + val position: Long +) \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalRepository.kt new file mode 100644 index 00000000..3317b504 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalRepository.kt @@ -0,0 +1,10 @@ +package io.zeebe.zeeqs.data.repository + +import io.zeebe.zeeqs.data.entity.Signal +import org.springframework.data.repository.PagingAndSortingRepository +import org.springframework.stereotype.Repository + +@Repository +interface SignalRepository : PagingAndSortingRepository { + +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalSubscriptionRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalSubscriptionRepository.kt new file mode 100644 index 00000000..49dbe341 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalSubscriptionRepository.kt @@ -0,0 +1,17 @@ +package io.zeebe.zeeqs.data.repository + +import io.zeebe.zeeqs.data.entity.SignalSubscription +import org.springframework.data.repository.PagingAndSortingRepository +import org.springframework.stereotype.Repository + +@Repository +interface SignalSubscriptionRepository : PagingAndSortingRepository { + + fun findByProcessDefinitionKey(processDefinitionKey: Long): List + + fun findByProcessDefinitionKeyAndSignalName( + processDefinitionKey: Long, + signalName: String + ): SignalSubscription? + +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalVariableRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalVariableRepository.kt new file mode 100644 index 00000000..00b6a4a0 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/SignalVariableRepository.kt @@ -0,0 +1,12 @@ +package io.zeebe.zeeqs.data.repository + +import io.zeebe.zeeqs.data.entity.SignalVariable +import org.springframework.data.repository.PagingAndSortingRepository +import org.springframework.stereotype.Repository + +@Repository +interface SignalVariableRepository : PagingAndSortingRepository { + + fun findBySignalKey(signalKey: Long): List + +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/service/BpmnElementMetadata.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/service/BpmnElementMetadata.kt index 080c815e..1e4499af 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/service/BpmnElementMetadata.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/service/BpmnElementMetadata.kt @@ -1,12 +1,13 @@ package io.zeebe.zeeqs.data.service data class BpmnElementMetadata( - val jobType: String? = null, - val conditionExpression: String? = null, - val timerDefinition: String? = null, - val errorCode: String? = null, - val calledProcessId: String? = null, - val messageSubscriptionDefinition: MessageSubscriptionDefinition? = null, - val userTaskAssignmentDefinition: UserTaskAssignmentDefinition? = null, - val userTaskForm: UserTaskForm? = null + val jobType: String? = null, + val conditionExpression: String? = null, + val timerDefinition: String? = null, + val errorCode: String? = null, + val calledProcessId: String? = null, + val messageSubscriptionDefinition: MessageSubscriptionDefinition? = null, + val userTaskAssignmentDefinition: UserTaskAssignmentDefinition? = null, + val userTaskForm: UserTaskForm? = null, + val signalName: String? = null ) diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/service/ProcessService.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/service/ProcessService.kt index 70961f9e..794a692f 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/service/ProcessService.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/service/ProcessService.kt @@ -20,23 +20,25 @@ class ProcessService(val processRepository: ProcessRepository) { @Cacheable(cacheNames = ["bpmnElementInfo"]) fun getBpmnElementInfo(processDefinitionKey: Long): Map? { return getBpmnModel(processDefinitionKey) - ?.let { it.getModelElementsByType(FlowElement::class.java) } - ?.map { flowElement -> - Pair(flowElement.id, BpmnElementInfo( - elementId = flowElement.id, - elementName = flowElement.name, - elementType = getBpmnElementType(flowElement), - metadata = getMetadata(flowElement) - )) - } - ?.toMap() + ?.let { it.getModelElementsByType(FlowElement::class.java) } + ?.map { flowElement -> + Pair( + flowElement.id, BpmnElementInfo( + elementId = flowElement.id, + elementName = flowElement.name, + elementType = getBpmnElementType(flowElement), + metadata = getMetadata(flowElement) + ) + ) + } + ?.toMap() } private fun getBpmnModel(processDefinitionKey: Long): BpmnModelInstance? { return processRepository.findByIdOrNull(processDefinitionKey) - ?.bpmnXML - ?.byteInputStream() - ?.let { Bpmn.readModelFromStream(it) } + ?.bpmnXML + ?.byteInputStream() + ?.let { Bpmn.readModelFromStream(it) } } private fun getBpmnElementType(element: FlowElement): BpmnElementType { @@ -66,97 +68,106 @@ class ProcessService(val processRepository: ProcessRepository) { } private fun getBpmnSubprocessType(element: FlowElement) = - if (element is SubProcess) { - if (element.triggeredByEvent()) { - BpmnElementType.EVENT_SUB_PROCESS - } else { - BpmnElementType.SUB_PROCESS - } + if (element is SubProcess) { + if (element.triggeredByEvent()) { + BpmnElementType.EVENT_SUB_PROCESS } else { - BpmnElementType.UNKNOWN + BpmnElementType.SUB_PROCESS } + } else { + BpmnElementType.UNKNOWN + } private fun getMetadata(element: FlowElement): BpmnElementMetadata { return BpmnElementMetadata( - jobType = element - .getSingleExtensionElement(ZeebeTaskDefinition::class.java) - ?.type, - conditionExpression = when (element) { - is SequenceFlow -> element.conditionExpression?.textContent - else -> null - }, - timerDefinition = when (element) { - is CatchEvent -> element.eventDefinitions - ?.filterIsInstance(TimerEventDefinition::class.java) - ?.firstOrNull() - ?.let { it.timeCycle ?: it.timeDate ?: it.timeDuration } - ?.textContent - - else -> null - }, - errorCode = when (element) { - is CatchEvent -> element.eventDefinitions - ?.filterIsInstance(ErrorEventDefinition::class.java) - ?.firstOrNull() - ?.error - ?.errorCode - - else -> null + jobType = element + .getSingleExtensionElement(ZeebeTaskDefinition::class.java) + ?.type, + conditionExpression = when (element) { + is SequenceFlow -> element.conditionExpression?.textContent + else -> null + }, + timerDefinition = when (element) { + is CatchEvent -> element.eventDefinitions + ?.filterIsInstance(TimerEventDefinition::class.java) + ?.firstOrNull() + ?.let { it.timeCycle ?: it.timeDate ?: it.timeDuration } + ?.textContent + + else -> null + }, + errorCode = when (element) { + is CatchEvent -> element.eventDefinitions + ?.filterIsInstance(ErrorEventDefinition::class.java) + ?.firstOrNull() + ?.error + ?.errorCode + + else -> null + }, + calledProcessId = element + .getSingleExtensionElement(ZeebeCalledElement::class.java) + ?.processId, + messageSubscriptionDefinition = when (element) { + is CatchEvent -> element.eventDefinitions + ?.filterIsInstance(MessageEventDefinition::class.java) + ?.firstOrNull() + ?.message + ?.let { + MessageSubscriptionDefinition( + messageName = it.name, + messageCorrelationKey = it + .getSingleExtensionElement(ZeebeSubscription::class.java) + ?.correlationKey + ) + } + + else -> null + }, + userTaskAssignmentDefinition = element + .getSingleExtensionElement(ZeebeAssignmentDefinition::class.java) + ?.let { + UserTaskAssignmentDefinition( + assignee = it.assignee, + candidateGroups = it.candidateGroups + ) }, - calledProcessId = element - .getSingleExtensionElement(ZeebeCalledElement::class.java) - ?.processId, - messageSubscriptionDefinition = when (element) { - is CatchEvent -> element.eventDefinitions - ?.filterIsInstance(MessageEventDefinition::class.java) - ?.firstOrNull() - ?.message - ?.let { - MessageSubscriptionDefinition( - messageName = it.name, - messageCorrelationKey = it - .getSingleExtensionElement(ZeebeSubscription::class.java) - ?.correlationKey - ) - } - - else -> null + userTaskForm = element + .getSingleExtensionElement(ZeebeFormDefinition::class.java) + ?.formKey + ?.let { formKey -> + UserTaskForm( + key = formKey, + resource = getForm( + model = element.modelInstance, + formKey = formKey + ) + ) }, - userTaskAssignmentDefinition = element - .getSingleExtensionElement(ZeebeAssignmentDefinition::class.java) - ?.let { - UserTaskAssignmentDefinition( - assignee = it.assignee, - candidateGroups = it.candidateGroups - ) - }, - userTaskForm = element - .getSingleExtensionElement(ZeebeFormDefinition::class.java) - ?.formKey - ?.let { formKey -> - UserTaskForm( - key = formKey, - resource = getForm( - model = element.modelInstance, - formKey = formKey - ) - ) - } + signalName = when (element) { + is CatchEvent -> element.eventDefinitions + ?.filterIsInstance(SignalEventDefinition::class.java) + ?.firstOrNull() + ?.signal + ?.name + + else -> null + } ) } @Cacheable(cacheNames = ["userTaskForm"]) fun getForm(processDefinitionKey: Long, formKey: String): String? { return getBpmnModel(processDefinitionKey) - ?.let { getForm(model = it, formKey = formKey) } + ?.let { getForm(model = it, formKey = formKey) } } private fun getForm(model: ModelInstance, formKey: String): String? { val normalizedFormKey = formKey.replace(CAMUNDA_FORM_KEY_PREFIX, "") return model.getModelElementsByType(ZeebeUserTaskForm::class.java) - ?.firstOrNull { it.id == normalizedFormKey } - ?.textContent + ?.firstOrNull { it.id == normalizedFormKey } + ?.textContent } } diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnection.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnection.kt new file mode 100644 index 00000000..3c5a3f0e --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnection.kt @@ -0,0 +1,8 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.Signal + +class SignalConnection( + val getItems: () -> List, + val getCount: () -> Long +) \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnectionResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnectionResolver.kt new file mode 100644 index 00000000..fda36d08 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/SignalConnectionResolver.kt @@ -0,0 +1,20 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.Signal +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class SignalConnectionResolver { + + @SchemaMapping(typeName = "SignalConnection", field = "nodes") + fun nodes(connection: SignalConnection): List { + return connection.getItems() + } + + @SchemaMapping(typeName = "SignalConnection", field = "totalCount") + fun totalCount(connection: SignalConnection): Long { + return connection.getCount() + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/SignalQueryResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/SignalQueryResolver.kt new file mode 100644 index 00000000..1c19d94d --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/SignalQueryResolver.kt @@ -0,0 +1,37 @@ +package io.zeebe.zeeqs.graphql.resolvers.query + +import io.zeebe.zeeqs.data.entity.Signal +import io.zeebe.zeeqs.data.repository.SignalRepository +import io.zeebe.zeeqs.graphql.resolvers.connection.SignalConnection +import org.springframework.data.domain.PageRequest +import org.springframework.data.repository.findByIdOrNull +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.QueryMapping +import org.springframework.stereotype.Controller + +@Controller +class SignalQueryResolver( + val signalRepository: SignalRepository +) { + + @QueryMapping + fun signals( + @Argument perPage: Int, + @Argument page: Int + ): SignalConnection { + return SignalConnection( + getItems = { + signalRepository.findAll(PageRequest.of(page, perPage)).toList() + }, + getCount = { + signalRepository.count() + } + ) + } + + @QueryMapping + fun signal(@Argument key: Long): Signal? { + return signalRepository.findByIdOrNull(key) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/ProcessResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/ProcessResolver.kt index dcbe1b5f..2a10a279 100644 --- a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/ProcessResolver.kt +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/ProcessResolver.kt @@ -3,6 +3,7 @@ package io.zeebe.zeeqs.graphql.resolvers.type import io.zeebe.zeeqs.data.entity.* import io.zeebe.zeeqs.data.repository.MessageSubscriptionRepository import io.zeebe.zeeqs.data.repository.ProcessInstanceRepository +import io.zeebe.zeeqs.data.repository.SignalSubscriptionRepository import io.zeebe.zeeqs.data.repository.TimerRepository import io.zeebe.zeeqs.data.service.BpmnElementInfo import io.zeebe.zeeqs.data.service.ProcessService @@ -14,29 +15,41 @@ import org.springframework.stereotype.Controller @Controller class ProcessResolver( - val processInstanceRepository: ProcessInstanceRepository, - val timerRepository: TimerRepository, - val messageSubscriptionRepository: MessageSubscriptionRepository, - val processService: ProcessService + val processInstanceRepository: ProcessInstanceRepository, + val timerRepository: TimerRepository, + val messageSubscriptionRepository: MessageSubscriptionRepository, + val processService: ProcessService, + private val signalSubscriptionRepository: SignalSubscriptionRepository ) { @SchemaMapping(typeName = "Process", field = "processInstances") fun processInstances( - process: Process, - @Argument perPage: Int, - @Argument page: Int, - @Argument stateIn: List + process: Process, + @Argument perPage: Int, + @Argument page: Int, + @Argument stateIn: List ): ProcessInstanceConnection { return ProcessInstanceConnection( - getItems = { processInstanceRepository.findByProcessDefinitionKeyAndStateIn(process.key, stateIn, PageRequest.of(page, perPage)).toList() }, - getCount = { processInstanceRepository.countByProcessDefinitionKeyAndStateIn(process.key, stateIn) } + getItems = { + processInstanceRepository.findByProcessDefinitionKeyAndStateIn( + process.key, + stateIn, + PageRequest.of(page, perPage) + ).toList() + }, + getCount = { + processInstanceRepository.countByProcessDefinitionKeyAndStateIn( + process.key, + stateIn + ) + } ) } @SchemaMapping(typeName = "Process", field = "deployTime") fun deployTime( - process: Process, - @Argument zoneId: String + process: Process, + @Argument zoneId: String ): String? { return process.deployTime.let { ResolverExtension.timestampToString(it, zoneId) } } @@ -48,38 +61,45 @@ class ProcessResolver( @SchemaMapping(typeName = "Process", field = "messageSubscriptions") fun messageSubscriptions(process: Process): List { - return messageSubscriptionRepository.findByProcessDefinitionKeyAndElementInstanceKeyIsNull(process.key) + return messageSubscriptionRepository.findByProcessDefinitionKeyAndElementInstanceKeyIsNull( + process.key + ) + } + + @SchemaMapping(typeName = "Process", field = "signalSubscriptions") + fun signalSubscriptions(process: Process): List { + return signalSubscriptionRepository.findByProcessDefinitionKey(processDefinitionKey = process.key) } @SchemaMapping(typeName = "Process", field = "elements") fun elements( - process: Process, - @Argument elementTypeIn: List + process: Process, + @Argument elementTypeIn: List ): List { return processService - .getBpmnElementInfo(process.key) - ?.values - ?.filter { elementTypeIn.isEmpty() || elementTypeIn.contains(it.elementType) } - ?.map { asBpmnElement(process, it) } - ?: emptyList() + .getBpmnElementInfo(process.key) + ?.values + ?.filter { elementTypeIn.isEmpty() || elementTypeIn.contains(it.elementType) } + ?.map { asBpmnElement(process, it) } + ?: emptyList() } private fun asBpmnElement(process: Process, it: BpmnElementInfo) = - BpmnElement( - processDefinitionKey = process.key, - elementId = it.elementId, - elementType = it.elementType - ) + BpmnElement( + processDefinitionKey = process.key, + elementId = it.elementId, + elementType = it.elementType + ) @SchemaMapping(typeName = "Process", field = "element") fun element( - process: Process, - @Argument elementId: String + process: Process, + @Argument elementId: String ): BpmnElement? { return processService - .getBpmnElementInfo(process.key) - ?.get(elementId) - ?.let { asBpmnElement(process, it) } + .getBpmnElementInfo(process.key) + ?.get(elementId) + ?.let { asBpmnElement(process, it) } } } \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalResolver.kt new file mode 100644 index 00000000..ed667a1e --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalResolver.kt @@ -0,0 +1,28 @@ +package io.zeebe.zeeqs.graphql.resolvers.type + +import io.zeebe.zeeqs.data.entity.Signal +import io.zeebe.zeeqs.data.entity.SignalVariable +import io.zeebe.zeeqs.data.repository.SignalVariableRepository +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class SignalResolver( + val signalVariableRepository: SignalVariableRepository +) { + + @SchemaMapping(typeName = "Signal", field = "timestamp") + fun timestamp( + signal: Signal, + @Argument zoneId: String + ): String? { + return signal.timestamp.let { ResolverExtension.timestampToString(it, zoneId) } + } + + @SchemaMapping(typeName = "Signal", field = "variables") + fun variables(signal: Signal): List { + return signalVariableRepository.findBySignalKey(signalKey = signal.key) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalSubscriptionResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalSubscriptionResolver.kt new file mode 100644 index 00000000..545f1f65 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/SignalSubscriptionResolver.kt @@ -0,0 +1,37 @@ +package io.zeebe.zeeqs.graphql.resolvers.type + +import io.zeebe.zeeqs.data.entity.Process +import io.zeebe.zeeqs.data.entity.SignalSubscription +import io.zeebe.zeeqs.data.repository.ProcessRepository +import org.springframework.data.repository.findByIdOrNull +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class SignalSubscriptionResolver( + val processRepository: ProcessRepository +) { + + @SchemaMapping(typeName = "SignalSubscription", field = "timestamp") + fun timestamp( + signalSubscription: SignalSubscription, + @Argument zoneId: String + ): String? { + return signalSubscription.timestamp.let { ResolverExtension.timestampToString(it, zoneId) } + } + + @SchemaMapping(typeName = "SignalSubscription", field = "process") + fun process(signalSubscription: SignalSubscription): Process? { + return processRepository.findByIdOrNull(id = signalSubscription.processDefinitionKey) + } + + @SchemaMapping(typeName = "SignalSubscription", field = "element") + fun element(signalSubscription: SignalSubscription): BpmnElement? { + return BpmnElement( + processDefinitionKey = signalSubscription.processDefinitionKey, + elementId = signalSubscription.elementId + ) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/resources/graphql/Element.graphqls b/graphql-api/src/main/resources/graphql/Element.graphqls index 0d03fea9..4e84b9e2 100644 --- a/graphql-api/src/main/resources/graphql/Element.graphqls +++ b/graphql-api/src/main/resources/graphql/Element.graphqls @@ -66,6 +66,8 @@ type BpmnElementMetadata { userTaskAssignmentDefinition: UserTaskAssignmentDefinition # the user form if the element is a user task userTaskForm: UserTaskForm + # The name of the signal if the element is a signal event. + signalName: String } # The definition of a message subscription from a BPMN element. diff --git a/graphql-api/src/main/resources/graphql/Process.graphqls b/graphql-api/src/main/resources/graphql/Process.graphqls index 7e730585..24dd7c93 100644 --- a/graphql-api/src/main/resources/graphql/Process.graphqls +++ b/graphql-api/src/main/resources/graphql/Process.graphqls @@ -18,6 +18,8 @@ type Process { timers: [Timer!] # the opened message subscriptions of the message start events of the process messageSubscriptions: [MessageSubscription!] + # The subscriptions of the signal start events of the process. + signalSubscriptions: [SignalSubscription!] # all BPMN elements of the process elements( # Filter the BPMN elements by the given types. If empty, return all elements. diff --git a/graphql-api/src/main/resources/graphql/Signal.graphqls b/graphql-api/src/main/resources/graphql/Signal.graphqls new file mode 100644 index 00000000..7dbb56d6 --- /dev/null +++ b/graphql-api/src/main/resources/graphql/Signal.graphqls @@ -0,0 +1,35 @@ +# A signal that was broadcasted. +type Signal { + # The key of the signal. + key: ID! + # The name of the signal. + name: String! + # The time when the signal was broadcasted. + timestamp(zoneId: String = "Z"): String + # The variables of the signal. + variables: [SignalVariable!] +} + +type SignalConnection { + totalCount: Int! + nodes: [Signal!]! +} + +# A variable of a signal. +type SignalVariable { + # The name of the variable. + name: String! + # The value of the variable as JSON. + value: String! +} + +extend type Query { + # Find a signal by its key. + signal(key: ID!): Signal + + # Fetch signals. + signals( + perPage: Int = 10, + page: Int = 0 + ): SignalConnection! +} diff --git a/graphql-api/src/main/resources/graphql/SignalSubscription.graphqls b/graphql-api/src/main/resources/graphql/SignalSubscription.graphqls new file mode 100644 index 00000000..ad3f748b --- /dev/null +++ b/graphql-api/src/main/resources/graphql/SignalSubscription.graphqls @@ -0,0 +1,23 @@ +# A subscription of a signal event. +type SignalSubscription { + # The key of the subscription. + key: ID! + # The name of the signal that it subscribed to. + signalName: String! + # The process that belongs to the subscription. + process: Process + # The current state of the subscription. + state: SignalSubscriptionState + # The time when the subscription was updated last. + timestamp(zoneId: String = "Z"): String + # The BPMN element that belong to the subscription. + element: BpmnElement +} + +# A state of a signal subscription. +enum SignalSubscriptionState { + # The subscription was created. + CREATED + # The subscription was deleted. + DELETED +} diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt index 028bfcde..985de32b 100644 --- a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt @@ -1,7 +1,5 @@ package io.zeebe.zeeqs.importer.hazelcast -import com.google.protobuf.Struct -import com.google.protobuf.Value import com.hazelcast.client.HazelcastClient import com.hazelcast.client.config.ClientConfig import io.camunda.zeebe.protocol.Protocol @@ -11,6 +9,7 @@ import io.zeebe.hazelcast.connect.java.ZeebeHazelcast import io.zeebe.zeeqs.data.entity.* import io.zeebe.zeeqs.data.reactive.DataUpdatesPublisher import io.zeebe.zeeqs.data.repository.* +import io.zeebe.zeeqs.importer.hazelcast.ProtobufTransformer.structToMap import org.springframework.stereotype.Component import java.time.Duration @@ -33,6 +32,7 @@ class HazelcastImporter( val messageCorrelationRepository: MessageCorrelationRepository, val errorRepository: ErrorRepository, private val decisionEvaluationImporter: HazelcastDecisionImporter, + private val signalImporter: HazelcastSignalImporter, private val dataUpdatesPublisher: DataUpdatesPublisher ) { @@ -122,6 +122,14 @@ class HazelcastImporter( it.takeIf { it.metadata.recordType == RecordType.EVENT } ?.let(decisionEvaluationImporter::importDecisionEvaluation) } + .addSignalListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(signalImporter::importSignal) + } + .addSignalSubscriptionListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(signalImporter::importSignalSubscription) + } .addErrorListener(this::importError) .postProcessListener(updateSequence) @@ -594,29 +602,6 @@ class HazelcastImporter( } } - private fun structToMap(struct: Struct): Map { - return struct.fieldsMap.mapValues { (_, value) -> valueToString(value) } - } - - private fun valueToString(value: Value): String { - return when (value.kindCase) { - Value.KindCase.NULL_VALUE -> "null" - Value.KindCase.BOOL_VALUE -> value.boolValue.toString() - Value.KindCase.NUMBER_VALUE -> value.numberValue.toString() - Value.KindCase.STRING_VALUE -> "\"${value.stringValue}\"" - Value.KindCase.LIST_VALUE -> value.listValue.valuesList.map { valueToString(it) } - .joinToString(separator = ",", prefix = "[", postfix = "]") - - Value.KindCase.STRUCT_VALUE -> value.structValue.fieldsMap.map { (key, value) -> - "\"$key\":" + valueToString( - value - ) - }.joinToString(separator = ",", prefix = "{", postfix = "}") - - else -> value.toString() - } - } - private fun importMessageStartEventSubscriptionRecord(record: Schema.MessageStartEventSubscriptionRecord) { val entity = messageSubscriptionRepository .findById(record.metadata.key) diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastSignalImporter.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastSignalImporter.kt new file mode 100644 index 00000000..fa3fbe85 --- /dev/null +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastSignalImporter.kt @@ -0,0 +1,83 @@ +package io.zeebe.zeeqs.importer.hazelcast + +import io.zeebe.exporter.proto.Schema +import io.zeebe.exporter.proto.Schema.SignalSubscriptionRecord +import io.zeebe.zeeqs.data.entity.Signal +import io.zeebe.zeeqs.data.entity.SignalSubscription +import io.zeebe.zeeqs.data.entity.SignalSubscriptionState +import io.zeebe.zeeqs.data.entity.SignalVariable +import io.zeebe.zeeqs.data.repository.SignalRepository +import io.zeebe.zeeqs.data.repository.SignalSubscriptionRepository +import io.zeebe.zeeqs.data.repository.SignalVariableRepository +import io.zeebe.zeeqs.importer.hazelcast.ProtobufTransformer.structToMap +import org.springframework.data.repository.findByIdOrNull +import org.springframework.stereotype.Component + +@Component +class HazelcastSignalImporter( + private val signalRepository: SignalRepository, + private val signalSubscriptionRepository: SignalSubscriptionRepository, + private val signalVariableRepository: SignalVariableRepository +) { + + fun importSignal(signal: Schema.SignalRecord) { + val entity = signalRepository.findByIdOrNull(signal.metadata.key) + ?: createSignal(signal) + + signalRepository.save(entity) + + importSignalVariables(signal) + } + + private fun createSignal(signal: Schema.SignalRecord): Signal { + return Signal( + key = signal.metadata.key, + position = signal.metadata.position, + name = signal.signalName, + timestamp = signal.metadata.timestamp + ) + } + + private fun importSignalVariables(signal: Schema.SignalRecord) { + val signalKey = signal.metadata.key + + val entities = structToMap(signal.variables).map { + val variableName = it.key + + SignalVariable( + id = "$signalKey-$variableName", + name = variableName, + value = it.value, + signalKey = signalKey, + position = signal.metadata.position + ) + } + + signalVariableRepository.saveAll(entities) + } + + fun importSignalSubscription(signalSubscription: Schema.SignalSubscriptionRecord) { + val entity = signalSubscriptionRepository.findByIdOrNull(signalSubscription.metadata.key) + ?: createSignalSubscription(signalSubscription) + + when (signalSubscription.metadata.intent) { + "CREATED" -> entity.state = SignalSubscriptionState.CREATED + "DELETED" -> entity.state = SignalSubscriptionState.DELETED + } + + entity.timestamp = signalSubscription.metadata.timestamp + + signalSubscriptionRepository.save(entity) + } + + private fun createSignalSubscription(signalSubscription: SignalSubscriptionRecord): SignalSubscription { + return SignalSubscription( + key = signalSubscription.metadata.key, + position = signalSubscription.metadata.position, + signalName = signalSubscription.signalName, + processDefinitionKey = signalSubscription.processDefinitionKey, + elementId = signalSubscription.catchEventId + ) + } + +} \ No newline at end of file diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/ProtobufTransformer.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/ProtobufTransformer.kt new file mode 100644 index 00000000..4ecd3858 --- /dev/null +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/ProtobufTransformer.kt @@ -0,0 +1,31 @@ +package io.zeebe.zeeqs.importer.hazelcast + +import com.google.protobuf.Struct +import com.google.protobuf.Value + +object ProtobufTransformer { + + fun structToMap(struct: Struct): Map { + return struct.fieldsMap.mapValues { (_, value) -> valueToString(value) } + } + + private fun valueToString(value: Value): String { + return when (value.kindCase) { + Value.KindCase.NULL_VALUE -> "null" + Value.KindCase.BOOL_VALUE -> value.boolValue.toString() + Value.KindCase.NUMBER_VALUE -> value.numberValue.toString() + Value.KindCase.STRING_VALUE -> "\"${value.stringValue}\"" + Value.KindCase.LIST_VALUE -> value.listValue.valuesList.map { valueToString(it) } + .joinToString(separator = ",", prefix = "[", postfix = "]") + + Value.KindCase.STRUCT_VALUE -> value.structValue.fieldsMap.map { (key, value) -> + "\"$key\":" + valueToString( + value + ) + }.joinToString(separator = ",", prefix = "{", postfix = "}") + + else -> value.toString() + } + } + +} \ No newline at end of file