Skip to content

Commit

Permalink
feat: query messages by state (#247)
Browse files Browse the repository at this point in the history
* fix: change entity id type from long to string

* feat: query messages by state
  • Loading branch information
saig0 authored Jun 7, 2022
1 parent 0ff1995 commit 8440cf2
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface ElementInstanceStateTransitionRepository : PagingAndSortingRepository<ElementInstanceStateTransition, Long> {
interface ElementInstanceStateTransitionRepository : PagingAndSortingRepository<ElementInstanceStateTransition, String> {

fun findByElementInstanceKey(elementInstanceKey: Long): List<ElementInstanceStateTransition>
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface MessageCorrelationRepository : PagingAndSortingRepository<MessageCorrelation, Long> {
interface MessageCorrelationRepository : PagingAndSortingRepository<MessageCorrelation, String> {

fun findByMessageNameAndElementInstanceKey(messageName: String, elementInstanceKey: Long): List<MessageCorrelation>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.zeebe.zeeqs.data.repository

import io.zeebe.zeeqs.data.entity.Message
import io.zeebe.zeeqs.data.entity.MessageState
import org.springframework.data.domain.Pageable
import org.springframework.data.repository.PagingAndSortingRepository
import org.springframework.stereotype.Repository

@Repository
interface MessageRepository : PagingAndSortingRepository<Message, Long> {

fun findByStateIn(stateIn: List<MessageState>, pageable: Pageable): List<Message>

fun countByStateIn(stateIn: List<MessageState>): Long

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.springframework.stereotype.Repository
import org.springframework.transaction.annotation.Transactional

@Repository
interface VariableUpdateRepository : PagingAndSortingRepository<VariableUpdate, Long> {
interface VariableUpdateRepository : PagingAndSortingRepository<VariableUpdate, String> {

@Transactional(readOnly = true)
fun findByProcessInstanceKey(processInstanceKey: Long): List<VariableUpdate>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.zeebe.zeeqs.data.resolvers

import graphql.kickstart.tools.GraphQLQueryResolver
import io.zeebe.zeeqs.data.entity.Message
import io.zeebe.zeeqs.data.entity.MessageState
import io.zeebe.zeeqs.data.repository.MessageRepository
import io.zeebe.zeeqs.graphql.resolvers.connection.MessageConnection
import org.springframework.data.domain.PageRequest
Expand All @@ -13,10 +14,19 @@ class MessageQueryResolver(
val messageRepository: MessageRepository
) : GraphQLQueryResolver {

fun messages(perPage: Int, page: Int): MessageConnection {
fun messages(perPage: Int, page: Int, stateIn: List<MessageState>): MessageConnection {
return MessageConnection(
getItems = { messageRepository.findAll(PageRequest.of(page, perPage)).toList() },
getCount = { messageRepository.count() }
getItems = {
messageRepository.findByStateIn(
stateIn = stateIn,
pageable = PageRequest.of(page, perPage)
)
},
getCount = {
messageRepository.countByStateIn(
stateIn = stateIn
)
}
)
}

Expand Down
4 changes: 3 additions & 1 deletion graphql-api/src/main/resources/graphql/Message.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ extend type Query {

messages(
perPage: Int = 10,
page: Int = 0): MessageConnection!
page: Int = 0,
stateIn: [MessageState] = [PUBLISHED, EXPIRED]
): MessageConnection!
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,12 @@ class HazelcastImporter(

val state = getElementInstanceState(record)

val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata)
val entity = elementInstanceStateTransitionRepository
.findById(record.metadata.position)
.findById(partitionIdWithPosition)
.orElse(
ElementInstanceStateTransition(
partitionIdWithPosition = getPartitionIdWithPosition(record.metadata),
partitionIdWithPosition = partitionIdWithPosition,
elementInstanceKey = record.metadata.key,
timestamp = record.metadata.timestamp,
state = state
Expand Down Expand Up @@ -317,11 +318,12 @@ class HazelcastImporter(

private fun importVariableUpdate(record: Schema.VariableRecord) {

val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata)
val entity = variableUpdateRepository
.findById(record.metadata.position)
.findById(partitionIdWithPosition)
.orElse(
VariableUpdate(
partitionIdWithPosition = getPartitionIdWithPosition(record.metadata),
partitionIdWithPosition = partitionIdWithPosition,
variableKey = record.metadata.key,
name = record.name,
value = record.value,
Expand Down Expand Up @@ -588,11 +590,12 @@ class HazelcastImporter(

private fun importMessageCorrelation(record: Schema.ProcessMessageSubscriptionRecord) {

val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata)
val entity = messageCorrelationRepository
.findById(record.metadata.position)
.findById(partitionIdWithPosition)
.orElse(
MessageCorrelation(
partitionIdWithPosition = getPartitionIdWithPosition(record.metadata),
partitionIdWithPosition = partitionIdWithPosition,
messageKey = record.messageKey,
messageName = record.messageName,
elementInstanceKey = record.elementInstanceKey,
Expand All @@ -608,11 +611,12 @@ class HazelcastImporter(

private fun importMessageCorrelation(record: Schema.MessageStartEventSubscriptionRecord) {

val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata)
val entity = messageCorrelationRepository
.findById(record.metadata.position)
.findById(partitionIdWithPosition)
.orElse(
MessageCorrelation(
partitionIdWithPosition = getPartitionIdWithPosition(record.metadata),
partitionIdWithPosition = partitionIdWithPosition,
messageKey = record.messageKey,
messageName = record.messageName,
elementInstanceKey = null,
Expand Down

0 comments on commit 8440cf2

Please sign in to comment.