Skip to content

Commit

Permalink
feat: add signals + subscriptions to GraphQL API
Browse files Browse the repository at this point in the history
Expose signals and signal subscriptions via GraphQL API.
  • Loading branch information
saig0 committed Apr 6, 2023
1 parent 20c8d17 commit 45655e4
Show file tree
Hide file tree
Showing 22 changed files with 561 additions and 148 deletions.
13 changes: 13 additions & 0 deletions data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Signal.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.zeebe.zeeqs.data.entity

import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id

@Entity
class Signal(
@Id @Column(name = "key_") val key: Long,
val position: Long,
val name: String,
var timestamp: Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.zeebe.zeeqs.data.entity

import javax.persistence.*

@Entity
class SignalSubscription(
@Id @Column(name = "key_") val key: Long,
val position: Long,
val signalName: String,
val processDefinitionKey: Long,
val elementId: String
) {

@Enumerated(EnumType.STRING)
var state: SignalSubscriptionState = SignalSubscriptionState.CREATED
var timestamp: Long = -1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.zeebe.zeeqs.data.entity

enum class SignalSubscriptionState {
CREATED,
DELETED
}
15 changes: 15 additions & 0 deletions data/src/main/kotlin/io/zeebe/zeeqs/data/entity/SignalVariable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.zeebe.zeeqs.data.entity

import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob

@Entity
class SignalVariable(
@Id val id: String,
val name: String,
@Lob @Column(name = "value_") val value: String,
val signalKey: Long,
val position: Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.zeebe.zeeqs.data.repository

import io.zeebe.zeeqs.data.entity.Signal
import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface SignalRepository : PagingAndSortingRepository<Signal, Long> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.zeebe.zeeqs.data.repository

import io.zeebe.zeeqs.data.entity.SignalSubscription
import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface SignalSubscriptionRepository : PagingAndSortingRepository<SignalSubscription, Long> {

fun findByProcessDefinitionKey(processDefinitionKey: Long): List<SignalSubscription>

fun findByProcessDefinitionKeyAndSignalName(
processDefinitionKey: Long,
signalName: String
): SignalSubscription?

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.zeebe.zeeqs.data.repository

import io.zeebe.zeeqs.data.entity.SignalVariable
import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface SignalVariableRepository : PagingAndSortingRepository<SignalVariable, String> {

fun findBySignalKey(signalKey: Long): List<SignalVariable>

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.zeebe.zeeqs.data.service

data class BpmnElementMetadata(
val jobType: String? = null,
val conditionExpression: String? = null,
val timerDefinition: String? = null,
val errorCode: String? = null,
val calledProcessId: String? = null,
val messageSubscriptionDefinition: MessageSubscriptionDefinition? = null,
val userTaskAssignmentDefinition: UserTaskAssignmentDefinition? = null,
val userTaskForm: UserTaskForm? = null
val jobType: String? = null,
val conditionExpression: String? = null,
val timerDefinition: String? = null,
val errorCode: String? = null,
val calledProcessId: String? = null,
val messageSubscriptionDefinition: MessageSubscriptionDefinition? = null,
val userTaskAssignmentDefinition: UserTaskAssignmentDefinition? = null,
val userTaskForm: UserTaskForm? = null,
val signalName: String? = null
)
181 changes: 96 additions & 85 deletions data/src/main/kotlin/io/zeebe/zeeqs/data/service/ProcessService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ class ProcessService(val processRepository: ProcessRepository) {
@Cacheable(cacheNames = ["bpmnElementInfo"])
fun getBpmnElementInfo(processDefinitionKey: Long): Map<String, BpmnElementInfo>? {
return getBpmnModel(processDefinitionKey)
?.let { it.getModelElementsByType(FlowElement::class.java) }
?.map { flowElement ->
Pair(flowElement.id, BpmnElementInfo(
elementId = flowElement.id,
elementName = flowElement.name,
elementType = getBpmnElementType(flowElement),
metadata = getMetadata(flowElement)
))
}
?.toMap()
?.let { it.getModelElementsByType(FlowElement::class.java) }
?.map { flowElement ->
Pair(
flowElement.id, BpmnElementInfo(
elementId = flowElement.id,
elementName = flowElement.name,
elementType = getBpmnElementType(flowElement),
metadata = getMetadata(flowElement)
)
)
}
?.toMap()
}

private fun getBpmnModel(processDefinitionKey: Long): BpmnModelInstance? {
return processRepository.findByIdOrNull(processDefinitionKey)
?.bpmnXML
?.byteInputStream()
?.let { Bpmn.readModelFromStream(it) }
?.bpmnXML
?.byteInputStream()
?.let { Bpmn.readModelFromStream(it) }
}

private fun getBpmnElementType(element: FlowElement): BpmnElementType {
Expand Down Expand Up @@ -66,97 +68,106 @@ class ProcessService(val processRepository: ProcessRepository) {
}

private fun getBpmnSubprocessType(element: FlowElement) =
if (element is SubProcess) {
if (element.triggeredByEvent()) {
BpmnElementType.EVENT_SUB_PROCESS
} else {
BpmnElementType.SUB_PROCESS
}
if (element is SubProcess) {
if (element.triggeredByEvent()) {
BpmnElementType.EVENT_SUB_PROCESS
} else {
BpmnElementType.UNKNOWN
BpmnElementType.SUB_PROCESS
}
} else {
BpmnElementType.UNKNOWN
}

private fun getMetadata(element: FlowElement): BpmnElementMetadata {
return BpmnElementMetadata(
jobType = element
.getSingleExtensionElement(ZeebeTaskDefinition::class.java)
?.type,
conditionExpression = when (element) {
is SequenceFlow -> element.conditionExpression?.textContent
else -> null
},
timerDefinition = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(TimerEventDefinition::class.java)
?.firstOrNull()
?.let { it.timeCycle ?: it.timeDate ?: it.timeDuration }
?.textContent

else -> null
},
errorCode = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(ErrorEventDefinition::class.java)
?.firstOrNull()
?.error
?.errorCode

else -> null
jobType = element
.getSingleExtensionElement(ZeebeTaskDefinition::class.java)
?.type,
conditionExpression = when (element) {
is SequenceFlow -> element.conditionExpression?.textContent
else -> null
},
timerDefinition = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(TimerEventDefinition::class.java)
?.firstOrNull()
?.let { it.timeCycle ?: it.timeDate ?: it.timeDuration }
?.textContent

else -> null
},
errorCode = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(ErrorEventDefinition::class.java)
?.firstOrNull()
?.error
?.errorCode

else -> null
},
calledProcessId = element
.getSingleExtensionElement(ZeebeCalledElement::class.java)
?.processId,
messageSubscriptionDefinition = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(MessageEventDefinition::class.java)
?.firstOrNull()
?.message
?.let {
MessageSubscriptionDefinition(
messageName = it.name,
messageCorrelationKey = it
.getSingleExtensionElement(ZeebeSubscription::class.java)
?.correlationKey
)
}

else -> null
},
userTaskAssignmentDefinition = element
.getSingleExtensionElement(ZeebeAssignmentDefinition::class.java)
?.let {
UserTaskAssignmentDefinition(
assignee = it.assignee,
candidateGroups = it.candidateGroups
)
},
calledProcessId = element
.getSingleExtensionElement(ZeebeCalledElement::class.java)
?.processId,
messageSubscriptionDefinition = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(MessageEventDefinition::class.java)
?.firstOrNull()
?.message
?.let {
MessageSubscriptionDefinition(
messageName = it.name,
messageCorrelationKey = it
.getSingleExtensionElement(ZeebeSubscription::class.java)
?.correlationKey
)
}

else -> null
userTaskForm = element
.getSingleExtensionElement(ZeebeFormDefinition::class.java)
?.formKey
?.let { formKey ->
UserTaskForm(
key = formKey,
resource = getForm(
model = element.modelInstance,
formKey = formKey
)
)
},
userTaskAssignmentDefinition = element
.getSingleExtensionElement(ZeebeAssignmentDefinition::class.java)
?.let {
UserTaskAssignmentDefinition(
assignee = it.assignee,
candidateGroups = it.candidateGroups
)
},
userTaskForm = element
.getSingleExtensionElement(ZeebeFormDefinition::class.java)
?.formKey
?.let { formKey ->
UserTaskForm(
key = formKey,
resource = getForm(
model = element.modelInstance,
formKey = formKey
)
)
}
signalName = when (element) {
is CatchEvent -> element.eventDefinitions
?.filterIsInstance(SignalEventDefinition::class.java)
?.firstOrNull()
?.signal
?.name

else -> null
}
)
}

@Cacheable(cacheNames = ["userTaskForm"])
fun getForm(processDefinitionKey: Long, formKey: String): String? {
return getBpmnModel(processDefinitionKey)
?.let { getForm(model = it, formKey = formKey) }
?.let { getForm(model = it, formKey = formKey) }
}

private fun getForm(model: ModelInstance, formKey: String): String? {
val normalizedFormKey = formKey.replace(CAMUNDA_FORM_KEY_PREFIX, "")

return model.getModelElementsByType(ZeebeUserTaskForm::class.java)
?.firstOrNull { it.id == normalizedFormKey }
?.textContent
?.firstOrNull { it.id == normalizedFormKey }
?.textContent
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.zeebe.zeeqs.graphql.resolvers.connection

import io.zeebe.zeeqs.data.entity.Signal

class SignalConnection(
val getItems: () -> List<Signal>,
val getCount: () -> Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.zeebe.zeeqs.graphql.resolvers.connection

import io.zeebe.zeeqs.data.entity.Signal
import org.springframework.graphql.data.method.annotation.SchemaMapping
import org.springframework.stereotype.Controller

@Controller
class SignalConnectionResolver {

@SchemaMapping(typeName = "SignalConnection", field = "nodes")
fun nodes(connection: SignalConnection): List<Signal> {
return connection.getItems()
}

@SchemaMapping(typeName = "SignalConnection", field = "totalCount")
fun totalCount(connection: SignalConnection): Long {
return connection.getCount()
}

}
Loading

0 comments on commit 45655e4

Please sign in to comment.