diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt index f78f2d81..8a040b1a 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt @@ -14,6 +14,7 @@ class DataUpdatesPublisher { private val variableListeners = mutableListOf>() private val incidentListeners = mutableListOf>() private val jobListeners = mutableListOf>() + private val decisionEvaluationListeners = mutableListOf>() fun onProcessUpdated(process: Process) { processListeners.forEach { it.accept(process) } @@ -43,6 +44,10 @@ class DataUpdatesPublisher { jobListeners.forEach { it.accept(job) } } + fun onDecisionEvaluationUpdated(decisionEvaluation: DecisionEvaluation) { + decisionEvaluationListeners.forEach { it.accept(decisionEvaluation) } + } + fun registerProcessListener(listener: Consumer) { processListeners.add(listener) } @@ -70,4 +75,8 @@ class DataUpdatesPublisher { fun registerJobListener(listener: Consumer) { jobListeners.add(listener) } + + fun registerDecisionEvaluationListener(listener: Consumer) { + decisionEvaluationListeners.add(listener) + } } \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt index 08c4dd6b..8cf20f2f 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt @@ -1,6 +1,7 @@ package io.zeebe.zeeqs.data.reactive import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.entity.DecisionEvaluation import io.zeebe.zeeqs.data.entity.Process import org.springframework.stereotype.Component import reactor.core.publisher.Flux @@ -74,4 +75,10 @@ class DataUpdatesSubscription(private val publisher: DataUpdatesPublisher) { } } + fun decisionEvaluationSubscription(): Flux { + return Flux.create { sink -> + publisher.registerDecisionEvaluationListener { sink.next(it) } + } + } + } \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionEvaluationSubscriptionMapping.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionEvaluationSubscriptionMapping.kt new file mode 100644 index 00000000..4b39d330 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionEvaluationSubscriptionMapping.kt @@ -0,0 +1,31 @@ +package io.zeebe.zeeqs.graphql.resolvers.subscription + +import io.zeebe.zeeqs.data.entity.DecisionEvaluation +import io.zeebe.zeeqs.data.reactive.DataUpdatesSubscription +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.SubscriptionMapping +import org.springframework.stereotype.Controller +import reactor.core.publisher.Flux + +@Controller +class DecisionEvaluationSubscriptionMapping(private val subscription: DataUpdatesSubscription) { + + @SubscriptionMapping + fun decisionEvaluationUpdates( + @Argument filter: DecisionEvaluationUpdateFilter? + ): Flux { + return subscription.decisionEvaluationSubscription() + .filter { + filter == null || ( + (filter.decisionKey == null || filter.decisionKey == it.decisionKey) + && (filter.decisionRequirementsKey == null || filter.decisionRequirementsKey == it.decisionRequirementsKey) + ) + } + } + + data class DecisionEvaluationUpdateFilter( + val decisionKey: Long?, + val decisionRequirementsKey: Long? + ) + +} \ No newline at end of file diff --git a/graphql-api/src/main/resources/graphql/DecisionEvaluation.graphqls b/graphql-api/src/main/resources/graphql/DecisionEvaluation.graphqls index c36d54da..5e830437 100644 --- a/graphql-api/src/main/resources/graphql/DecisionEvaluation.graphqls +++ b/graphql-api/src/main/resources/graphql/DecisionEvaluation.graphqls @@ -76,3 +76,18 @@ type DecisionEvaluationConnection { totalCount: Int! nodes: [DecisionEvaluation!]! } + +extend type Subscription { + # Subscribe to updates of decision evaluations (i.e. a decision was evaluated). + decisionEvaluationUpdates( + # Limit the updates by the given filter. + filter: DecisionEvaluationUpdateFilter = null): DecisionEvaluation! +} + +# A filter to limit the decision evaluation updates. +input DecisionEvaluationUpdateFilter { + # Limit the updates to evaluations of the given decision. + decisionKey: ID + # Limit the updates to evaluations that belong to a decision of the given DRG. + decisionRequirementsKey: ID +} diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastDecisionImporter.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastDecisionImporter.kt index ef975e0c..2f387a6f 100644 --- a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastDecisionImporter.kt +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastDecisionImporter.kt @@ -115,6 +115,8 @@ class HazelcastDecisionImporter( } } } + + dataUpdatesPublisher.onDecisionEvaluationUpdated(entity) } private fun createDecisionEvaluation(decisionEvaluation: Schema.DecisionEvaluationRecord): DecisionEvaluation {