From 2f72941d0343cce48ceafab06ff35242c20ef345 Mon Sep 17 00:00:00 2001 From: Jake Schwartz <49877044+hyperschwartz@users.noreply.github.com> Date: Wed, 24 May 2023 13:37:57 -0700 Subject: [PATCH] Bugfix: Async stream production isn't viable with standard sync server implementation (#51) * Forgot to terminate the object grant instance * Convert to the flow producer server because the other nonsense wasn't working --- .../gateway/configuration/AppProperties.kt | 1 - .../gateway/configuration/BeanQualifiers.kt | 1 - .../gateway/configuration/CoroutineConfig.kt | 6 -- .../server/ObjectStoreGatewayServer.kt | 28 ------ .../server/ObjectStoreGatewayStreamServer.kt | 28 ++++++ .../gateway/service/ObjectService.kt | 71 +++++++-------- .../application-container.properties | 1 - .../application-development.properties | 1 - .../server/ObjectStoreGatewayServerTest.kt | 86 ++++++++++--------- .../gateway/service/ObjectServiceTest.kt | 7 +- .../src/test/resources/application.properties | 1 - 11 files changed, 109 insertions(+), 122 deletions(-) create mode 100644 server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayStreamServer.kt diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/AppProperties.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/AppProperties.kt index 26db08d..f46da39 100644 --- a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/AppProperties.kt +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/AppProperties.kt @@ -12,7 +12,6 @@ import java.util.UUID @Validated data class BatchProperties( val maxProvidedRecords: Int, - val threadCount: Int, ) @ConstructorBinding diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/BeanQualifiers.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/BeanQualifiers.kt index 66ff7cc..354db31 100644 --- a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/BeanQualifiers.kt +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/BeanQualifiers.kt @@ -1,7 +1,6 @@ package tech.figure.objectstore.gateway.configuration object BeanQualifiers { - const val BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER = "batchProcessCoroutineScopeBean" const val EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER = "eventStreamCoroutineScopeBean" const val OBJECTSTORE_ENCRYPTION_KEYS: String = "objectStoreEncryptionKeys" const val OBJECTSTORE_PRIVATE_KEYS: String = "objectStorePrivateKeys" diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/CoroutineConfig.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/CoroutineConfig.kt index b481035..ab9dbc2 100644 --- a/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/CoroutineConfig.kt +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/configuration/CoroutineConfig.kt @@ -7,12 +7,6 @@ import tech.figure.objectstore.gateway.util.CoroutineUtil @Configuration class CoroutineConfig { - @Bean(BeanQualifiers.BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER) - fun batchProcessScope(batchProperties: BatchProperties): CoroutineScope = CoroutineUtil.newSingletonScope( - scopeName = CoroutineScopeNames.BATCH_PROCESS_SCOPE, - threadCount = batchProperties.threadCount, - ) - @Bean(BeanQualifiers.EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER) fun eventStreamScope(eventStreamProperties: EventStreamProperties): CoroutineScope = CoroutineUtil.newSingletonScope( scopeName = CoroutineScopeNames.EVENT_STREAM_SCOPE, diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServer.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServer.kt index 7d3f724..4e12996 100644 --- a/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServer.kt +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServer.kt @@ -11,8 +11,6 @@ import org.lognet.springboot.grpc.GRpcService import org.springframework.beans.factory.annotation.Qualifier import tech.figure.objectstore.gateway.GatewayGrpc import tech.figure.objectstore.gateway.GatewayOuterClass -import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest -import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionRequest import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionResponse import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsRequest @@ -22,7 +20,6 @@ import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeScopePermissionRe import tech.figure.objectstore.gateway.address import tech.figure.objectstore.gateway.configuration.BeanQualifiers import tech.figure.objectstore.gateway.configuration.ProvenanceProperties -import tech.figure.objectstore.gateway.exception.InvalidInputException import tech.figure.objectstore.gateway.publicKey import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor import tech.figure.objectstore.gateway.service.GrantResponse @@ -116,31 +113,6 @@ class ObjectStoreGatewayServer( } } - override fun batchGrantObjectPermissions( - request: BatchGrantObjectPermissionsRequest, - responseObserver: StreamObserver, - ) { - val (granteeAddress, targetHashes) = when (request.grantTargetCase) { - GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null - GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList } - else -> throw InvalidInputException("A grant target must be supplied") - } - objectService.batchGrantAccess( - granteeAddress = granteeAddress, - granterAddress = address(), - targetHashes = targetHashes, - emitResponse = { hash, grantee -> - responseObserver.onNext( - GrantObjectPermissionsResponse.newBuilder().also { response -> - response.hash = hash - response.granteeAddress = grantee - }.build() - ) - }, - completeProcess = { responseObserver.onCompleted() }, - ) - } - override fun revokeObjectPermissions( request: GatewayOuterClass.RevokeObjectPermissionsRequest, responseObserver: StreamObserver diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayStreamServer.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayStreamServer.kt new file mode 100644 index 0000000..8550c48 --- /dev/null +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayStreamServer.kt @@ -0,0 +1,28 @@ +package tech.figure.objectstore.gateway.server + +import kotlinx.coroutines.flow.Flow +import org.lognet.springboot.grpc.GRpcService +import tech.figure.objectstore.gateway.GatewayGrpcKt +import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest +import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase +import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse +import tech.figure.objectstore.gateway.address +import tech.figure.objectstore.gateway.exception.InvalidInputException +import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor +import tech.figure.objectstore.gateway.service.ObjectService + +@GRpcService(interceptors = [JwtServerInterceptor::class]) +class ObjectStoreGatewayStreamServer(private val objectService: ObjectService) : GatewayGrpcKt.GatewayCoroutineImplBase() { + override fun batchGrantObjectPermissions(request: BatchGrantObjectPermissionsRequest): Flow { + val (granteeAddress, targetHashes) = when (request.grantTargetCase) { + GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null + GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList } + else -> throw InvalidInputException("A grant target must be supplied") + } + return objectService.batchGrantAccess( + granteeAddress = granteeAddress, + granterAddress = address(), + targetHashes = targetHashes, + ) + } +} diff --git a/server/src/main/kotlin/tech/figure/objectstore/gateway/service/ObjectService.kt b/server/src/main/kotlin/tech/figure/objectstore/gateway/service/ObjectService.kt index 03b0a27..047d8d3 100644 --- a/server/src/main/kotlin/tech/figure/objectstore/gateway/service/ObjectService.kt +++ b/server/src/main/kotlin/tech/figure/objectstore/gateway/service/ObjectService.kt @@ -8,13 +8,14 @@ import io.provenance.scope.objectstore.util.toHex import io.provenance.scope.util.NotFoundException import io.provenance.scope.util.base64String import io.provenance.scope.util.toByteString -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import mu.KLogging import org.jetbrains.exposed.sql.transactions.transaction import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Component import tech.figure.objectstore.gateway.GatewayOuterClass +import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse import tech.figure.objectstore.gateway.GatewayOuterClass.ObjectWithMeta import tech.figure.objectstore.gateway.configuration.BatchProperties import tech.figure.objectstore.gateway.configuration.BeanQualifiers @@ -33,7 +34,6 @@ class ObjectService( private val accountsRepository: DataStorageAccountsRepository, private val addressVerificationService: AddressVerificationService, private val batchProperties: BatchProperties, - @Qualifier(BeanQualifiers.BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER) private val batchProcessScope: CoroutineScope, private val objectStoreClient: CachedOsClient, @Qualifier(BeanQualifiers.OBJECTSTORE_ENCRYPTION_KEYS) private val encryptionKeys: Map, @Qualifier(BeanQualifiers.OBJECTSTORE_MASTER_KEY) private val masterKey: KeyRef, @@ -44,7 +44,7 @@ class ObjectService( private val masterAddress = masterKey.publicKey.getAddress(provenanceProperties.mainNet) - fun putObject(obj: GatewayOuterClass.ObjectWithMeta, requesterPublicKey: PublicKey, additionalAudienceKeys: List = listOf(), useRequesterKey: Boolean = false): String { + fun putObject(obj: ObjectWithMeta, requesterPublicKey: PublicKey, additionalAudienceKeys: List = listOf(), useRequesterKey: Boolean = false): String { val requesterAddress = requesterPublicKey.getAddress(provenanceProperties.mainNet) // Always allow the master key data storage rights if (requesterAddress != masterAddress && !encryptionKeys.keys.contains(requesterAddress) && !accountsRepository.isAddressEnabled(requesterAddress)) { @@ -110,7 +110,7 @@ class ObjectService( } } - fun getObject(hash: String, requesterAddress: String): GatewayOuterClass.ObjectWithMeta { + fun getObject(hash: String, requesterAddress: String): ObjectWithMeta { val objectPermission = objectPermissionsRepository.getAccessPermission(hash, requesterAddress) ?: throw AccessDeniedException("Object access not granted to $requesterAddress [hash: $hash]") @@ -179,11 +179,9 @@ class ObjectService( granteeAddress: String, granterAddress: String, targetHashes: Collection? = null, - emitResponse: (hash: String, grantee: String) -> Unit, - completeProcess: () -> Unit, - ) = transaction { + ): Flow = transaction { val cachedObjects = if (targetHashes != null) { - if (targetHashes.size == 0) { + if (targetHashes.isEmpty()) { throw InvalidInputException("Target hash count must be greater than zero") } if (targetHashes.size > batchProperties.maxProvidedRecords) { @@ -196,33 +194,36 @@ class ObjectService( } else { null } - val hashesToGrant = cachedObjects?.keys ?: objectPermissionsRepository.getAllGranterHashes(granterAddress = granterAddress) - batchProcessScope.launch { - hashesToGrant.map { hash -> - batchProcessScope.launch { - val objectToGrant = cachedObjects - ?.get(hash) - ?.firstOrNull() - ?: objectPermissionsRepository - .getAccessPermissionsForGranter(objectHash = hash, granterAddress = granterAddress) - .firstOrNull() - if (objectToGrant == null) { - logger.info { "Skipping object grant for hash [$hash]. It cannot be found for granter [$granterAddress]" } - } else { - logger.info { "ADDING object grant for hash [$hash] to [$granteeAddress] from [$granterAddress]" } - objectPermissionsRepository.addAccessPermission( - objectHash = hash, - granterAddress = granterAddress, - granteeAddress = granteeAddress, - storageKeyAddress = objectToGrant.storageKeyAddress, - objectSizeBytes = objectToGrant.objectSizeBytes, - isObjectWithMeta = objectToGrant.isObjectWithMeta, - ) - emitResponse(hash, granteeAddress) - } + val hashesToGrant = + cachedObjects?.keys ?: objectPermissionsRepository.getAllGranterHashes(granterAddress = granterAddress) + flow { + hashesToGrant.forEach { hash -> + val objectToGrant = cachedObjects + ?.get(hash) + ?.firstOrNull() + ?: objectPermissionsRepository + .getAccessPermissionsForGranter(objectHash = hash, granterAddress = granterAddress) + .firstOrNull() + if (objectToGrant == null) { + logger.info { "Skipping object grant for hash [$hash]. It cannot be found for granter [$granterAddress]" } + } else { + logger.info { "ADDING object grant for hash [$hash] to [$granteeAddress] from [$granterAddress]" } + objectPermissionsRepository.addAccessPermission( + objectHash = hash, + granterAddress = granterAddress, + granteeAddress = granteeAddress, + storageKeyAddress = objectToGrant.storageKeyAddress, + objectSizeBytes = objectToGrant.objectSizeBytes, + isObjectWithMeta = objectToGrant.isObjectWithMeta, + ) + emit( + GrantObjectPermissionsResponse.newBuilder().also { response -> + response.hash = hash + response.granteeAddress = granteeAddress + }.build() + ) } - }.forEach { it.join() } - completeProcess() + } } } diff --git a/server/src/main/resources/application-container.properties b/server/src/main/resources/application-container.properties index 8be67b2..145a568 100644 --- a/server/src/main/resources/application-container.properties +++ b/server/src/main/resources/application-container.properties @@ -1,6 +1,5 @@ # Batch Process Configuration batch.max_provided_records=${BATCH_MAX_PROVIDED_RECORDS:500} -batch.thread_count=${BATCH_THREAD_COUNT:20} # Event Stream Configuration event.stream.websocket_uri=${EVENT_STREAM_WEBSOCKET_URI} diff --git a/server/src/main/resources/application-development.properties b/server/src/main/resources/application-development.properties index 4c5f5e1..ce06428 100644 --- a/server/src/main/resources/application-development.properties +++ b/server/src/main/resources/application-development.properties @@ -1,6 +1,5 @@ # Batch Process Configuration batch.max_provided_records=500 -batch.thread_count=20 # Event Stream Configuration event.stream.websocket_uri=ws://localhost:26657 diff --git a/server/src/test/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServerTest.kt b/server/src/test/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServerTest.kt index 27e4c42..c18c26c 100644 --- a/server/src/test/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServerTest.kt +++ b/server/src/test/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServerTest.kt @@ -24,8 +24,8 @@ import io.provenance.scope.util.sha256String import io.provenance.scope.util.toByteString import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.test.TestCoroutineScope import org.jetbrains.exposed.sql.deleteAll import org.jetbrains.exposed.sql.selectAll import org.jetbrains.exposed.sql.transactions.transaction @@ -95,6 +95,7 @@ class ObjectStoreGatewayServerTest { val masterAccount: Account = genRandomAccount() lateinit var server: ObjectStoreGatewayServer + lateinit var streamServer: ObjectStoreGatewayStreamServer val defaultGranter: String = genRandomAccount().bech32Address val scopeAddress = MetadataAddress.forScope(UUID.randomUUID()).toString() @@ -137,6 +138,7 @@ class ObjectStoreGatewayServerTest { objectService = objectService, provenanceProperties = provenanceProperties, ) + streamServer = ObjectStoreGatewayStreamServer(objectService = objectService) } fun setUpScopePermissionValues() { @@ -360,20 +362,21 @@ class ObjectStoreGatewayServerTest { @Test fun `batchGrantObjectPermissions should successfully grant all permissions when requested`() { setUpBaseServicesAndObjectService() - val hashes = (0..10).mapTo(HashSet()) { putTestObject() } - val responseObserver = mockkObserver() + val hashes = (0 until 10).mapTo(HashSet()) { putTestObject() } val interceptedHashes = mutableSetOf() - every { responseObserver.onNext(any()) } answers { answer -> - interceptedHashes += (answer.invocation.args.first() as GrantObjectPermissionsResponse).hash - } val grantee = genRandomAccount() - server.batchGrantObjectPermissions( - request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> - request.allHashesBuilder.granteeAddress = grantee.bech32Address - }.build(), - responseObserver = responseObserver, + runBlocking { + streamServer.batchGrantObjectPermissions( + request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> + request.allHashesBuilder.granteeAddress = grantee.bech32Address + }.build(), + ).collect { interceptedHashes += it.hash } + } + assertEquals( + expected = interceptedHashes.size, + actual = hashes.size, + message = "Expected the amount of hashes encountered to exist in the responses. Got $interceptedHashes, expected $hashes", ) - waitFor(timeoutMessage = "Failed to intercept all expected hashes") { interceptedHashes.size == hashes.size } assertTrue( actual = interceptedHashes.all { it in hashes }, message = "Expected all stored hashes $hashes to be intercepted, but got $interceptedHashes", @@ -383,28 +386,28 @@ class ObjectStoreGatewayServerTest { val permission = repository.getAccessPermission(objectHash = interceptedHash, granteeAddress = grantee.bech32Address) assertNotNull(actual = permission, message = "Expected the grant to exist in the database") } - verify { responseObserver.onCompleted() } } @Test fun `batchGrantObjectPermissions should successfully grant target permissions when requested`() { setUpBaseServicesAndObjectService() - val hashes = (0..10).mapTo(HashSet()) { putTestObject() } - val responseObserver = mockkObserver() + val hashes = (0 until 10).mapTo(HashSet()) { putTestObject() } val interceptedHashes = mutableSetOf() - every { responseObserver.onNext(any()) } answers { answer -> - interceptedHashes += (answer.invocation.args.first() as GrantObjectPermissionsResponse).hash - } val grantee = genRandomAccount() val targetGrantHashes = hashes.take(4) - server.batchGrantObjectPermissions( - request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> - request.specifiedHashesBuilder.addAllTargetHashes(targetGrantHashes) - request.specifiedHashesBuilder.granteeAddress = grantee.bech32Address - }.build(), - responseObserver = responseObserver, + runBlocking { + streamServer.batchGrantObjectPermissions( + request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> + request.specifiedHashesBuilder.addAllTargetHashes(targetGrantHashes) + request.specifiedHashesBuilder.granteeAddress = grantee.bech32Address + }.build(), + ).collect { interceptedHashes += it.hash } + } + assertEquals( + expected = interceptedHashes.size, + actual = targetGrantHashes.size, + message = "Expected the amount of hashes encountered to exist in the responses. Got $interceptedHashes, expected $targetGrantHashes", ) - waitFor(timeoutMessage = "Failed to intercept all expected hashes") { interceptedHashes.size == targetGrantHashes.size } assertTrue( actual = interceptedHashes.all { it in targetGrantHashes }, message = "Expected all target hashes $targetGrantHashes to be intercepted, but got $interceptedHashes", @@ -418,7 +421,6 @@ class ObjectStoreGatewayServerTest { assertNull(actual = permission, message = "Expected an untargeted grant hash to be omitted from grants") } } - verify { responseObserver.onCompleted() } } @Test @@ -426,12 +428,13 @@ class ObjectStoreGatewayServerTest { setUpBaseServicesAndObjectService() val responseObserver = mockkObserver() assertFailsWith { - server.batchGrantObjectPermissions( - request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> - request.specifiedHashesBuilder.granteeAddress = genRandomAccount().bech32Address - }.build(), - responseObserver = responseObserver, - ) + runBlocking { + streamServer.batchGrantObjectPermissions( + request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> + request.specifiedHashesBuilder.granteeAddress = genRandomAccount().bech32Address + }.build(), + ) + } }.also { exception -> assertEquals( expected = "INVALID_ARGUMENT: Target hash count must be greater than zero", @@ -445,15 +448,15 @@ class ObjectStoreGatewayServerTest { fun `batchGrantScopePermission should fail when specified hash count greater than max allowed records`() { setUpBaseServicesAndObjectService() val hashes = (0..MAX_PROVIDED_BATCH_RECORDS + 1).mapTo(HashSet()) { putTestObject() } - val responseObserver = mockkObserver() assertFailsWith { - server.batchGrantObjectPermissions( - request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> - request.specifiedHashesBuilder.addAllTargetHashes(hashes) - request.specifiedHashesBuilder.granteeAddress = genRandomAccount().bech32Address - }.build(), - responseObserver = responseObserver, - ) + runBlocking { + streamServer.batchGrantObjectPermissions( + request = BatchGrantObjectPermissionsRequest.newBuilder().also { request -> + request.specifiedHashesBuilder.addAllTargetHashes(hashes) + request.specifiedHashesBuilder.granteeAddress = genRandomAccount().bech32Address + }.build(), + ) + } }.also { exception -> assertEquals( expected = "INVALID_ARGUMENT: Target hash count must be less than maximum value of [$MAX_PROVIDED_BATCH_RECORDS]", @@ -502,8 +505,7 @@ class ObjectStoreGatewayServerTest { ObjectService( accountsRepository = mockk(), addressVerificationService = addressVerificationService, - batchProperties = BatchProperties(maxProvidedRecords = MAX_PROVIDED_BATCH_RECORDS, threadCount = 10), - batchProcessScope = TestCoroutineScope(), + batchProperties = BatchProperties(maxProvidedRecords = MAX_PROVIDED_BATCH_RECORDS), objectStoreClient = objectStoreClient, encryptionKeys = mapOf(objectOwner.public.getAddress(false) to DirectKeyRef(keyPair)), masterKey = masterAccount.keyRef, diff --git a/server/src/test/kotlin/tech/figure/objectstore/gateway/service/ObjectServiceTest.kt b/server/src/test/kotlin/tech/figure/objectstore/gateway/service/ObjectServiceTest.kt index 4b012d1..94a6a3c 100644 --- a/server/src/test/kotlin/tech/figure/objectstore/gateway/service/ObjectServiceTest.kt +++ b/server/src/test/kotlin/tech/figure/objectstore/gateway/service/ObjectServiceTest.kt @@ -19,9 +19,7 @@ import io.provenance.scope.objectstore.util.base64Decode import io.provenance.scope.util.sha256 import io.provenance.scope.util.sha256String import io.provenance.scope.util.toByteString -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -49,7 +47,6 @@ class ObjectServiceTest { lateinit var accountsRepository: DataStorageAccountsRepository lateinit var addressVerificationService: AddressVerificationService lateinit var batchProperties: BatchProperties - lateinit var batchScope: CoroutineScope lateinit var osClient: CachedOsClient lateinit var objectPermissionsRepository: ObjectPermissionsRepository lateinit var provenanceProperties: ProvenanceProperties @@ -71,11 +68,10 @@ class ObjectServiceTest { fun setUp() { accountsRepository = mockk() addressVerificationService = mockk() - batchScope = TestCoroutineScope() osClient = mockk() objectPermissionsRepository = mockk() - batchProperties = BatchProperties(maxProvidedRecords = 10, threadCount = 10) + batchProperties = BatchProperties(maxProvidedRecords = 10) provenanceProperties = ProvenanceProperties(false, "pio-fakenet-1", URI("")) objectService = ObjectService( @@ -87,7 +83,6 @@ class ObjectServiceTest { masterKey = masterKey, objectPermissionsRepository = objectPermissionsRepository, provenanceProperties = provenanceProperties, - batchProcessScope = batchScope, ) } diff --git a/server/src/test/resources/application.properties b/server/src/test/resources/application.properties index 9aed03e..0d3cafb 100644 --- a/server/src/test/resources/application.properties +++ b/server/src/test/resources/application.properties @@ -1,6 +1,5 @@ # Batch Configuration batch.max_provided_records=500 -batch.thread_count=10 # Event Stream Configuration event.stream.websocket_uri=ws://localhost:26657