Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-18414: Merging forward updates from release/os/5.1 to release/os/5.2 - 2023-11-21 #5140

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class VaultNamedQueryFactoryProvider @Activate constructor(
private fun registerPlatformQueries(vaultNamedQueryBuilderFactory: VaultNamedQueryBuilderFactory) {
vaultNamedQueryBuilderFactory
.create(FIND_UNCONSUMED_STATES_BY_EXACT_TYPE)
.whereJson("WHERE visible_states.type = :type")
.whereJson("WHERE visible_states.type = :type AND visible_states.consumed IS NULL")
.register()
}
}
2 changes: 2 additions & 0 deletions libs/membership/membership-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ dependencies {
implementation project(':libs:crypto:crypto-core')
implementation project(":libs:membership:membership-common")
implementation project(":libs:utilities")
implementation project(':libs:sandbox-types')
implementation project(":libs:serialization:serialization-avro")
implementation project(':libs:serialization:serialization-internal')

implementation "net.corda:corda-avro-schema"
implementation "net.corda:corda-base"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package net.corda.membership.lib.impl.serializer.amqp

import net.corda.layeredpropertymap.LayeredPropertyMapFactory
import net.corda.membership.lib.impl.MGMContextImpl
import net.corda.sandbox.type.SandboxConstants
import net.corda.sandbox.type.UsedByFlow
import net.corda.sandbox.type.UsedByPersistence
import net.corda.sandbox.type.UsedByVerification
import net.corda.serialization.BaseProxySerializer
import net.corda.serialization.InternalCustomSerializer
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.base.types.LayeredPropertyMap
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.osgi.service.component.annotations.ServiceScope

@Component(
service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ],
property = [SandboxConstants.CORDA_UNINJECTABLE_SERVICE],
scope = ServiceScope.PROTOTYPE
)
class MGMContextSerializer @Activate constructor(
@Reference(service = LayeredPropertyMapFactory::class)
private val layeredPropertyMapFactory: LayeredPropertyMapFactory,
) : BaseProxySerializer<MGMContextImpl, MGMContextProxy>(), UsedByFlow, UsedByPersistence, UsedByVerification {
private companion object {
private const val VERSION_1 = 1
}

override fun toProxy(obj: MGMContextImpl): MGMContextProxy {
return MGMContextProxy(
VERSION_1,
obj.toMap()
)
}

override fun fromProxy(proxy: MGMContextProxy): MGMContextImpl {
return when(proxy.version) {
VERSION_1 ->
MGMContextImpl(layeredPropertyMapFactory.createMap(proxy.map))
else ->
throw CordaRuntimeException("Unable to create MGMContextImpl with Version='${proxy.version}'")
}
}

override val proxyType: Class<MGMContextProxy>
get() = MGMContextProxy::class.java

override val type: Class<MGMContextImpl>
get() = MGMContextImpl::class.java

override val withInheritance: Boolean
get() = false

private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value }

}

/**
* The class that actually gets serialized on the wire.
*/
data class MGMContextProxy(
/**
* Version of container.
*/
val version: Int,
/**
* Properties for [MGMContextImpl] serialization.
*/
val map: Map<String, String?>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package net.corda.membership.lib.impl.serializer.amqp

import net.corda.layeredpropertymap.LayeredPropertyMapFactory
import net.corda.membership.lib.impl.MemberContextImpl
import net.corda.sandbox.type.SandboxConstants.CORDA_UNINJECTABLE_SERVICE
import net.corda.sandbox.type.UsedByFlow
import net.corda.sandbox.type.UsedByPersistence
import net.corda.sandbox.type.UsedByVerification
import net.corda.serialization.BaseProxySerializer
import net.corda.serialization.InternalCustomSerializer
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.base.types.LayeredPropertyMap
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.osgi.service.component.annotations.ServiceScope

@Component(
service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ],
property = [ CORDA_UNINJECTABLE_SERVICE ],
scope = ServiceScope.PROTOTYPE
)
class MemberContextSerializer @Activate constructor(
@Reference(service = LayeredPropertyMapFactory::class)
private val layeredPropertyMapFactory: LayeredPropertyMapFactory,
) : BaseProxySerializer<MemberContextImpl, MemberContextProxy>(), UsedByFlow, UsedByPersistence, UsedByVerification {
private companion object {
private const val VERSION_1 = 1
}

override fun toProxy(obj: MemberContextImpl): MemberContextProxy {
return MemberContextProxy(
VERSION_1,
obj.toMap()
)
}

override fun fromProxy(proxy: MemberContextProxy): MemberContextImpl {
return when(proxy.version) {
VERSION_1 ->
MemberContextImpl(layeredPropertyMapFactory.createMap(proxy.map))
else ->
throw CordaRuntimeException("Unable to create MemberContextImpl with Version='${proxy.version}'")
}
}

override val proxyType: Class<MemberContextProxy>
get() = MemberContextProxy::class.java

override val type: Class<MemberContextImpl>
get() = MemberContextImpl::class.java

override val withInheritance: Boolean
get() = false

private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value }

}

