diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt index b66e5e5b056..5b289ed5e95 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt @@ -48,7 +48,7 @@ class VaultNamedQueryFactoryProvider @Activate constructor( private fun registerPlatformQueries(vaultNamedQueryBuilderFactory: VaultNamedQueryBuilderFactory) { vaultNamedQueryBuilderFactory .create(FIND_UNCONSUMED_STATES_BY_EXACT_TYPE) - .whereJson("WHERE visible_states.type = :type") + .whereJson("WHERE visible_states.type = :type AND visible_states.consumed IS NULL") .register() } } diff --git a/libs/membership/membership-impl/build.gradle b/libs/membership/membership-impl/build.gradle index 96285545f43..6965bd5bd8b 100644 --- a/libs/membership/membership-impl/build.gradle +++ b/libs/membership/membership-impl/build.gradle @@ -17,7 +17,9 @@ dependencies { implementation project(':libs:crypto:crypto-core') implementation project(":libs:membership:membership-common") implementation project(":libs:utilities") + implementation project(':libs:sandbox-types') implementation project(":libs:serialization:serialization-avro") + implementation project(':libs:serialization:serialization-internal') implementation "net.corda:corda-avro-schema" implementation "net.corda:corda-base" diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt new file mode 100644 index 00000000000..b287f9aed0b --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MGMContextImpl +import net.corda.sandbox.type.SandboxConstants +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [SandboxConstants.CORDA_UNINJECTABLE_SERVICE], + scope = ServiceScope.PROTOTYPE +) +class MGMContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MGMContextImpl): MGMContextProxy { + return MGMContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MGMContextProxy): MGMContextImpl { + return when(proxy.version) { + VERSION_1 -> + MGMContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MGMContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MGMContextProxy::class.java + + override val type: Class + get() = MGMContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MGMContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MGMContextImpl] serialization. + */ + val map: Map +) diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt new file mode 100644 index 00000000000..a6b795e0bff --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MemberContextImpl +import net.corda.sandbox.type.SandboxConstants.CORDA_UNINJECTABLE_SERVICE +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [ CORDA_UNINJECTABLE_SERVICE ], + scope = ServiceScope.PROTOTYPE +) +class MemberContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MemberContextImpl): MemberContextProxy { + return MemberContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MemberContextProxy): MemberContextImpl { + return when(proxy.version) { + VERSION_1 -> + MemberContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MemberContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MemberContextProxy::class.java + + override val type: Class + get() = MemberContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MemberContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MemberContextImpl] serialization. + */ + val map: Map +) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt index 116ca1579dd..8b2c43f8818 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt @@ -2,12 +2,14 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture class MessageBusClient( override val id: String, @@ -18,9 +20,31 @@ class MessageBusClient( private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - override fun send(message: MediatorMessage<*>): MediatorMessage<*>? { - producer.send(message.toCordaProducerRecord(), null) - return null + override fun send(message: MediatorMessage<*>): MediatorMessage<*> { + val future = CompletableFuture() + val record = message.toCordaProducerRecord() + producer.send(record) { ex -> + setFutureFromResponse(ex, future, record.topic) + } + + return MediatorMessage(future) + } + + /** + * Helper function to set a [future] result based on the presence of an [exception] + */ + private fun setFutureFromResponse( + exception: Exception?, + future: CompletableFuture, + topic: String + ) { + if (exception == null) { + future.complete(Unit) + } else { + val message = "Producer clientId $id for topic $topic failed to send." + log.warn(message, exception) + future.completeExceptionally(CordaMessageAPIFatalException(message, exception)) + } } override fun close() { @@ -34,6 +58,9 @@ class MessageBusClient( } } +/** + * Helper function to convert a [MediatorMessage] of a specific format to a [CordaProducerRecord] + */ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *> { return CordaProducerRecord( topic = this.getProperty(MSG_PROP_ENDPOINT), @@ -43,5 +70,8 @@ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, * ) } +/** + * Helper function to extract headers from message props + */ private fun Map.toHeaders() = map { (key, value) -> (key to value.toString()) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt index 850c334f4e9..9eedf95a393 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt @@ -254,7 +254,7 @@ internal class CordaPublisherImpl( if (!config.transactional) { future.complete(Unit) } else { - log.debug { "Asynchronous send completed completed successfully." } + log.debug { "Asynchronous send completed successfully." } } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt index 58559d25ba4..1cf81e3a2e6 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt @@ -2,19 +2,26 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.v5.base.exceptions.CordaRuntimeException +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.times import org.mockito.kotlin.eq -import org.mockito.kotlin.isNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.CompletableFuture class MessageBusClientTest { private companion object { @@ -31,6 +38,12 @@ class MessageBusClientTest { MSG_PROP_KEY to TEST_KEY, ) private val message: MediatorMessage = MediatorMessage("value", messageProps) + private val record: CordaProducerRecord<*, *> = CordaProducerRecord( + TEST_ENDPOINT, + TEST_KEY, + message.payload, + messageProps.toHeaders(), + ) @BeforeEach @@ -39,22 +52,26 @@ class MessageBusClientTest { messageBusClient = MessageBusClient("client-id", cordaProducer) } + @Suppress("UNCHECKED_CAST") @Test - fun testSend() { - messageBusClient.send(message) + fun `test send`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(null) + }.whenever(cordaProducer).send(eq(record), any()) - val expected = CordaProducerRecord( - TEST_ENDPOINT, - TEST_KEY, - message.payload, - messageProps.toHeaders(), - ) + val result = messageBusClient.send(message) as MediatorMessage> - verify(cordaProducer).send(eq(expected), isNull()) + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + result.payload?.let { + assertTrue(it.isDone) + assertFalse(it.isCompletedExceptionally) + } } @Test - fun testSendWithError() { + fun `send should handle synchronous error`() { val record = CordaProducerRecord( TEST_ENDPOINT, TEST_KEY, @@ -62,14 +79,56 @@ class MessageBusClientTest { messageProps.toHeaders(), ) - Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), isNull()) + Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), any()) assertThrows { messageBusClient.send(message) } } + @Suppress("UNCHECKED_CAST") + @Test + fun `send should handle asynchronous CordaMessageAPIFatalException`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaMessageAPIFatalException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + + @Suppress("UNCHECKED_CAST") + @Test + fun `send should wrap unknown exceptions`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaRuntimeException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + @Test - fun testClose() { + fun `test close`() { messageBusClient.close() verify(cordaProducer, times(1)).close() }