Skip to content

Commit

Permalink
Bugfix: Async stream production isn't viable with standard sync serve…
Browse files Browse the repository at this point in the history
…r implementation (#51)

* Forgot to terminate the object grant instance

* Convert to the flow producer server because the other nonsense wasn't working
  • Loading branch information
hyperschwartz authored May 24, 2023
1 parent 8ab7498 commit 2f72941
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.util.UUID
@Validated
data class BatchProperties(
val maxProvidedRecords: Int,
val threadCount: Int,
)

@ConstructorBinding
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tech.figure.objectstore.gateway.configuration

object BeanQualifiers {
const val BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER = "batchProcessCoroutineScopeBean"
const val EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER = "eventStreamCoroutineScopeBean"
const val OBJECTSTORE_ENCRYPTION_KEYS: String = "objectStoreEncryptionKeys"
const val OBJECTSTORE_PRIVATE_KEYS: String = "objectStorePrivateKeys"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ import tech.figure.objectstore.gateway.util.CoroutineUtil

@Configuration
class CoroutineConfig {
@Bean(BeanQualifiers.BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER)
fun batchProcessScope(batchProperties: BatchProperties): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.BATCH_PROCESS_SCOPE,
threadCount = batchProperties.threadCount,
)

@Bean(BeanQualifiers.EVENT_STREAM_COROUTINE_SCOPE_QUALIFIER)
fun eventStreamScope(eventStreamProperties: EventStreamProperties): CoroutineScope = CoroutineUtil.newSingletonScope(
scopeName = CoroutineScopeNames.EVENT_STREAM_SCOPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import org.lognet.springboot.grpc.GRpcService
import org.springframework.beans.factory.annotation.Qualifier
import tech.figure.objectstore.gateway.GatewayGrpc
import tech.figure.objectstore.gateway.GatewayOuterClass
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsRequest
Expand All @@ -22,7 +20,6 @@ import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeScopePermissionRe
import tech.figure.objectstore.gateway.address
import tech.figure.objectstore.gateway.configuration.BeanQualifiers
import tech.figure.objectstore.gateway.configuration.ProvenanceProperties
import tech.figure.objectstore.gateway.exception.InvalidInputException
import tech.figure.objectstore.gateway.publicKey
import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor
import tech.figure.objectstore.gateway.service.GrantResponse
Expand Down Expand Up @@ -116,31 +113,6 @@ class ObjectStoreGatewayServer(
}
}

override fun batchGrantObjectPermissions(
request: BatchGrantObjectPermissionsRequest,
responseObserver: StreamObserver<GrantObjectPermissionsResponse>,
) {
val (granteeAddress, targetHashes) = when (request.grantTargetCase) {
GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null
GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList }
else -> throw InvalidInputException("A grant target must be supplied")
}
objectService.batchGrantAccess(
granteeAddress = granteeAddress,
granterAddress = address(),
targetHashes = targetHashes,
emitResponse = { hash, grantee ->
responseObserver.onNext(
GrantObjectPermissionsResponse.newBuilder().also { response ->
response.hash = hash
response.granteeAddress = grantee
}.build()
)
},
completeProcess = { responseObserver.onCompleted() },
)
}

override fun revokeObjectPermissions(
request: GatewayOuterClass.RevokeObjectPermissionsRequest,
responseObserver: StreamObserver<GatewayOuterClass.RevokeObjectPermissionsResponse>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package tech.figure.objectstore.gateway.server

import kotlinx.coroutines.flow.Flow
import org.lognet.springboot.grpc.GRpcService
import tech.figure.objectstore.gateway.GatewayGrpcKt
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse
import tech.figure.objectstore.gateway.address
import tech.figure.objectstore.gateway.exception.InvalidInputException
import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor
import tech.figure.objectstore.gateway.service.ObjectService

@GRpcService(interceptors = [JwtServerInterceptor::class])
class ObjectStoreGatewayStreamServer(private val objectService: ObjectService) : GatewayGrpcKt.GatewayCoroutineImplBase() {
override fun batchGrantObjectPermissions(request: BatchGrantObjectPermissionsRequest): Flow<GrantObjectPermissionsResponse> {
val (granteeAddress, targetHashes) = when (request.grantTargetCase) {
GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null
GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList }
else -> throw InvalidInputException("A grant target must be supplied")
}
return objectService.batchGrantAccess(
granteeAddress = granteeAddress,
granterAddress = address(),
targetHashes = targetHashes,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import io.provenance.scope.objectstore.util.toHex
import io.provenance.scope.util.NotFoundException
import io.provenance.scope.util.base64String
import io.provenance.scope.util.toByteString
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import mu.KLogging
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import tech.figure.objectstore.gateway.GatewayOuterClass
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.ObjectWithMeta
import tech.figure.objectstore.gateway.configuration.BatchProperties
import tech.figure.objectstore.gateway.configuration.BeanQualifiers
Expand All @@ -33,7 +34,6 @@ class ObjectService(
private val accountsRepository: DataStorageAccountsRepository,
private val addressVerificationService: AddressVerificationService,
private val batchProperties: BatchProperties,
@Qualifier(BeanQualifiers.BATCH_PROCESS_COROUTINE_SCOPE_QUALIFIER) private val batchProcessScope: CoroutineScope,
private val objectStoreClient: CachedOsClient,
@Qualifier(BeanQualifiers.OBJECTSTORE_ENCRYPTION_KEYS) private val encryptionKeys: Map<String, KeyRef>,
@Qualifier(BeanQualifiers.OBJECTSTORE_MASTER_KEY) private val masterKey: KeyRef,
Expand All @@ -44,7 +44,7 @@ class ObjectService(

private val masterAddress = masterKey.publicKey.getAddress(provenanceProperties.mainNet)

fun putObject(obj: GatewayOuterClass.ObjectWithMeta, requesterPublicKey: PublicKey, additionalAudienceKeys: List<PublicKey> = listOf(), useRequesterKey: Boolean = false): String {
fun putObject(obj: ObjectWithMeta, requesterPublicKey: PublicKey, additionalAudienceKeys: List<PublicKey> = listOf(), useRequesterKey: Boolean = false): String {
val requesterAddress = requesterPublicKey.getAddress(provenanceProperties.mainNet)
// Always allow the master key data storage rights
if (requesterAddress != masterAddress && !encryptionKeys.keys.contains(requesterAddress) && !accountsRepository.isAddressEnabled(requesterAddress)) {
Expand Down Expand Up @@ -110,7 +110,7 @@ class ObjectService(
}
}

fun getObject(hash: String, requesterAddress: String): GatewayOuterClass.ObjectWithMeta {
fun getObject(hash: String, requesterAddress: String): ObjectWithMeta {
val objectPermission = objectPermissionsRepository.getAccessPermission(hash, requesterAddress)
?: throw AccessDeniedException("Object access not granted to $requesterAddress [hash: $hash]")

Expand Down Expand Up @@ -179,11 +179,9 @@ class ObjectService(
granteeAddress: String,
granterAddress: String,
targetHashes: Collection<String>? = null,
emitResponse: (hash: String, grantee: String) -> Unit,
completeProcess: () -> Unit,
) = transaction {
): Flow<GrantObjectPermissionsResponse> = transaction {
val cachedObjects = if (targetHashes != null) {
if (targetHashes.size == 0) {
if (targetHashes.isEmpty()) {
throw InvalidInputException("Target hash count must be greater than zero")
}
if (targetHashes.size > batchProperties.maxProvidedRecords) {
Expand All @@ -196,33 +194,36 @@ class ObjectService(
} else {
null
}
val hashesToGrant = cachedObjects?.keys ?: objectPermissionsRepository.getAllGranterHashes(granterAddress = granterAddress)
batchProcessScope.launch {
hashesToGrant.map { hash ->
batchProcessScope.launch {
val objectToGrant = cachedObjects
?.get(hash)
?.firstOrNull()
?: objectPermissionsRepository
.getAccessPermissionsForGranter(objectHash = hash, granterAddress = granterAddress)
.firstOrNull()
if (objectToGrant == null) {
logger.info { "Skipping object grant for hash [$hash]. It cannot be found for granter [$granterAddress]" }
} else {
logger.info { "ADDING object grant for hash [$hash] to [$granteeAddress] from [$granterAddress]" }
objectPermissionsRepository.addAccessPermission(
objectHash = hash,
granterAddress = granterAddress,
granteeAddress = granteeAddress,
storageKeyAddress = objectToGrant.storageKeyAddress,
objectSizeBytes = objectToGrant.objectSizeBytes,
isObjectWithMeta = objectToGrant.isObjectWithMeta,
)
emitResponse(hash, granteeAddress)
}
val hashesToGrant =
cachedObjects?.keys ?: objectPermissionsRepository.getAllGranterHashes(granterAddress = granterAddress)
flow {
hashesToGrant.forEach { hash ->
val objectToGrant = cachedObjects
?.get(hash)
?.firstOrNull()
?: objectPermissionsRepository
.getAccessPermissionsForGranter(objectHash = hash, granterAddress = granterAddress)
.firstOrNull()
if (objectToGrant == null) {
logger.info { "Skipping object grant for hash [$hash]. It cannot be found for granter [$granterAddress]" }
} else {
logger.info { "ADDING object grant for hash [$hash] to [$granteeAddress] from [$granterAddress]" }
objectPermissionsRepository.addAccessPermission(
objectHash = hash,
granterAddress = granterAddress,
granteeAddress = granteeAddress,
storageKeyAddress = objectToGrant.storageKeyAddress,
objectSizeBytes = objectToGrant.objectSizeBytes,
isObjectWithMeta = objectToGrant.isObjectWithMeta,
)
emit(
GrantObjectPermissionsResponse.newBuilder().also { response ->
response.hash = hash
response.granteeAddress = granteeAddress
}.build()
)
}
}.forEach { it.join() }
completeProcess()
}
}
}

Expand Down
1 change: 0 additions & 1 deletion server/src/main/resources/application-container.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Batch Process Configuration
batch.max_provided_records=${BATCH_MAX_PROVIDED_RECORDS:500}
batch.thread_count=${BATCH_THREAD_COUNT:20}

# Event Stream Configuration
event.stream.websocket_uri=${EVENT_STREAM_WEBSOCKET_URI}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Batch Process Configuration
batch.max_provided_records=500
batch.thread_count=20

# Event Stream Configuration
event.stream.websocket_uri=ws://localhost:26657
Expand Down
Loading

0 comments on commit 2f72941

Please sign in to comment.