diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/ElementInstanceStateTransitionRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/ElementInstanceStateTransitionRepository.kt index 3c49bb69..edc395bb 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/ElementInstanceStateTransitionRepository.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/ElementInstanceStateTransitionRepository.kt @@ -5,7 +5,7 @@ import org.springframework.data.repository.PagingAndSortingRepository import org.springframework.stereotype.Repository @Repository -interface ElementInstanceStateTransitionRepository : PagingAndSortingRepository { +interface ElementInstanceStateTransitionRepository : PagingAndSortingRepository { fun findByElementInstanceKey(elementInstanceKey: Long): List } \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageCorrelationRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageCorrelationRepository.kt index 1b98563e..6f777920 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageCorrelationRepository.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageCorrelationRepository.kt @@ -5,7 +5,7 @@ import org.springframework.data.repository.PagingAndSortingRepository import org.springframework.stereotype.Repository @Repository -interface MessageCorrelationRepository : PagingAndSortingRepository { +interface MessageCorrelationRepository : PagingAndSortingRepository { fun findByMessageNameAndElementInstanceKey(messageName: String, elementInstanceKey: Long): List diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageRepository.kt index aa147f7e..fab50e2d 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageRepository.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/MessageRepository.kt @@ -1,11 +1,16 @@ package io.zeebe.zeeqs.data.repository import io.zeebe.zeeqs.data.entity.Message +import io.zeebe.zeeqs.data.entity.MessageState +import org.springframework.data.domain.Pageable import org.springframework.data.repository.PagingAndSortingRepository import org.springframework.stereotype.Repository @Repository interface MessageRepository : PagingAndSortingRepository { + fun findByStateIn(stateIn: List, pageable: Pageable): List + + fun countByStateIn(stateIn: List): Long } \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/VariableUpdateRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/VariableUpdateRepository.kt index 3c066b12..68657f6e 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/VariableUpdateRepository.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/VariableUpdateRepository.kt @@ -6,7 +6,7 @@ import org.springframework.stereotype.Repository import org.springframework.transaction.annotation.Transactional @Repository -interface VariableUpdateRepository : PagingAndSortingRepository { +interface VariableUpdateRepository : PagingAndSortingRepository { @Transactional(readOnly = true) fun findByProcessInstanceKey(processInstanceKey: Long): List diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/MessageQueryResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/MessageQueryResolver.kt index a884cb6e..f6e93fba 100644 --- a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/MessageQueryResolver.kt +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/MessageQueryResolver.kt @@ -2,6 +2,7 @@ package io.zeebe.zeeqs.data.resolvers import graphql.kickstart.tools.GraphQLQueryResolver import io.zeebe.zeeqs.data.entity.Message +import io.zeebe.zeeqs.data.entity.MessageState import io.zeebe.zeeqs.data.repository.MessageRepository import io.zeebe.zeeqs.graphql.resolvers.connection.MessageConnection import org.springframework.data.domain.PageRequest @@ -13,10 +14,19 @@ class MessageQueryResolver( val messageRepository: MessageRepository ) : GraphQLQueryResolver { - fun messages(perPage: Int, page: Int): MessageConnection { + fun messages(perPage: Int, page: Int, stateIn: List): MessageConnection { return MessageConnection( - getItems = { messageRepository.findAll(PageRequest.of(page, perPage)).toList() }, - getCount = { messageRepository.count() } + getItems = { + messageRepository.findByStateIn( + stateIn = stateIn, + pageable = PageRequest.of(page, perPage) + ) + }, + getCount = { + messageRepository.countByStateIn( + stateIn = stateIn + ) + } ) } diff --git a/graphql-api/src/main/resources/graphql/Message.graphqls b/graphql-api/src/main/resources/graphql/Message.graphqls index d0605505..1b83764d 100644 --- a/graphql-api/src/main/resources/graphql/Message.graphqls +++ b/graphql-api/src/main/resources/graphql/Message.graphqls @@ -34,5 +34,7 @@ extend type Query { messages( perPage: Int = 10, - page: Int = 0): MessageConnection! + page: Int = 0, + stateIn: [MessageState] = [PUBLISHED, EXPIRED] + ): MessageConnection! } diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt index d402f1b8..7eb6b94e 100644 --- a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt @@ -272,11 +272,12 @@ class HazelcastImporter( val state = getElementInstanceState(record) + val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = elementInstanceStateTransitionRepository - .findById(record.metadata.position) + .findById(partitionIdWithPosition) .orElse( ElementInstanceStateTransition( - partitionIdWithPosition = getPartitionIdWithPosition(record.metadata), + partitionIdWithPosition = partitionIdWithPosition, elementInstanceKey = record.metadata.key, timestamp = record.metadata.timestamp, state = state @@ -317,11 +318,12 @@ class HazelcastImporter( private fun importVariableUpdate(record: Schema.VariableRecord) { + val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = variableUpdateRepository - .findById(record.metadata.position) + .findById(partitionIdWithPosition) .orElse( VariableUpdate( - partitionIdWithPosition = getPartitionIdWithPosition(record.metadata), + partitionIdWithPosition = partitionIdWithPosition, variableKey = record.metadata.key, name = record.name, value = record.value, @@ -588,11 +590,12 @@ class HazelcastImporter( private fun importMessageCorrelation(record: Schema.ProcessMessageSubscriptionRecord) { + val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = messageCorrelationRepository - .findById(record.metadata.position) + .findById(partitionIdWithPosition) .orElse( MessageCorrelation( - partitionIdWithPosition = getPartitionIdWithPosition(record.metadata), + partitionIdWithPosition = partitionIdWithPosition, messageKey = record.messageKey, messageName = record.messageName, elementInstanceKey = record.elementInstanceKey, @@ -608,11 +611,12 @@ class HazelcastImporter( private fun importMessageCorrelation(record: Schema.MessageStartEventSubscriptionRecord) { + val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = messageCorrelationRepository - .findById(record.metadata.position) + .findById(partitionIdWithPosition) .orElse( MessageCorrelation( - partitionIdWithPosition = getPartitionIdWithPosition(record.metadata), + partitionIdWithPosition = partitionIdWithPosition, messageKey = record.messageKey, messageName = record.messageName, elementInstanceKey = null,