Skip to content

Commit

Permalink
Merge pull request #344 from camunda-community-hub/decision-evaluatio…
Browse files Browse the repository at this point in the history
…n-subscription

feat: Subscribe to decision evaluation updates
  • Loading branch information
saig0 authored Mar 2, 2023
2 parents 61d1fa3 + 77d6e1e commit 5def3ef
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class DataUpdatesPublisher {
private val variableListeners = mutableListOf<Consumer<Variable>>()
private val incidentListeners = mutableListOf<Consumer<Incident>>()
private val jobListeners = mutableListOf<Consumer<Job>>()
private val decisionEvaluationListeners = mutableListOf<Consumer<DecisionEvaluation>>()

fun onProcessUpdated(process: Process) {
processListeners.forEach { it.accept(process) }
Expand Down Expand Up @@ -43,6 +44,10 @@ class DataUpdatesPublisher {
jobListeners.forEach { it.accept(job) }
}

fun onDecisionEvaluationUpdated(decisionEvaluation: DecisionEvaluation) {
decisionEvaluationListeners.forEach { it.accept(decisionEvaluation) }
}

fun registerProcessListener(listener: Consumer<Process>) {
processListeners.add(listener)
}
Expand Down Expand Up @@ -70,4 +75,8 @@ class DataUpdatesPublisher {
fun registerJobListener(listener: Consumer<Job>) {
jobListeners.add(listener)
}

fun registerDecisionEvaluationListener(listener: Consumer<DecisionEvaluation>) {
decisionEvaluationListeners.add(listener)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.zeebe.zeeqs.data.reactive

import io.zeebe.zeeqs.data.entity.Decision
import io.zeebe.zeeqs.data.entity.DecisionEvaluation
import io.zeebe.zeeqs.data.entity.Process
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -74,4 +75,10 @@ class DataUpdatesSubscription(private val publisher: DataUpdatesPublisher) {
}
}

fun decisionEvaluationSubscription(): Flux<DecisionEvaluation> {
return Flux.create { sink ->
publisher.registerDecisionEvaluationListener { sink.next(it) }
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.zeebe.zeeqs.graphql.resolvers.subscription

import io.zeebe.zeeqs.data.entity.DecisionEvaluation
import io.zeebe.zeeqs.data.reactive.DataUpdatesSubscription
import org.springframework.graphql.data.method.annotation.Argument
import org.springframework.graphql.data.method.annotation.SubscriptionMapping
import org.springframework.stereotype.Controller
import reactor.core.publisher.Flux

@Controller
class DecisionEvaluationSubscriptionMapping(private val subscription: DataUpdatesSubscription) {

@SubscriptionMapping
fun decisionEvaluationUpdates(
@Argument filter: DecisionEvaluationUpdateFilter?
): Flux<DecisionEvaluation> {
return subscription.decisionEvaluationSubscription()
.filter {
filter == null || (
(filter.decisionKey == null || filter.decisionKey == it.decisionKey)
&& (filter.decisionRequirementsKey == null || filter.decisionRequirementsKey == it.decisionRequirementsKey)
)
}
}

data class DecisionEvaluationUpdateFilter(
val decisionKey: Long?,
val decisionRequirementsKey: Long?
)

}
15 changes: 15 additions & 0 deletions graphql-api/src/main/resources/graphql/DecisionEvaluation.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,18 @@ type DecisionEvaluationConnection {
totalCount: Int!
nodes: [DecisionEvaluation!]!
}

extend type Subscription {
# Subscribe to updates of decision evaluations (i.e. a decision was evaluated).
decisionEvaluationUpdates(
# Limit the updates by the given filter.
filter: DecisionEvaluationUpdateFilter = null): DecisionEvaluation!
}

# A filter to limit the decision evaluation updates.
input DecisionEvaluationUpdateFilter {
# Limit the updates to evaluations of the given decision.
decisionKey: ID
# Limit the updates to evaluations that belong to a decision of the given DRG.
decisionRequirementsKey: ID
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class HazelcastDecisionImporter(
}
}
}

dataUpdatesPublisher.onDecisionEvaluationUpdated(entity)
}

private fun createDecisionEvaluation(decisionEvaluation: Schema.DecisionEvaluationRecord): DecisionEvaluation {
Expand Down

0 comments on commit 5def3ef

Please sign in to comment.