Skip to content

Commit

Permalink
CORE-16234: For persistence api add endpoint and change naming of exi…
Browse files Browse the repository at this point in the history
…sting endpoint (#4774)

For the RPC Epic the Persistence api will require two endpoints. One endpoint is already merged which relates to ledger-related persistence. This PR will be making a suggested change to the naming of that endpoint for clarity, and will also add the second endpoint required.

When this work is merged we will have the endpoints:
/ledger - for ledger persistence
/persistence - for entity persistence
  • Loading branch information
emilybowe authored Oct 13, 2023
1 parent f1be93e commit 040c4be
Show file tree
Hide file tree
Showing 20 changed files with 406 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class LedgerPersistenceRequestSubscriptionFactoryImpl @Activate constructor(
) : LedgerPersistenceRequestSubscriptionFactory {
companion object {
internal const val GROUP_NAME = "persistence.ledger.processor"
const val SUBSCRIPTION_NAME = "Persistence"
const val PERSISTENCE_PATH = "/persistence"
const val SUBSCRIPTION_NAME = "Ledger"
const val PATH = "/ledger"
}

override fun create(config: SmartConfig): Subscription<String, LedgerPersistenceRequest> {
Expand Down Expand Up @@ -67,7 +67,7 @@ class LedgerPersistenceRequestSubscriptionFactoryImpl @Activate constructor(
LedgerPersistenceRequest::class.java,
FlowEvent::class.java
)
val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PERSISTENCE_PATH)
val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH)
return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class VerificationSubscriptionFactoryImpl @Activate constructor(
companion object {
internal const val GROUP_NAME = "verification.ledger.processor"
const val SUBSCRIPTION_NAME = "Verification"
const val VERIFICATION_PATH = "/verification"
const val PATH = "/verification"
}

override fun create(config: SmartConfig): Subscription<String, TransactionVerificationRequest> {
Expand Down Expand Up @@ -61,7 +61,7 @@ class VerificationSubscriptionFactoryImpl @Activate constructor(
TransactionVerificationRequest::class.java,
FlowEvent::class.java
)
val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, VERIFICATION_PATH)
val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH)
return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation project(":libs:serialization:serialization-avro")

implementation 'net.corda:corda-application'
implementation 'net.corda:corda-avro-schema'
implementation 'net.corda:corda-config-schema'
implementation 'net.corda:corda-db-schema'
implementation 'net.corda:corda-topic-schema'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import net.corda.db.persistence.testkit.helpers.Resources
import net.corda.db.persistence.testkit.helpers.SandboxHelper.createDog
import net.corda.db.persistence.testkit.helpers.SandboxHelper.getDogClass
import net.corda.db.schema.DbSchema
import net.corda.entityprocessor.impl.internal.EntityMessageProcessor
import net.corda.entityprocessor.impl.internal.EntityRequestProcessor
import net.corda.entityprocessor.impl.tests.helpers.assertEventResponseWithError
import net.corda.entityprocessor.impl.tests.helpers.assertEventResponseWithoutError
import net.corda.flow.external.events.responses.exceptions.CpkNotAvailableException
import net.corda.flow.external.events.responses.exceptions.VirtualNodeException
import net.corda.flow.utils.toKeyValuePairList
Expand All @@ -49,8 +51,6 @@ import net.corda.virtualnode.toAvro
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
Expand Down Expand Up @@ -105,7 +105,7 @@ class PersistenceExceptionTests {
private lateinit var dbConnectionManager: FakeDbConnectionManager

private lateinit var entitySandboxService: EntitySandboxService
private lateinit var processor: EntityMessageProcessor
private lateinit var processor: EntityRequestProcessor

private lateinit var virtualNodeInfo: VirtualNodeInfo
private lateinit var cpkFileHashes: Set<SecureHash>
Expand Down Expand Up @@ -157,7 +157,7 @@ class PersistenceExceptionTests {
virtualNodeInfoReadService,
dbConnectionManager
)
processor = EntityMessageProcessor(
processor = EntityRequestProcessor(
currentSandboxGroupContext,
entitySandboxService,
responseFactory,
Expand Down Expand Up @@ -217,7 +217,7 @@ class PersistenceExceptionTests {
dbConnectionManager
)

val processor = EntityMessageProcessor(
val processor = EntityRequestProcessor(
currentSandboxGroupContext,
brokenEntitySandboxService,
responseFactory,
Expand Down Expand Up @@ -286,6 +286,8 @@ class PersistenceExceptionTests {
createDogDb(DOGS_TABLE_WITHOUT_PK)
val persistEntitiesRequest = createDogPersistRequest()

val initialDogDbCount = getDogDbCount(virtualNodeInfo.vaultDmlConnectionId)

val record1 = processor.onNext(listOf(Record(TOPIC, UUID.randomUUID().toString(), persistEntitiesRequest)))
assertEventResponseWithoutError(record1.single())
// duplicate request
Expand All @@ -294,7 +296,7 @@ class PersistenceExceptionTests {

val dogDbCount = getDogDbCount(virtualNodeInfo.vaultDmlConnectionId)
// There shouldn't be a dog duplicate entry in the DB, i.e. dogs count in the DB should still be 1
assertEquals(1, dogDbCount)
assertEquals(initialDogDbCount + 1, dogDbCount)
}

@Test
Expand Down Expand Up @@ -384,10 +386,3 @@ class PersistenceExceptionTests {
}
}

private fun assertEventResponseWithoutError(record: Record<*, *>) {
assertNull(((record.value as FlowEvent).payload as ExternalEventResponse).error)
}

private fun assertEventResponseWithError(record: Record<*, *>) {
assertNotNull(((record.value as FlowEvent).payload as ExternalEventResponse).error)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package net.corda.entityprocessor.impl.tests

import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.cpk.read.CpkReadService
import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.cpk.read.CpkReadService
import net.corda.data.KeyValuePairList
import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.external.ExternalEventContext
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.event.external.ExternalEventResponseErrorType
import net.corda.v5.application.flows.FlowContextPropertyKeys.CPK_FILE_CHECKSUM
import net.corda.data.persistence.DeleteEntities
import net.corda.data.persistence.DeleteEntitiesById
import net.corda.data.persistence.EntityRequest
Expand All @@ -35,7 +34,7 @@ import net.corda.db.persistence.testkit.helpers.SandboxHelper.getCatClass
import net.corda.db.persistence.testkit.helpers.SandboxHelper.getDogClass
import net.corda.db.persistence.testkit.helpers.SandboxHelper.getOwnerClass
import net.corda.db.schema.DbSchema
import net.corda.entityprocessor.impl.internal.EntityMessageProcessor
import net.corda.entityprocessor.impl.internal.EntityRequestProcessor
import net.corda.entityprocessor.impl.internal.PersistenceServiceInternal
import net.corda.entityprocessor.impl.internal.getClass
import net.corda.entityprocessor.impl.tests.helpers.AnimalCreator.createCats
Expand All @@ -56,6 +55,7 @@ import net.corda.test.util.dsl.entities.cpx.getCpkFileHashes
import net.corda.testing.sandboxes.SandboxSetup
import net.corda.testing.sandboxes.fetchService
import net.corda.testing.sandboxes.lifecycle.EachTestLifecycle
import net.corda.v5.application.flows.FlowContextPropertyKeys.CPK_FILE_CHECKSUM
import net.corda.virtualnode.VirtualNodeInfo
import net.corda.virtualnode.read.VirtualNodeInfoReadService
import net.corda.virtualnode.toAvro
Expand All @@ -81,11 +81,6 @@ import java.time.ZoneOffset
import java.util.UUID
import javax.persistence.EntityManagerFactory

sealed class QuerySetup {
data class NamedQuery(val params: Map<String, String>, val query: String = "Dog.summon") : QuerySetup()
data class All(val className: String) : QuerySetup()
}

/**
* To use Postgres rather than in-memory (HSQL):
*
Expand All @@ -96,6 +91,12 @@ sealed class QuerySetup {
* Rather than creating a new serializer in these tests from scratch,
* we grab a reference to the one in the sandbox and use that to serialize and de-serialize.
*/

sealed class QuerySetup {
data class NamedQuery(val params: Map<String, String>, val query: String = "Dog.summon") : QuerySetup()
data class All(val className: String) : QuerySetup()
}

@ExtendWith(ServiceExtension::class, BundleContextExtension::class, DBSetup::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PersistenceServiceInternalTests {
Expand Down Expand Up @@ -285,7 +286,7 @@ class PersistenceServiceInternalTests {
}
)

val processor = EntityMessageProcessor(
val processor = EntityRequestProcessor(
currentSandboxGroupContext,
myEntitySandboxService,
responseFactory,
Expand Down Expand Up @@ -1015,8 +1016,8 @@ class PersistenceServiceInternalTests {
private fun SandboxGroupContext.deserialize(bytes: ByteBuffer) =
getSerializationService().deserialize(bytes.array(), Any::class.java)

private fun getMessageProcessor(payloadCheck: (bytes: ByteBuffer) -> ByteBuffer): EntityMessageProcessor {
return EntityMessageProcessor(
private fun getMessageProcessor(payloadCheck: (bytes: ByteBuffer) -> ByteBuffer): EntityRequestProcessor {
return EntityRequestProcessor(
currentSandboxGroupContext,
entitySandboxService,
responseFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package net.corda.entityprocessor.impl.tests.helpers

import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.messaging.api.records.Record
import org.junit.jupiter.api.Assertions

fun assertEventResponseWithoutError(record: Record<*, *>) {
Assertions.assertNull(((record.value as FlowEvent).payload as ExternalEventResponse).error)
}

fun assertEventResponseWithError(record: Record<*, *>) {
Assertions.assertNotNull(((record.value as FlowEvent).payload as ExternalEventResponse).error)
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package net.corda.entityprocessor.impl

import net.corda.entityprocessor.EntityProcessor
import net.corda.entityprocessor.EntityProcessorFactory
import net.corda.entityprocessor.impl.internal.EntityMessageProcessor
import net.corda.data.flow.event.FlowEvent
import net.corda.data.persistence.EntityRequest
import net.corda.entityprocessor.EntityRequestSubscriptionFactory
import net.corda.entityprocessor.impl.internal.EntityRequestProcessor
import net.corda.entityprocessor.impl.internal.EntityRpcRequestProcessor
import net.corda.libs.configuration.SmartConfig
import net.corda.messaging.api.subscription.RPCSubscription
import net.corda.messaging.api.subscription.Subscription
import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.config.SyncRPCConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.persistence.common.EntitySandboxService
import net.corda.persistence.common.PayloadChecker
Expand All @@ -16,8 +21,8 @@ import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference

@Suppress("UNUSED")
@Component(service = [EntityProcessorFactory::class])
class EntityProcessorFactoryImpl @Activate constructor(
@Component(service = [EntityRequestSubscriptionFactory::class])
class EntityRequestSubscriptionFactoryImpl @Activate constructor(
@Reference(service = CurrentSandboxGroupContext::class)
private val currentSandboxGroupContext: CurrentSandboxGroupContext,
@Reference(service = SubscriptionFactory::class)
Expand All @@ -26,28 +31,40 @@ class EntityProcessorFactoryImpl @Activate constructor(
private val entitySandboxService: EntitySandboxService,
@Reference(service = ResponseFactory::class)
private val responseFactory: ResponseFactory
) : EntityProcessorFactory {
) : EntityRequestSubscriptionFactory {
companion object {
internal const val GROUP_NAME = "persistence.entity.processor"
const val SUBSCRIPTION_NAME = "Persistence"
const val PATH = "/persistence"
}

override fun create(config: SmartConfig): EntityProcessor {
override fun create(config: SmartConfig): Subscription<String, EntityRequest> {
val subscriptionConfig = SubscriptionConfig(GROUP_NAME, Schemas.Persistence.PERSISTENCE_ENTITY_PROCESSOR_TOPIC)

val processor = EntityMessageProcessor(
val processor = EntityRequestProcessor(
currentSandboxGroupContext,
entitySandboxService,
responseFactory,
PayloadChecker(config)::checkSize
)

val subscription = subscriptionFactory.createDurableSubscription(
return subscriptionFactory.createDurableSubscription(
subscriptionConfig,
processor,
config,
null
)
}

return EntityProcessorImpl(subscription)
override fun createRpcSubscription(): RPCSubscription<EntityRequest, FlowEvent> {
val processor = EntityRpcRequestProcessor(
currentSandboxGroupContext,
entitySandboxService,
responseFactory,
EntityRequest::class.java,
FlowEvent::class.java
)
val rpcConfig = SyncRPCConfig(SUBSCRIPTION_NAME, PATH)
return subscriptionFactory.createHttpRPCSubscription(rpcConfig, processor)
}
}
Loading

0 comments on commit 040c4be

Please sign in to comment.