From fb8ddad51d90df99227c424ff4a7553b97137c3e Mon Sep 17 00:00:00 2001 From: Yiftach Kaplan <67583323+yift-r3@users.noreply.github.com> Date: Thu, 16 Nov 2023 19:44:45 +0000 Subject: [PATCH 1/5] CORE-18390: Allow old version of SetOwnRegistrationStatus in the command (#5114) * CORE-18357: Allow old version of SetOwnRegistrationStatus in the command * Set to the correct version --- .../PersistMemberRegistrationStateHandler.kt | 41 +++++++++++++++++-- ...rsistMemberRegistrationStateHandlerTest.kt | 17 ++++---- gradle.properties | 2 +- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt index c6d92b889ff..4178f8dc91e 100644 --- a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt +++ b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt @@ -2,6 +2,10 @@ package net.corda.membership.impl.registration.dynamic.handler.member import net.corda.data.identity.HoldingIdentity import net.corda.data.membership.command.registration.member.PersistMemberRegistrationState +import net.corda.data.membership.common.RegistrationStatus as RegistrationStatusV1 +import net.corda.data.membership.common.v2.RegistrationStatus as RegistrationStatusV2 +import net.corda.data.membership.p2p.SetOwnRegistrationStatus as SetOwnRegistrationStatusV1 +import net.corda.data.membership.p2p.v2.SetOwnRegistrationStatus as SetOwnRegistrationStatusV2 import net.corda.data.membership.state.RegistrationState import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandler import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandlerResult @@ -17,11 +21,12 @@ internal class PersistMemberRegistrationStateHandler( command: PersistMemberRegistrationState, ): RegistrationHandlerResult { val member = command.member.toCorda() + val request = command.request() val commands = membershipPersistenceClient.setRegistrationRequestStatus( member, - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, - command.setStatusRequest.reason + request.registrationId, + request.newStatus, + request.reason ).createAsyncCommands() return RegistrationHandlerResult( null, @@ -35,4 +40,34 @@ internal class PersistMemberRegistrationStateHandler( state: RegistrationState?, command: PersistMemberRegistrationState ): HoldingIdentity = command.member + + private fun PersistMemberRegistrationState.request(): SetOwnRegistrationStatusV2 { + val request = this.setStatusRequest + return when (request) { + is SetOwnRegistrationStatusV2 -> request + is SetOwnRegistrationStatusV1 -> SetOwnRegistrationStatusV2( + request.registrationId, + request.newStatus.toV2(), + null + ) + else -> throw IllegalArgumentException("Unknown request status '${request.javaClass}' received.") + } + } + + private fun RegistrationStatusV1.toV2(): RegistrationStatusV2 { + return when(this) { + RegistrationStatusV1.NEW -> RegistrationStatusV2.NEW + RegistrationStatusV1.SENT_TO_MGM -> RegistrationStatusV2.SENT_TO_MGM + RegistrationStatusV1.RECEIVED_BY_MGM -> RegistrationStatusV2.RECEIVED_BY_MGM + RegistrationStatusV1.PENDING_MEMBER_VERIFICATION -> RegistrationStatusV2.PENDING_MEMBER_VERIFICATION + RegistrationStatusV1.PENDING_MANUAL_APPROVAL -> RegistrationStatusV2.PENDING_MANUAL_APPROVAL + RegistrationStatusV1.PENDING_AUTO_APPROVAL -> RegistrationStatusV2.PENDING_AUTO_APPROVAL + RegistrationStatusV1.APPROVED -> RegistrationStatusV2.APPROVED + RegistrationStatusV1.DECLINED -> RegistrationStatusV2.DECLINED + RegistrationStatusV1.INVALID -> RegistrationStatusV2.INVALID + RegistrationStatusV1.FAILED -> RegistrationStatusV2.FAILED + else -> throw IllegalArgumentException("Unknown status '${this.name}' received.") + } + + } } diff --git a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt index f7acb9236d3..8386019d3ad 100644 --- a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt +++ b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt @@ -38,13 +38,14 @@ class PersistMemberRegistrationStateHandlerTest { } doReturn operation } private val reason = "some reason" - val command = PersistMemberRegistrationState( + private val request = SetOwnRegistrationStatus( + UUID(1,2).toString(), + RegistrationStatus.DECLINED, + reason + ) + private val command = PersistMemberRegistrationState( HoldingIdentity("O=Alice, L=London, C=GB", "GroupId"), - SetOwnRegistrationStatus( - UUID(1,2).toString(), - RegistrationStatus.DECLINED, - reason - ) + request ) private val handler = PersistMemberRegistrationStateHandler( @@ -66,8 +67,8 @@ class PersistMemberRegistrationStateHandlerTest { verify(membershipPersistenceClient).setRegistrationRequestStatus( command.member.toCorda(), - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, + request.registrationId, + request.newStatus, reason ) } diff --git a/gradle.properties b/gradle.properties index 5cf01b55410..5a2f4561c8c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.37-beta+ +cordaApiVersion=5.1.0.38-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 From e86405d976df295a84b6ab5265a6610b0e167abe Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Fri, 17 Nov 2023 10:14:14 +0000 Subject: [PATCH 2/5] CORE-18370 Fix `findUnconsumedStatesByExactType` (#5111) The query for `findUnconsumedStatesByExactType` did not actually filter by `consumed IS NULL` so it returned all states regardless of if they were consumed or not. Added `consumed IS NULL` to fix this. --- .../query/registration/impl/VaultNamedQueryFactoryProvider.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt index b66e5e5b056..5b289ed5e95 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt @@ -48,7 +48,7 @@ class VaultNamedQueryFactoryProvider @Activate constructor( private fun registerPlatformQueries(vaultNamedQueryBuilderFactory: VaultNamedQueryBuilderFactory) { vaultNamedQueryBuilderFactory .create(FIND_UNCONSUMED_STATES_BY_EXACT_TYPE) - .whereJson("WHERE visible_states.type = :type") + .whereJson("WHERE visible_states.type = :type AND visible_states.consumed IS NULL") .register() } } From f441f27aa12a80f4ee31d814fd3987c148647ebd Mon Sep 17 00:00:00 2001 From: Yash Nabar Date: Fri, 17 Nov 2023 13:27:57 +0000 Subject: [PATCH 3/5] CORE-15267 Make MemberInfo serializable (#5127) Makes `MemberInfo` serializable by registering custom serializers for `MemberContext` and `MGMContext`. The corresponding corda-api change marks these interfaces as `@CordaSerializable`. --- gradle.properties | 2 +- libs/membership/membership-impl/build.gradle | 2 + .../serializer/amqp/MGMContextSerializer.kt | 72 +++++++++++++++++++ .../amqp/MemberContextSerializer.kt | 72 +++++++++++++++++++ 4 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt create mode 100644 libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt diff --git a/gradle.properties b/gradle.properties index 5a2f4561c8c..2f2b96c64e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.38-beta+ +cordaApiVersion=5.1.0.39-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/membership/membership-impl/build.gradle b/libs/membership/membership-impl/build.gradle index 96285545f43..6965bd5bd8b 100644 --- a/libs/membership/membership-impl/build.gradle +++ b/libs/membership/membership-impl/build.gradle @@ -17,7 +17,9 @@ dependencies { implementation project(':libs:crypto:crypto-core') implementation project(":libs:membership:membership-common") implementation project(":libs:utilities") + implementation project(':libs:sandbox-types') implementation project(":libs:serialization:serialization-avro") + implementation project(':libs:serialization:serialization-internal') implementation "net.corda:corda-avro-schema" implementation "net.corda:corda-base" diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt new file mode 100644 index 00000000000..b287f9aed0b --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MGMContextImpl +import net.corda.sandbox.type.SandboxConstants +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [SandboxConstants.CORDA_UNINJECTABLE_SERVICE], + scope = ServiceScope.PROTOTYPE +) +class MGMContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MGMContextImpl): MGMContextProxy { + return MGMContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MGMContextProxy): MGMContextImpl { + return when(proxy.version) { + VERSION_1 -> + MGMContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MGMContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MGMContextProxy::class.java + + override val type: Class + get() = MGMContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MGMContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MGMContextImpl] serialization. + */ + val map: Map +) diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt new file mode 100644 index 00000000000..a6b795e0bff --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MemberContextImpl +import net.corda.sandbox.type.SandboxConstants.CORDA_UNINJECTABLE_SERVICE +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [ CORDA_UNINJECTABLE_SERVICE ], + scope = ServiceScope.PROTOTYPE +) +class MemberContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MemberContextImpl): MemberContextProxy { + return MemberContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MemberContextProxy): MemberContextImpl { + return when(proxy.version) { + VERSION_1 -> + MemberContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MemberContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MemberContextProxy::class.java + + override val type: Class + get() = MemberContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MemberContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MemberContextImpl] serialization. + */ + val map: Map +) From bd1e42602d569d44ab1290b17dab4af794a97044 Mon Sep 17 00:00:00 2001 From: Ben Millar <44114751+ben-millar@users.noreply.github.com> Date: Fri, 17 Nov 2023 16:01:09 +0000 Subject: [PATCH 4/5] CORE-17917 Adding handling to MessageBusClient for async kafka errors (#5107) A bug was observed whereby Kafka producer errors were not being correctly reported back to the MultiSourceEventMediator. This is because our Kafka messages are essentially 'fire-and-forget'; when something goes wrong, Kafka will exceptionally close a future in the background, but we never attempted to capture or track this future. This PR adds a callback which will log this exception clearly for the user. --- .../messaging/mediator/MessageBusClient.kt | 36 +++++++- .../messaging/publisher/CordaPublisherImpl.kt | 2 +- .../mediator/MessageBusClientTest.kt | 85 ++++++++++++++++--- 3 files changed, 106 insertions(+), 17 deletions(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt index 116ca1579dd..8b2c43f8818 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt @@ -2,12 +2,14 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture class MessageBusClient( override val id: String, @@ -18,9 +20,31 @@ class MessageBusClient( private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - override fun send(message: MediatorMessage<*>): MediatorMessage<*>? { - producer.send(message.toCordaProducerRecord(), null) - return null + override fun send(message: MediatorMessage<*>): MediatorMessage<*> { + val future = CompletableFuture() + val record = message.toCordaProducerRecord() + producer.send(record) { ex -> + setFutureFromResponse(ex, future, record.topic) + } + + return MediatorMessage(future) + } + + /** + * Helper function to set a [future] result based on the presence of an [exception] + */ + private fun setFutureFromResponse( + exception: Exception?, + future: CompletableFuture, + topic: String + ) { + if (exception == null) { + future.complete(Unit) + } else { + val message = "Producer clientId $id for topic $topic failed to send." + log.warn(message, exception) + future.completeExceptionally(CordaMessageAPIFatalException(message, exception)) + } } override fun close() { @@ -34,6 +58,9 @@ class MessageBusClient( } } +/** + * Helper function to convert a [MediatorMessage] of a specific format to a [CordaProducerRecord] + */ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *> { return CordaProducerRecord( topic = this.getProperty(MSG_PROP_ENDPOINT), @@ -43,5 +70,8 @@ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, * ) } +/** + * Helper function to extract headers from message props + */ private fun Map.toHeaders() = map { (key, value) -> (key to value.toString()) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt index 5c9f2646aca..fa0b94f75eb 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt @@ -232,7 +232,7 @@ internal class CordaPublisherImpl( if (!config.transactional) { future.complete(Unit) } else { - log.debug { "Asynchronous send completed completed successfully." } + log.debug { "Asynchronous send completed successfully." } } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt index 58559d25ba4..1cf81e3a2e6 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt @@ -2,19 +2,26 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.v5.base.exceptions.CordaRuntimeException +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.times import org.mockito.kotlin.eq -import org.mockito.kotlin.isNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.CompletableFuture class MessageBusClientTest { private companion object { @@ -31,6 +38,12 @@ class MessageBusClientTest { MSG_PROP_KEY to TEST_KEY, ) private val message: MediatorMessage = MediatorMessage("value", messageProps) + private val record: CordaProducerRecord<*, *> = CordaProducerRecord( + TEST_ENDPOINT, + TEST_KEY, + message.payload, + messageProps.toHeaders(), + ) @BeforeEach @@ -39,22 +52,26 @@ class MessageBusClientTest { messageBusClient = MessageBusClient("client-id", cordaProducer) } + @Suppress("UNCHECKED_CAST") @Test - fun testSend() { - messageBusClient.send(message) + fun `test send`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(null) + }.whenever(cordaProducer).send(eq(record), any()) - val expected = CordaProducerRecord( - TEST_ENDPOINT, - TEST_KEY, - message.payload, - messageProps.toHeaders(), - ) + val result = messageBusClient.send(message) as MediatorMessage> - verify(cordaProducer).send(eq(expected), isNull()) + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + result.payload?.let { + assertTrue(it.isDone) + assertFalse(it.isCompletedExceptionally) + } } @Test - fun testSendWithError() { + fun `send should handle synchronous error`() { val record = CordaProducerRecord( TEST_ENDPOINT, TEST_KEY, @@ -62,14 +79,56 @@ class MessageBusClientTest { messageProps.toHeaders(), ) - Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), isNull()) + Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), any()) assertThrows { messageBusClient.send(message) } } + @Suppress("UNCHECKED_CAST") + @Test + fun `send should handle asynchronous CordaMessageAPIFatalException`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaMessageAPIFatalException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + + @Suppress("UNCHECKED_CAST") + @Test + fun `send should wrap unknown exceptions`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaRuntimeException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + @Test - fun testClose() { + fun `test close`() { messageBusClient.close() verify(cordaProducer, times(1)).close() } From f3d86d1f1e391d6a0af2ed722991da46103c7f77 Mon Sep 17 00:00:00 2001 From: Ronan Browne Date: Tue, 21 Nov 2023 17:22:52 +0000 Subject: [PATCH 5/5] fix merge --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 32699ced673..71bc2d6df1a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -44,10 +44,10 @@ caffeineVersion = 3.1.6 commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 +# Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy cordaApiVersion=5.2.0.4-beta+ - disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 felixVersion=7.0.5