/**
* The class that actually gets serialized on the wire.
*/
data class MemberContextProxy(
/**
* Version of container.
*/
val version: Int,
/**
* Properties for [MemberContextImpl] serialization.
*/
val map: Map<String, String?>
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package net.corda.messaging.mediator

import net.corda.messagebus.api.producer.CordaProducer
import net.corda.messagebus.api.producer.CordaProducerRecord
import net.corda.messaging.api.exception.CordaMessageAPIFatalException
import net.corda.messaging.api.mediator.MediatorMessage
import net.corda.messaging.api.mediator.MessagingClient
import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT
import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture

class MessageBusClient(
override val id: String,
Expand All @@ -18,9 +20,31 @@ class MessageBusClient(
private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

override fun send(message: MediatorMessage<*>): MediatorMessage<*>? {
producer.send(message.toCordaProducerRecord(), null)
return null
override fun send(message: MediatorMessage<*>): MediatorMessage<*> {
val future = CompletableFuture<Unit>()
val record = message.toCordaProducerRecord()
producer.send(record) { ex ->
setFutureFromResponse(ex, future, record.topic)
}

return MediatorMessage(future)
}

/**
* Helper function to set a [future] result based on the presence of an [exception]
*/
private fun setFutureFromResponse(
exception: Exception?,
future: CompletableFuture<Unit>,
topic: String
) {
if (exception == null) {
future.complete(Unit)
} else {
val message = "Producer clientId $id for topic $topic failed to send."
log.warn(message, exception)
future.completeExceptionally(CordaMessageAPIFatalException(message, exception))
}
}

override fun close() {
Expand All @@ -34,6 +58,9 @@ class MessageBusClient(
}
}

/**
* Helper function to convert a [MediatorMessage] of a specific format to a [CordaProducerRecord]
*/
private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *> {
return CordaProducerRecord(
topic = this.getProperty<String>(MSG_PROP_ENDPOINT),
Expand All @@ -43,5 +70,8 @@ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *
)
}

/**
* Helper function to extract headers from message props
*/
private fun Map<String, Any>.toHeaders() =
map { (key, value) -> (key to value.toString()) }
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ internal class CordaPublisherImpl(
if (!config.transactional) {
future.complete(Unit)
} else {
log.debug { "Asynchronous send completed completed successfully." }
log.debug { "Asynchronous send completed successfully." }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,26 @@ package net.corda.messaging.mediator

import net.corda.messagebus.api.producer.CordaProducer
import net.corda.messagebus.api.producer.CordaProducerRecord
import net.corda.messaging.api.exception.CordaMessageAPIFatalException
import net.corda.messaging.api.mediator.MediatorMessage
import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT
import net.corda.v5.base.exceptions.CordaRuntimeException
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.mockito.Mockito
import org.mockito.Mockito.any
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.times
import org.mockito.kotlin.eq
import org.mockito.kotlin.isNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.util.concurrent.CompletableFuture

class MessageBusClientTest {
private companion object {
Expand All @@ -31,6 +38,12 @@ class MessageBusClientTest {
MSG_PROP_KEY to TEST_KEY,
)
private val message: MediatorMessage<Any> = MediatorMessage("value", messageProps)
private val record: CordaProducerRecord<*, *> = CordaProducerRecord(
TEST_ENDPOINT,
TEST_KEY,
message.payload,
messageProps.toHeaders(),
)


@BeforeEach
Expand All @@ -39,37 +52,83 @@ class MessageBusClientTest {
messageBusClient = MessageBusClient("client-id", cordaProducer)
}

@Suppress("UNCHECKED_CAST")
@Test
fun testSend() {
messageBusClient.send(message)
fun `test send`() {
doAnswer {
val callback = it.getArgument<CordaProducer.Callback>(1)
callback.onCompletion(null)
}.whenever(cordaProducer).send(eq(record), any())

val expected = CordaProducerRecord(
TEST_ENDPOINT,
TEST_KEY,
message.payload,
messageProps.toHeaders(),
)
val result = messageBusClient.send(message) as MediatorMessage<CompletableFuture<Unit>>

verify(cordaProducer).send(eq(expected), isNull())
verify(cordaProducer).send(eq(record), any())
assertNotNull(result.payload)
result.payload?.let {
assertTrue(it.isDone)
assertFalse(it.isCompletedExceptionally)
}
}

@Test
fun testSendWithError() {
fun `send should handle synchronous error`() {
val record = CordaProducerRecord(
TEST_ENDPOINT,
TEST_KEY,
message.payload,
messageProps.toHeaders(),
)

Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), isNull())
Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), any())
assertThrows<CordaRuntimeException> {
messageBusClient.send(message)
}
}

@Suppress("UNCHECKED_CAST")
@Test
fun `send should handle asynchronous CordaMessageAPIFatalException`() {
doAnswer {
val callback = it.getArgument<CordaProducer.Callback>(1)
callback.onCompletion(CordaMessageAPIFatalException("test"))
}.whenever(cordaProducer).send(eq(record), any())

val result = messageBusClient.send(message) as MediatorMessage<CompletableFuture<Unit>>

verify(cordaProducer).send(eq(record), any())
assertNotNull(result.payload)

result.payload?.isCompletedExceptionally?.let { assertTrue(it) }

result.payload?.handle { _, exception ->
assertTrue(exception is CordaMessageAPIFatalException)
assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message)
}?.get()
}

@Suppress("UNCHECKED_CAST")
@Test
fun `send should wrap unknown exceptions`() {
doAnswer {
val callback = it.getArgument<CordaProducer.Callback>(1)
callback.onCompletion(CordaRuntimeException("test"))
}.whenever(cordaProducer).send(eq(record), any())

val result = messageBusClient.send(message) as MediatorMessage<CompletableFuture<Unit>>

verify(cordaProducer).send(eq(record), any())
assertNotNull(result.payload)

result.payload?.isCompletedExceptionally?.let { assertTrue(it) }

result.payload?.handle { _, exception ->
assertTrue(exception is CordaMessageAPIFatalException)
assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message)
}?.get()
}

@Test
fun testClose() {
fun `test close`() {
messageBusClient.close()
verify(cordaProducer, times(1)).close()
}
Expand Down