From 040c4be0f254eebad3976e5fad27d15d99da4bc4 Mon Sep 17 00:00:00 2001 From: Emily Bowe Date: Fri, 13 Oct 2023 11:21:06 +0100 Subject: [PATCH] CORE-16234: For persistence api add endpoint and change naming of existing endpoint (#4774) For the RPC Epic the Persistence api will require two endpoints. One endpoint is already merged which relates to ledger-related persistence. This PR will be making a suggested change to the naming of that endpoint for clarity, and will also add the second endpoint required. When this work is merged we will have the endpoints: /ledger - for ledger persistence /persistence - for entity persistence --- ...rsistenceRequestSubscriptionFactoryImpl.kt | 6 +- .../VerificationSubscriptionFactoryImpl.kt | 4 +- .../build.gradle | 1 + .../impl/tests/PersistenceExceptionTests.kt | 23 +-- .../tests/PersistenceServiceInternalTests.kt | 25 +-- .../impl/tests/helpers/Utils.kt | 14 ++ .../impl/EntityProcessorImpl.kt | 27 --- ...> EntityRequestSubscriptionFactoryImpl.kt} | 37 +++- .../impl/FlowPersistenceServiceImpl.kt | 44 +++-- .../impl/internal/EntityRequestProcessor.kt | 79 ++++++++ .../internal/EntityRpcRequestProcessor.kt | 50 +++++ ...essageProcessor.kt => ProcessorService.kt} | 171 ++++++------------ .../entityprocessor/impl/internal/Utils.kt | 8 + .../impl/EntityProcessorFactoryImplTest.kt | 16 -- ...ntityRequestSubscriptionFactoryImplTest.kt | 81 +++++++++ .../entity-processor-service/build.gradle | 5 + .../corda/entityprocessor/EntityProcessor.kt | 5 - .../entityprocessor/EntityProcessorFactory.kt | 13 -- .../EntityRequestSubscriptionFactory.kt | 26 +++ .../BatchedUniquenessCheckerLifecycleImpl.kt | 4 +- 20 files changed, 406 insertions(+), 233 deletions(-) create mode 100644 components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/helpers/Utils.kt delete mode 100644 components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorImpl.kt rename components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/{EntityProcessorFactoryImpl.kt => EntityRequestSubscriptionFactoryImpl.kt} (53%) create mode 100644 components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRequestProcessor.kt create mode 100644 components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRpcRequestProcessor.kt rename components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/{EntityMessageProcessor.kt => ProcessorService.kt} (53%) create mode 100644 components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/Utils.kt delete mode 100644 components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImplTest.kt create mode 100644 components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImplTest.kt delete mode 100644 components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessor.kt delete mode 100644 components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessorFactory.kt create mode 100644 components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityRequestSubscriptionFactory.kt diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/processor/impl/LedgerPersistenceRequestSubscriptionFactoryImpl.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/processor/impl/LedgerPersistenceRequestSubscriptionFactoryImpl.kt index 186ffac292b..4448a05bc6a 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/processor/impl/LedgerPersistenceRequestSubscriptionFactoryImpl.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/processor/impl/LedgerPersistenceRequestSubscriptionFactoryImpl.kt @@ -36,8 +36,8 @@ class LedgerPersistenceRequestSubscriptionFactoryImpl @Activate constructor( ) : LedgerPersistenceRequestSubscriptionFactory { companion object { internal const val GROUP_NAME = "persistence.ledger.processor" - const val SUBSCRIPTION_NAME = "Persistence" - const val PERSISTENCE_PATH = "/persistence" + const val SUBSCRIPTION_NAME = "Ledger" + const val PATH = "/ledger" } override fun create(config: SmartConfig): Subscription { @@ -67,7 +67,7 @@ class LedgerPersistenceRequestSubscriptionFactoryImpl @Activate constructor( LedgerPersistenceRequest::class.java, FlowEvent::class.java ) - val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PERSISTENCE_PATH) + val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH) return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor) } } diff --git a/components/ledger/ledger-verification/src/main/kotlin/net/corda/ledger/verification/processor/impl/VerificationSubscriptionFactoryImpl.kt b/components/ledger/ledger-verification/src/main/kotlin/net/corda/ledger/verification/processor/impl/VerificationSubscriptionFactoryImpl.kt index 8ccce636f81..e667596f645 100644 --- a/components/ledger/ledger-verification/src/main/kotlin/net/corda/ledger/verification/processor/impl/VerificationSubscriptionFactoryImpl.kt +++ b/components/ledger/ledger-verification/src/main/kotlin/net/corda/ledger/verification/processor/impl/VerificationSubscriptionFactoryImpl.kt @@ -31,7 +31,7 @@ class VerificationSubscriptionFactoryImpl @Activate constructor( companion object { internal const val GROUP_NAME = "verification.ledger.processor" const val SUBSCRIPTION_NAME = "Verification" - const val VERIFICATION_PATH = "/verification" + const val PATH = "/verification" } override fun create(config: SmartConfig): Subscription { @@ -61,7 +61,7 @@ class VerificationSubscriptionFactoryImpl @Activate constructor( TransactionVerificationRequest::class.java, FlowEvent::class.java ) - val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, VERIFICATION_PATH) + val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH) return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor) } diff --git a/components/persistence/entity-processor-service-impl/build.gradle b/components/persistence/entity-processor-service-impl/build.gradle index 17bf5e0b876..c333ff49c66 100644 --- a/components/persistence/entity-processor-service-impl/build.gradle +++ b/components/persistence/entity-processor-service-impl/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation project(":libs:serialization:serialization-avro") implementation 'net.corda:corda-application' + implementation 'net.corda:corda-avro-schema' implementation 'net.corda:corda-config-schema' implementation 'net.corda:corda-db-schema' implementation 'net.corda:corda-topic-schema' diff --git a/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceExceptionTests.kt b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceExceptionTests.kt index f735b9a9e48..c83c27ed9b3 100644 --- a/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceExceptionTests.kt +++ b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceExceptionTests.kt @@ -22,7 +22,9 @@ import net.corda.db.persistence.testkit.helpers.Resources import net.corda.db.persistence.testkit.helpers.SandboxHelper.createDog import net.corda.db.persistence.testkit.helpers.SandboxHelper.getDogClass import net.corda.db.schema.DbSchema -import net.corda.entityprocessor.impl.internal.EntityMessageProcessor +import net.corda.entityprocessor.impl.internal.EntityRequestProcessor +import net.corda.entityprocessor.impl.tests.helpers.assertEventResponseWithError +import net.corda.entityprocessor.impl.tests.helpers.assertEventResponseWithoutError import net.corda.flow.external.events.responses.exceptions.CpkNotAvailableException import net.corda.flow.external.events.responses.exceptions.VirtualNodeException import net.corda.flow.utils.toKeyValuePairList @@ -49,8 +51,6 @@ import net.corda.virtualnode.toAvro import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotNull -import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance @@ -105,7 +105,7 @@ class PersistenceExceptionTests { private lateinit var dbConnectionManager: FakeDbConnectionManager private lateinit var entitySandboxService: EntitySandboxService - private lateinit var processor: EntityMessageProcessor + private lateinit var processor: EntityRequestProcessor private lateinit var virtualNodeInfo: VirtualNodeInfo private lateinit var cpkFileHashes: Set @@ -157,7 +157,7 @@ class PersistenceExceptionTests { virtualNodeInfoReadService, dbConnectionManager ) - processor = EntityMessageProcessor( + processor = EntityRequestProcessor( currentSandboxGroupContext, entitySandboxService, responseFactory, @@ -217,7 +217,7 @@ class PersistenceExceptionTests { dbConnectionManager ) - val processor = EntityMessageProcessor( + val processor = EntityRequestProcessor( currentSandboxGroupContext, brokenEntitySandboxService, responseFactory, @@ -286,6 +286,8 @@ class PersistenceExceptionTests { createDogDb(DOGS_TABLE_WITHOUT_PK) val persistEntitiesRequest = createDogPersistRequest() + val initialDogDbCount = getDogDbCount(virtualNodeInfo.vaultDmlConnectionId) + val record1 = processor.onNext(listOf(Record(TOPIC, UUID.randomUUID().toString(), persistEntitiesRequest))) assertEventResponseWithoutError(record1.single()) // duplicate request @@ -294,7 +296,7 @@ class PersistenceExceptionTests { val dogDbCount = getDogDbCount(virtualNodeInfo.vaultDmlConnectionId) // There shouldn't be a dog duplicate entry in the DB, i.e. dogs count in the DB should still be 1 - assertEquals(1, dogDbCount) + assertEquals(initialDogDbCount + 1, dogDbCount) } @Test @@ -384,10 +386,3 @@ class PersistenceExceptionTests { } } -private fun assertEventResponseWithoutError(record: Record<*, *>) { - assertNull(((record.value as FlowEvent).payload as ExternalEventResponse).error) -} - -private fun assertEventResponseWithError(record: Record<*, *>) { - assertNotNull(((record.value as FlowEvent).payload as ExternalEventResponse).error) -} \ No newline at end of file diff --git a/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceServiceInternalTests.kt b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceServiceInternalTests.kt index 15f18d2401f..1e725884113 100644 --- a/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceServiceInternalTests.kt +++ b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/PersistenceServiceInternalTests.kt @@ -1,15 +1,14 @@ package net.corda.entityprocessor.impl.tests -import net.corda.cpiinfo.read.CpiInfoReadService -import net.corda.cpk.read.CpkReadService import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializationFactory +import net.corda.cpiinfo.read.CpiInfoReadService +import net.corda.cpk.read.CpkReadService import net.corda.data.KeyValuePairList import net.corda.data.flow.event.FlowEvent import net.corda.data.flow.event.external.ExternalEventContext import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.event.external.ExternalEventResponseErrorType -import net.corda.v5.application.flows.FlowContextPropertyKeys.CPK_FILE_CHECKSUM import net.corda.data.persistence.DeleteEntities import net.corda.data.persistence.DeleteEntitiesById import net.corda.data.persistence.EntityRequest @@ -35,7 +34,7 @@ import net.corda.db.persistence.testkit.helpers.SandboxHelper.getCatClass import net.corda.db.persistence.testkit.helpers.SandboxHelper.getDogClass import net.corda.db.persistence.testkit.helpers.SandboxHelper.getOwnerClass import net.corda.db.schema.DbSchema -import net.corda.entityprocessor.impl.internal.EntityMessageProcessor +import net.corda.entityprocessor.impl.internal.EntityRequestProcessor import net.corda.entityprocessor.impl.internal.PersistenceServiceInternal import net.corda.entityprocessor.impl.internal.getClass import net.corda.entityprocessor.impl.tests.helpers.AnimalCreator.createCats @@ -56,6 +55,7 @@ import net.corda.test.util.dsl.entities.cpx.getCpkFileHashes import net.corda.testing.sandboxes.SandboxSetup import net.corda.testing.sandboxes.fetchService import net.corda.testing.sandboxes.lifecycle.EachTestLifecycle +import net.corda.v5.application.flows.FlowContextPropertyKeys.CPK_FILE_CHECKSUM import net.corda.virtualnode.VirtualNodeInfo import net.corda.virtualnode.read.VirtualNodeInfoReadService import net.corda.virtualnode.toAvro @@ -81,11 +81,6 @@ import java.time.ZoneOffset import java.util.UUID import javax.persistence.EntityManagerFactory -sealed class QuerySetup { - data class NamedQuery(val params: Map, val query: String = "Dog.summon") : QuerySetup() - data class All(val className: String) : QuerySetup() -} - /** * To use Postgres rather than in-memory (HSQL): * @@ -96,6 +91,12 @@ sealed class QuerySetup { * Rather than creating a new serializer in these tests from scratch, * we grab a reference to the one in the sandbox and use that to serialize and de-serialize. */ + +sealed class QuerySetup { + data class NamedQuery(val params: Map, val query: String = "Dog.summon") : QuerySetup() + data class All(val className: String) : QuerySetup() +} + @ExtendWith(ServiceExtension::class, BundleContextExtension::class, DBSetup::class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) class PersistenceServiceInternalTests { @@ -285,7 +286,7 @@ class PersistenceServiceInternalTests { } ) - val processor = EntityMessageProcessor( + val processor = EntityRequestProcessor( currentSandboxGroupContext, myEntitySandboxService, responseFactory, @@ -1015,8 +1016,8 @@ class PersistenceServiceInternalTests { private fun SandboxGroupContext.deserialize(bytes: ByteBuffer) = getSerializationService().deserialize(bytes.array(), Any::class.java) - private fun getMessageProcessor(payloadCheck: (bytes: ByteBuffer) -> ByteBuffer): EntityMessageProcessor { - return EntityMessageProcessor( + private fun getMessageProcessor(payloadCheck: (bytes: ByteBuffer) -> ByteBuffer): EntityRequestProcessor { + return EntityRequestProcessor( currentSandboxGroupContext, entitySandboxService, responseFactory, diff --git a/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/helpers/Utils.kt b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/helpers/Utils.kt new file mode 100644 index 00000000000..5113f3b0526 --- /dev/null +++ b/components/persistence/entity-processor-service-impl/src/integrationTest/kotlin/net/corda/entityprocessor/impl/tests/helpers/Utils.kt @@ -0,0 +1,14 @@ +package net.corda.entityprocessor.impl.tests.helpers + +import net.corda.data.flow.event.FlowEvent +import net.corda.data.flow.event.external.ExternalEventResponse +import net.corda.messaging.api.records.Record +import org.junit.jupiter.api.Assertions + +fun assertEventResponseWithoutError(record: Record<*, *>) { + Assertions.assertNull(((record.value as FlowEvent).payload as ExternalEventResponse).error) +} + +fun assertEventResponseWithError(record: Record<*, *>) { + Assertions.assertNotNull(((record.value as FlowEvent).payload as ExternalEventResponse).error) +} \ No newline at end of file diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorImpl.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorImpl.kt deleted file mode 100644 index 62f2c643c07..00000000000 --- a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorImpl.kt +++ /dev/null @@ -1,27 +0,0 @@ -package net.corda.entityprocessor.impl - -import net.corda.data.persistence.EntityRequest -import net.corda.entityprocessor.EntityProcessor -import net.corda.messaging.api.subscription.Subscription -import org.osgi.service.component.annotations.Component - -/** - * Entity processor. - * Starts the subscription, which in turn passes the messages to the - * [net.corda.entityprocessor.impl.internal.EntityMessageProcessor]. - */ -@Component(service = [EntityProcessor::class]) -class EntityProcessorImpl( - private val subscription: Subscription -) : - EntityProcessor { - override val isRunning: Boolean - get() = subscription.isRunning - - override fun start() = subscription.start() - - // It is important to call `subscription.close()` rather than `subscription.stop()` as the latter does not remove - // Lifecycle coordinator from the registry, causing it to appear there in `DOWN` state. This will in turn fail - // overall Health check's `status` check. - override fun stop() = subscription.close() -} diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImpl.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImpl.kt similarity index 53% rename from components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImpl.kt rename to components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImpl.kt index 124f60cff65..cd6a437449d 100644 --- a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImpl.kt +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImpl.kt @@ -1,10 +1,15 @@ package net.corda.entityprocessor.impl -import net.corda.entityprocessor.EntityProcessor -import net.corda.entityprocessor.EntityProcessorFactory -import net.corda.entityprocessor.impl.internal.EntityMessageProcessor +import net.corda.data.flow.event.FlowEvent +import net.corda.data.persistence.EntityRequest +import net.corda.entityprocessor.EntityRequestSubscriptionFactory +import net.corda.entityprocessor.impl.internal.EntityRequestProcessor +import net.corda.entityprocessor.impl.internal.EntityRpcRequestProcessor import net.corda.libs.configuration.SmartConfig +import net.corda.messaging.api.subscription.RPCSubscription +import net.corda.messaging.api.subscription.Subscription import net.corda.messaging.api.subscription.config.SubscriptionConfig +import net.corda.messaging.api.subscription.config.SyncRPCConfig import net.corda.messaging.api.subscription.factory.SubscriptionFactory import net.corda.persistence.common.EntitySandboxService import net.corda.persistence.common.PayloadChecker @@ -16,8 +21,8 @@ import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @Suppress("UNUSED") -@Component(service = [EntityProcessorFactory::class]) -class EntityProcessorFactoryImpl @Activate constructor( +@Component(service = [EntityRequestSubscriptionFactory::class]) +class EntityRequestSubscriptionFactoryImpl @Activate constructor( @Reference(service = CurrentSandboxGroupContext::class) private val currentSandboxGroupContext: CurrentSandboxGroupContext, @Reference(service = SubscriptionFactory::class) @@ -26,28 +31,40 @@ class EntityProcessorFactoryImpl @Activate constructor( private val entitySandboxService: EntitySandboxService, @Reference(service = ResponseFactory::class) private val responseFactory: ResponseFactory -) : EntityProcessorFactory { +) : EntityRequestSubscriptionFactory { companion object { internal const val GROUP_NAME = "persistence.entity.processor" + const val SUBSCRIPTION_NAME = "Persistence" + const val PATH = "/persistence" } - override fun create(config: SmartConfig): EntityProcessor { + override fun create(config: SmartConfig): Subscription { val subscriptionConfig = SubscriptionConfig(GROUP_NAME, Schemas.Persistence.PERSISTENCE_ENTITY_PROCESSOR_TOPIC) - val processor = EntityMessageProcessor( + val processor = EntityRequestProcessor( currentSandboxGroupContext, entitySandboxService, responseFactory, PayloadChecker(config)::checkSize ) - val subscription = subscriptionFactory.createDurableSubscription( + return subscriptionFactory.createDurableSubscription( subscriptionConfig, processor, config, null ) + } - return EntityProcessorImpl(subscription) + override fun createRpcSubscription(): RPCSubscription { + val processor = EntityRpcRequestProcessor( + currentSandboxGroupContext, + entitySandboxService, + responseFactory, + EntityRequest::class.java, + FlowEvent::class.java + ) + val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH) + return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor) } } diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/FlowPersistenceServiceImpl.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/FlowPersistenceServiceImpl.kt index b4b369a791a..fed025b22c4 100644 --- a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/FlowPersistenceServiceImpl.kt +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/FlowPersistenceServiceImpl.kt @@ -3,8 +3,8 @@ package net.corda.entityprocessor.impl import net.corda.configuration.read.ConfigChangedEvent import net.corda.configuration.read.ConfigurationReadService import net.corda.cpiinfo.read.CpiInfoReadService -import net.corda.entityprocessor.EntityProcessor -import net.corda.entityprocessor.EntityProcessorFactory +import net.corda.data.persistence.EntityRequest +import net.corda.entityprocessor.EntityRequestSubscriptionFactory import net.corda.entityprocessor.FlowPersistenceService import net.corda.libs.configuration.helper.getConfig import net.corda.lifecycle.DependentComponents @@ -15,8 +15,8 @@ import net.corda.lifecycle.LifecycleStatus import net.corda.lifecycle.RegistrationStatusChangeEvent import net.corda.lifecycle.Resource import net.corda.lifecycle.StartEvent -import net.corda.lifecycle.StopEvent import net.corda.lifecycle.createCoordinator +import net.corda.messaging.api.subscription.Subscription import net.corda.sandboxgroupcontext.service.SandboxGroupContextComponent import net.corda.schema.configuration.ConfigKeys.BOOT_CONFIG import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG @@ -40,14 +40,15 @@ class FlowPersistenceServiceImpl @Activate constructor( private val virtualNodeInfoReadService: VirtualNodeInfoReadService, @Reference(service = CpiInfoReadService::class) private val cpiInfoReadService: CpiInfoReadService, - @Reference(service = EntityProcessorFactory::class) - private val entityProcessorFactory: EntityProcessorFactory + @Reference(service = EntityRequestSubscriptionFactory::class) + private val entityRequestSubscriptionFactory: EntityRequestSubscriptionFactory ) : FlowPersistenceService { private var configHandle: Resource? = null - private var entityProcessor: EntityProcessor? = null + private var entityProcessorSubscription: Subscription? = null companion object { private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + const val RPC_SUBSCRIPTION = "RPC_SUBSCRIPTION" } private val dependentComponents = DependentComponents.of( @@ -56,7 +57,7 @@ class FlowPersistenceServiceImpl @Activate constructor( ::virtualNodeInfoReadService, ::cpiInfoReadService, ) - private val coordinator = coordinatorFactory.createCoordinator(dependentComponents, ::eventHandler) + private val lifecycleCoordinator = coordinatorFactory.createCoordinator(dependentComponents, ::eventHandler) private fun eventHandler(event: LifecycleEvent, coordinator: LifecycleCoordinator) { logger.debug { "FlowPersistenceService received: $event" } @@ -66,39 +67,46 @@ class FlowPersistenceServiceImpl @Activate constructor( } is RegistrationStatusChangeEvent -> { if (event.status == LifecycleStatus.UP) { + configHandle?.close() configHandle = configurationReadService.registerComponentForUpdates( coordinator, setOf(BOOT_CONFIG, MESSAGING_CONFIG) ) + initialiseRpcSubscription() } else { - configHandle?.close() + coordinator.updateStatus(event.status) } } is ConfigChangedEvent -> { - entityProcessor?.stop() - val newEntityProcessor = entityProcessorFactory.create( + entityProcessorSubscription?.close() + val newEntityProcessorSubscription = entityRequestSubscriptionFactory.create( event.config.getConfig(MESSAGING_CONFIG) ) logger.debug("Starting EntityProcessor.") - newEntityProcessor.start() - entityProcessor = newEntityProcessor + newEntityProcessorSubscription.start() + entityProcessorSubscription = newEntityProcessorSubscription coordinator.updateStatus(LifecycleStatus.UP) } - is StopEvent -> { - entityProcessor?.stop() - logger.debug { "Stopping EntityProcessor." } + } + } + + private fun initialiseRpcSubscription() { + val subscription = entityRequestSubscriptionFactory.createRpcSubscription() + lifecycleCoordinator.createManagedResource(RPC_SUBSCRIPTION) { + subscription.also { + it.start() } } } override val isRunning: Boolean - get() = coordinator.isRunning + get() = lifecycleCoordinator.isRunning override fun start() { - coordinator.start() + lifecycleCoordinator.start() } override fun stop() { - coordinator.stop() + lifecycleCoordinator.stop() } } diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRequestProcessor.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRequestProcessor.kt new file mode 100644 index 00000000000..a9a35613f62 --- /dev/null +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRequestProcessor.kt @@ -0,0 +1,79 @@ +package net.corda.entityprocessor.impl.internal + +import net.corda.data.persistence.EntityRequest +import net.corda.data.persistence.EntityResponse +import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepository +import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepositoryImpl +import net.corda.messaging.api.processor.DurableProcessor +import net.corda.messaging.api.records.Record +import net.corda.metrics.CordaMetrics +import net.corda.persistence.common.EntitySandboxService +import net.corda.persistence.common.ResponseFactory +import net.corda.sandboxgroupcontext.CurrentSandboxGroupContext +import net.corda.tracing.traceEventProcessingNullableSingle +import net.corda.utilities.debug +import org.slf4j.LoggerFactory +import java.nio.ByteBuffer +import java.time.Duration +import java.time.Instant + + +/** + * Handles incoming requests, typically from the flow worker, and sends responses. + * + * The [EntityRequest] contains the request and a typed payload. + * + * The [EntityResponse] contains the response or an exception-like payload whose presence indicates + * an error has occurred. + * + * [payloadCheck] is called against each AMQP payload in the result (not the entire Avro array of results) + */ +class EntityRequestProcessor( + private val currentSandboxGroupContext: CurrentSandboxGroupContext, + private val entitySandboxService: EntitySandboxService, + private val responseFactory: ResponseFactory, + private val payloadCheck: (bytes: ByteBuffer) -> ByteBuffer, + private val requestsIdsRepository: RequestsIdsRepository = RequestsIdsRepositoryImpl() +) : DurableProcessor { + + private companion object { + val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + val processorService = ProcessorService() + } + + override val keyClass = String::class.java + + override val valueClass = EntityRequest::class.java + + override fun onNext(events: List>): List> { + logger.debug { "onNext processing messages ${events.joinToString(",") { it.key }}" } + + return events.mapNotNull { event -> + val request = event.value + if (request == null) { + // We received a [null] external event therefore we do not know the flow id to respond to. + null + } else { + val eventType = request.request?.javaClass?.simpleName ?: "Unknown" + traceEventProcessingNullableSingle(event, "Crypto Event - $eventType") { + CordaMetrics.Metric.Db.EntityPersistenceRequestLag.builder() + .withTag(CordaMetrics.Tag.OperationName, request.request::class.java.name).build().record( + Duration.ofMillis(Instant.now().toEpochMilli() - event.timestamp) + ) + + // val persistenceServiceInternal = PersistenceServiceInternal(sandbox::getClass, payloadCheck) + + processorService.processEvent( + logger, + request, + entitySandboxService, + currentSandboxGroupContext, + responseFactory, + requestsIdsRepository, + payloadCheck + ) + } + } + } + } +} diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRpcRequestProcessor.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRpcRequestProcessor.kt new file mode 100644 index 00000000000..531f323c0d4 --- /dev/null +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityRpcRequestProcessor.kt @@ -0,0 +1,50 @@ +package net.corda.entityprocessor.impl.internal + +import net.corda.data.flow.event.FlowEvent +import net.corda.data.persistence.EntityRequest +import net.corda.data.persistence.EntityResponse +import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepository +import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepositoryImpl +import net.corda.messaging.api.processor.SyncRPCProcessor +import net.corda.persistence.common.EntitySandboxService +import net.corda.persistence.common.ResponseFactory +import net.corda.sandboxgroupcontext.CurrentSandboxGroupContext +import net.corda.utilities.debug +import org.slf4j.LoggerFactory + + +/** + * Handles incoming requests, typically from the flow worker, and sends responses. + * + * The [EntityRequest] contains the request and a typed payload. + * + * The [EntityResponse] contains the response or an exception-like payload whose presence indicates + * an error has occurred. + * + * [payloadCheck] is called against each AMQP payload in the result (not the entire Avro array of results) + */ + +@Suppress("LongParameterList") +class EntityRpcRequestProcessor( + private val currentSandboxGroupContext: CurrentSandboxGroupContext, + private val entitySandboxService: EntitySandboxService, + private val responseFactory: ResponseFactory, + override val requestClass: Class, + override val responseClass: Class, + private val requestsIdsRepository: RequestsIdsRepository = RequestsIdsRepositoryImpl() +) : SyncRPCProcessor { + + private companion object { + val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + val processorService = ProcessorService() + } + + override fun process(request: EntityRequest): FlowEvent { + logger.debug { "process processing request $request" } + + val record = processorService.processEvent( + logger, request, entitySandboxService, currentSandboxGroupContext, responseFactory, requestsIdsRepository + ) { it } + return record.value as FlowEvent + } +} diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityMessageProcessor.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/ProcessorService.kt similarity index 53% rename from components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityMessageProcessor.kt rename to components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/ProcessorService.kt index 9e9b27a09a1..5324e79f17e 100644 --- a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/EntityMessageProcessor.kt +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/ProcessorService.kt @@ -2,7 +2,6 @@ package net.corda.entityprocessor.impl.internal import net.corda.crypto.core.parseSecureHash import net.corda.data.KeyValuePairList -import net.corda.v5.application.flows.FlowContextPropertyKeys.CPK_FILE_CHECKSUM import net.corda.data.flow.event.FlowEvent import net.corda.data.persistence.DeleteEntities import net.corda.data.persistence.DeleteEntitiesById @@ -15,8 +14,6 @@ import net.corda.data.persistence.MergeEntities import net.corda.data.persistence.PersistEntities import net.corda.flow.utils.toMap import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepository -import net.corda.libs.virtualnode.datamodel.repository.RequestsIdsRepositoryImpl -import net.corda.messaging.api.processor.DurableProcessor import net.corda.messaging.api.records.Record import net.corda.metrics.CordaMetrics import net.corda.orm.utils.transaction @@ -26,187 +23,140 @@ import net.corda.persistence.common.getEntityManagerFactory import net.corda.persistence.common.getSerializationService import net.corda.sandboxgroupcontext.CurrentSandboxGroupContext import net.corda.sandboxgroupcontext.SandboxGroupContext -import net.corda.tracing.traceEventProcessingNullableSingle import net.corda.utilities.MDC_CLIENT_ID import net.corda.utilities.MDC_EXTERNAL_EVENT_ID -import net.corda.utilities.debug import net.corda.utilities.translateFlowContextToMDC import net.corda.utilities.withMDC +import net.corda.v5.application.flows.FlowContextPropertyKeys import net.corda.v5.base.exceptions.CordaRuntimeException import net.corda.virtualnode.toCorda -import org.slf4j.LoggerFactory +import org.slf4j.Logger import java.nio.ByteBuffer import java.time.Duration -import java.time.Instant import java.util.UUID import javax.persistence.EntityManager import javax.persistence.PersistenceException -fun SandboxGroupContext.getClass(fullyQualifiedClassName: String) = - this.sandboxGroup.loadClassFromMainBundles(fullyQualifiedClassName) - -/** - * Handles incoming requests, typically from the flow worker, and sends responses. - * - * The [EntityRequest] contains the request and a typed payload. - * - * The [EntityResponse] contains the response or an exception-like payload whose presence indicates - * an error has occurred. - * - * [payloadCheck] is called against each AMQP payload in the result (not the entire Avro array of results) - */ -class EntityMessageProcessor( - private val currentSandboxGroupContext: CurrentSandboxGroupContext, - private val entitySandboxService: EntitySandboxService, - private val responseFactory: ResponseFactory, - private val payloadCheck: (bytes: ByteBuffer) -> ByteBuffer, - private val requestsIdsRepository: RequestsIdsRepository = RequestsIdsRepositoryImpl() -) : DurableProcessor { - private companion object { - val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) - } - - override val keyClass = String::class.java - - override val valueClass = EntityRequest::class.java - - override fun onNext(events: List>): List> { - logger.debug { "onNext processing messages ${events.joinToString(",") { it.key }}" } - - return events.mapNotNull { event -> - val request = event.value - if (request == null) { - // We received a [null] external event therefore we do not know the flow id to respond to. - null - } else { - val eventType = request.request?.javaClass?.simpleName ?: "Unknown" - traceEventProcessingNullableSingle(event, "Crypto Event - $eventType") { - CordaMetrics.Metric.Db.EntityPersistenceRequestLag.builder() - .withTag(CordaMetrics.Tag.OperationName, request.request::class.java.name) - .build() - .record( - Duration.ofMillis(Instant.now().toEpochMilli() - event.timestamp) - ) - processEvent(request) - } - } - } - } - - private fun processEvent(request: EntityRequest): Record<*, *> { - val clientRequestId = - request.flowExternalEventContext.contextProperties.toMap()[MDC_CLIENT_ID] ?: "" - - return withMDC( +@SuppressWarnings("LongParameterList") +class ProcessorService { + + fun processEvent( + logger: Logger, + request: EntityRequest, + entitySandboxService: EntitySandboxService, + currentSandboxGroupContext: CurrentSandboxGroupContext, + responseFactory: ResponseFactory, + requestsIdsRepository: RequestsIdsRepository, + payload: (bytes: ByteBuffer) -> ByteBuffer + ): Record<*, *> { + val startTime = System.nanoTime() + val clientRequestId = request.flowExternalEventContext.contextProperties.toMap()[MDC_CLIENT_ID] ?: "" + val holdingIdentity = request.holdingIdentity.toCorda() + + val result = withMDC( mapOf( - MDC_CLIENT_ID to clientRequestId, - MDC_EXTERNAL_EVENT_ID to request.flowExternalEventContext.requestId + MDC_CLIENT_ID to clientRequestId, MDC_EXTERNAL_EVENT_ID to request.flowExternalEventContext.requestId ) + translateFlowContextToMDC(request.flowExternalEventContext.contextProperties.toMap()) ) { - val startTime = System.nanoTime() var requestOutcome = "FAILED" try { - val holdingIdentity = request.holdingIdentity.toCorda() logger.info("Handling ${request.request::class.java.name} for holdingIdentity ${holdingIdentity.shortHash.value}") - val cpkFileHashes = request.flowExternalEventContext.contextProperties.items - .filter { it.key.startsWith(CPK_FILE_CHECKSUM) } - .map { it.value.toSecureHash() } - .toSet() + val cpkFileHashes = request.flowExternalEventContext.contextProperties.items.filter { + it.key.startsWith(FlowContextPropertyKeys.CPK_FILE_CHECKSUM) + }.map { it.value.toSecureHash() }.toSet() val sandbox = entitySandboxService.get(holdingIdentity, cpkFileHashes) currentSandboxGroupContext.set(sandbox) - processRequestWithSandbox(sandbox, request).also { requestOutcome = "SUCCEEDED" } + val persistenceServiceInternal = PersistenceServiceInternal(sandbox::getClass, payload) + + processRequestWithSandbox( + sandbox, request, responseFactory, persistenceServiceInternal, requestsIdsRepository + ).also { requestOutcome = "SUCCEEDED" } } catch (e: Exception) { responseFactory.errorResponse(request.flowExternalEventContext, e) } finally { + currentSandboxGroupContext.remove() + }.also { CordaMetrics.Metric.Db.EntityPersistenceRequestTime.builder() .withTag(CordaMetrics.Tag.OperationName, request.request::class.java.name) - .withTag(CordaMetrics.Tag.OperationStatus, requestOutcome) - .build() + .withTag(CordaMetrics.Tag.OperationStatus, requestOutcome).build() .record(Duration.ofNanos(System.nanoTime() - startTime)) - - currentSandboxGroupContext.remove() } } + return result } + private fun String.toSecureHash() = parseSecureHash(this) + @Suppress("ComplexMethod") private fun processRequestWithSandbox( sandbox: SandboxGroupContext, - request: EntityRequest + request: EntityRequest, + responseFactory: ResponseFactory, + persistenceServiceInternal: PersistenceServiceInternal, + requestsIdsRepository: RequestsIdsRepository ): Record { // get the per-sandbox entity manager and serialization services val entityManagerFactory = sandbox.getEntityManagerFactory() val serializationService = sandbox.getSerializationService() + val entityManager = entityManagerFactory.createEntityManager() - val persistenceServiceInternal = PersistenceServiceInternal(sandbox::getClass, payloadCheck) - - val em = entityManagerFactory.createEntityManager() return when (val entityRequest = request.request) { is PersistEntities -> { val requestId = UUID.fromString(request.flowExternalEventContext.requestId) - val entityResponse = withDeduplicationCheck( - requestId, - em, - onDuplication = { - EntityResponse(emptyList(), KeyValuePairList(emptyList()), null) - } - ) { + val entityResponse = withDeduplicationCheck(requestId, entityManager, onDuplication = { + EntityResponse(emptyList(), KeyValuePairList(emptyList()), null) + }, requestsIdsRepository) { persistenceServiceInternal.persist(serializationService, it, entityRequest) } responseFactory.successResponse( - request.flowExternalEventContext, - entityResponse + request.flowExternalEventContext, entityResponse ) } - is DeleteEntities -> em.transaction { + is DeleteEntities -> entityManager.transaction { responseFactory.successResponse( request.flowExternalEventContext, persistenceServiceInternal.deleteEntities(serializationService, it, entityRequest) ) } - is DeleteEntitiesById -> em.transaction { + is DeleteEntitiesById -> entityManager.transaction { responseFactory.successResponse( - request.flowExternalEventContext, - persistenceServiceInternal.deleteEntitiesByIds( - serializationService, - it, - entityRequest + request.flowExternalEventContext, persistenceServiceInternal.deleteEntitiesByIds( + serializationService, it, entityRequest ) ) } is MergeEntities -> { - val entityResponse = em.transaction { + val entityResponse = entityManager.transaction { persistenceServiceInternal.merge(serializationService, it, entityRequest) } responseFactory.successResponse( - request.flowExternalEventContext, - entityResponse + request.flowExternalEventContext, entityResponse ) } - is FindEntities -> em.transaction { + is FindEntities -> entityManager.transaction { responseFactory.successResponse( request.flowExternalEventContext, persistenceServiceInternal.find(serializationService, it, entityRequest) ) } - is FindAll -> em.transaction { + is FindAll -> entityManager.transaction { responseFactory.successResponse( request.flowExternalEventContext, persistenceServiceInternal.findAll(serializationService, it, entityRequest) ) } - is FindWithNamedQuery -> em.transaction { + is FindWithNamedQuery -> entityManager.transaction { responseFactory.successResponse( request.flowExternalEventContext, persistenceServiceInternal.findWithNamedQuery(serializationService, it, entityRequest) @@ -215,23 +165,21 @@ class EntityMessageProcessor( else -> { responseFactory.fatalErrorResponse( - request.flowExternalEventContext, - CordaRuntimeException("Unknown command") + request.flowExternalEventContext, CordaRuntimeException("Unknown command") ) } } } - private fun String.toSecureHash() = parseSecureHash(this) - // We should require requestId to be a UUID to avoid request ids collisions private fun withDeduplicationCheck( requestId: UUID, - em: EntityManager, + entityManager: EntityManager, onDuplication: () -> EntityResponse, - block: (EntityManager) -> EntityResponse, + requestsIdsRepository: RequestsIdsRepository, + block: (EntityManager) -> EntityResponse ): EntityResponse { - return em.transaction { + return entityManager.transaction { try { requestsIdsRepository.persist(requestId, it) it.flush() @@ -241,7 +189,8 @@ class EntityMessageProcessor( it.transaction.setRollbackOnly() return@transaction onDuplication() } - block(em) + block(entityManager) } } -} + +} \ No newline at end of file diff --git a/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/Utils.kt b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/Utils.kt new file mode 100644 index 00000000000..ed1ba37809a --- /dev/null +++ b/components/persistence/entity-processor-service-impl/src/main/kotlin/net/corda/entityprocessor/impl/internal/Utils.kt @@ -0,0 +1,8 @@ +package net.corda.entityprocessor.impl.internal + +import net.corda.sandboxgroupcontext.SandboxGroupContext + + +fun SandboxGroupContext.getClass(fullyQualifiedClassName: String) = + this.sandboxGroup.loadClassFromMainBundles(fullyQualifiedClassName) + diff --git a/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImplTest.kt b/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImplTest.kt deleted file mode 100644 index 1eeef77f5d6..00000000000 --- a/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityProcessorFactoryImplTest.kt +++ /dev/null @@ -1,16 +0,0 @@ -package net.corda.entityprocessor.impl - -import net.corda.persistence.common.PayloadChecker -import net.corda.persistence.common.exceptions.KafkaMessageSizeException -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import java.nio.ByteBuffer - -internal class EntityProcessorFactoryImplTest { - @Test - fun `payload check throws if max bytes exceeded`() { - val maxSize = 1024 * 10 - val bytes = ByteBuffer.wrap(ByteArray(maxSize + 1)) - assertThrows { PayloadChecker(maxSize).checkSize(bytes) } - } -} diff --git a/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImplTest.kt b/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImplTest.kt new file mode 100644 index 00000000000..7fdef4dc5ba --- /dev/null +++ b/components/persistence/entity-processor-service-impl/src/test/kotlin/net/corda/entityprocessor/impl/EntityRequestSubscriptionFactoryImplTest.kt @@ -0,0 +1,81 @@ +package net.corda.entityprocessor.impl + +import net.corda.data.flow.event.FlowEvent +import net.corda.data.persistence.EntityRequest +import net.corda.libs.configuration.SmartConfig +import net.corda.messaging.api.processor.DurableProcessor +import net.corda.messaging.api.processor.SyncRPCProcessor +import net.corda.messaging.api.subscription.RPCSubscription +import net.corda.messaging.api.subscription.Subscription +import net.corda.messaging.api.subscription.config.SubscriptionConfig +import net.corda.messaging.api.subscription.factory.SubscriptionFactory +import net.corda.persistence.common.PayloadChecker +import net.corda.persistence.common.exceptions.KafkaMessageSizeException +import net.corda.schema.Schemas +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.any +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +import java.nio.ByteBuffer + +internal class EntityRequestSubscriptionFactoryImplTest { + @Test + fun `payload check throws if max bytes exceeded`() { + val maxSize = 1024 * 10 + val bytes = ByteBuffer.wrap(ByteArray(maxSize + 1)) + assertThrows { PayloadChecker(maxSize).checkSize(bytes) } + } + + //TODO - add test for create and createRpcSubscription + + @Test + fun `factory creates kafka subscription`() { + val subscriptionFactory = mock() + val config = mock() + + val expectedSubscription = mock>() + val expectedSubscriptionConfig = SubscriptionConfig( + "persistence.entity.processor", + Schemas.Persistence.PERSISTENCE_ENTITY_PROCESSOR_TOPIC + ) + + whenever( + subscriptionFactory.createDurableSubscription( + eq(expectedSubscriptionConfig), + any>(), + eq(config), + eq(null) + ) + ).thenReturn(expectedSubscription) + + val target = EntityRequestSubscriptionFactoryImpl(mock(), subscriptionFactory, mock(), mock()) + + val result = target.create(config) + + Assertions.assertThat(result).isSameAs(expectedSubscription) + } + + @Test + fun `factory creates rpc subscription`() { + val subscriptionFactory = mock() + + val expectedSubscription = mock>() + + whenever( + subscriptionFactory.createHttpRPCSubscription( + any(), + any>(), + ) + ).thenReturn(expectedSubscription) + + val target = EntityRequestSubscriptionFactoryImpl(mock(), subscriptionFactory, mock(), mock()) + Assertions.assertThat(target).isNotNull + val result = target.createRpcSubscription() + Assertions.assertThat(result).isNotNull + Assertions.assertThat(result).isSameAs(expectedSubscription) + } + +} diff --git a/components/persistence/entity-processor-service/build.gradle b/components/persistence/entity-processor-service/build.gradle index ded51edd9bb..359259a35da 100644 --- a/components/persistence/entity-processor-service/build.gradle +++ b/components/persistence/entity-processor-service/build.gradle @@ -10,5 +10,10 @@ dependencies { api project(':libs:lifecycle:lifecycle') api project(':libs:configuration:configuration-core') + implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle' + implementation platform("net.corda:corda-api:$cordaApiVersion") + implementation project(':libs:messaging:messaging') + + api "net.corda:corda-avro-schema" } diff --git a/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessor.kt b/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessor.kt deleted file mode 100644 index e0af3e9b60b..00000000000 --- a/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessor.kt +++ /dev/null @@ -1,5 +0,0 @@ -package net.corda.entityprocessor - -import net.corda.lifecycle.Lifecycle - -interface EntityProcessor : Lifecycle diff --git a/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessorFactory.kt b/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessorFactory.kt deleted file mode 100644 index 6b477b61e5b..00000000000 --- a/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityProcessorFactory.kt +++ /dev/null @@ -1,13 +0,0 @@ -package net.corda.entityprocessor - -import net.corda.libs.configuration.SmartConfig - -interface EntityProcessorFactory { - /** - * Create a new entity processor. - * - * This should be called from/wired into the db-processor start up. - */ - fun create(config: SmartConfig): EntityProcessor -} - diff --git a/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityRequestSubscriptionFactory.kt b/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityRequestSubscriptionFactory.kt new file mode 100644 index 00000000000..24c96df4640 --- /dev/null +++ b/components/persistence/entity-processor-service/src/main/kotlin/net/corda/entityprocessor/EntityRequestSubscriptionFactory.kt @@ -0,0 +1,26 @@ +package net.corda.entityprocessor + +import net.corda.data.flow.event.FlowEvent +import net.corda.data.persistence.EntityRequest +import net.corda.libs.configuration.SmartConfig +import net.corda.messaging.api.subscription.RPCSubscription +import net.corda.messaging.api.subscription.Subscription + +/** + * The [EntityRequestSubscriptionFactory] creates a new subscription to the durable topic used to receive + * [EntityRequest] messages. + */ +interface EntityRequestSubscriptionFactory { + /** + * Create a new subscription + * + * @param config Configuration for the subscription + * @return A new subscription for [EntityRequest] messages + */ + + fun create(config: SmartConfig): Subscription + + fun createRpcSubscription(): RPCSubscription + +} + diff --git a/components/uniqueness/uniqueness-checker-impl/src/main/kotlin/net/corda/uniqueness/checker/impl/BatchedUniquenessCheckerLifecycleImpl.kt b/components/uniqueness/uniqueness-checker-impl/src/main/kotlin/net/corda/uniqueness/checker/impl/BatchedUniquenessCheckerLifecycleImpl.kt index 06920a86a4f..04619cefe44 100644 --- a/components/uniqueness/uniqueness-checker-impl/src/main/kotlin/net/corda/uniqueness/checker/impl/BatchedUniquenessCheckerLifecycleImpl.kt +++ b/components/uniqueness/uniqueness-checker-impl/src/main/kotlin/net/corda/uniqueness/checker/impl/BatchedUniquenessCheckerLifecycleImpl.kt @@ -52,7 +52,7 @@ class BatchedUniquenessCheckerLifecycleImpl @Activate constructor( const val GROUP_NAME = "uniqueness.checker" const val CONFIG_HANDLE = "CONFIG_HANDLE" const val SUBSCRIPTION_NAME = "Uniqueness Check" - const val UNIQUENESS_CHECKER_PATH = "/uniqueness-checker" + const val PATH = "/uniqueness-checker" const val SUBSCRIPTION = "SUBSCRIPTION" const val RPC_SUBSCRIPTION = "RPC_SUBSCRIPTION" @@ -141,7 +141,7 @@ class BatchedUniquenessCheckerLifecycleImpl @Activate constructor( FlowEvent::class.java ) lifecycleCoordinator.createManagedResource(RPC_SUBSCRIPTION) { - val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, UNIQUENESS_CHECKER_PATH) + val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH) subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor).also { it.start() }