Skip to content

Commit

Permalink
Bugfix / Tweak: Coroutine server implementation (#52)
Browse files Browse the repository at this point in the history
* Forgot to terminate the object grant instance

* Try the new coroutine server strategy

* Self-review cleanup
  • Loading branch information
hyperschwartz authored May 24, 2023
1 parent 2f72941 commit f35eff9
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 429 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,33 @@ package tech.figure.objectstore.gateway.server

import io.grpc.Status
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import io.provenance.scope.encryption.model.KeyRef
import io.provenance.scope.encryption.util.getAddress
import io.provenance.scope.encryption.util.toPublicKey
import kotlinx.coroutines.flow.Flow
import mu.KLogging
import org.lognet.springboot.grpc.GRpcService
import org.springframework.beans.factory.annotation.Qualifier
import tech.figure.objectstore.gateway.GatewayGrpc
import tech.figure.objectstore.gateway.GatewayGrpcKt
import tech.figure.objectstore.gateway.GatewayOuterClass
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectByHashResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsRequest
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantScopePermissionResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.PutObjectResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.RegisterExistingObjectResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeObjectPermissionsResponse
import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeScopePermissionResponse
import tech.figure.objectstore.gateway.address
import tech.figure.objectstore.gateway.configuration.BeanQualifiers
import tech.figure.objectstore.gateway.configuration.ProvenanceProperties
import tech.figure.objectstore.gateway.exception.InvalidInputException
import tech.figure.objectstore.gateway.publicKey
import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor
import tech.figure.objectstore.gateway.service.GrantResponse
Expand All @@ -35,139 +44,101 @@ class ObjectStoreGatewayServer(
private val scopePermissionsService: ScopePermissionsService,
private val objectService: ObjectService,
private val provenanceProperties: ProvenanceProperties,
) : GatewayGrpc.GatewayImplBase() {
) : GatewayGrpcKt.GatewayCoroutineImplBase() {

companion object : KLogging() {
const val DEFAULT_UNKNOWN_DESCRIPTION: String = "An unexpected error occurred. Please try again later"
}

override fun fetchObject(
request: GatewayOuterClass.FetchObjectRequest,
responseObserver: StreamObserver<GatewayOuterClass.FetchObjectResponse>
) {
scopeFetchService.fetchScopeForGrantee(request.scopeAddress, publicKey(), request.granterAddress.takeIf { it.isNotBlank() }).let {
responseObserver.onNext(
GatewayOuterClass.FetchObjectResponse.newBuilder()
.setScopeId(request.scopeAddress)
.addAllRecords(it)
.build()
)
override suspend fun fetchObject(request: FetchObjectRequest): FetchObjectResponse = scopeFetchService
.fetchScopeForGrantee(request.scopeAddress, publicKey(), request.granterAddress.takeIf { it.isNotBlank() })
.let {
FetchObjectResponse.newBuilder()
.setScopeId(request.scopeAddress)
.addAllRecords(it)
.build()
}
responseObserver.onCompleted()
}

override fun putObject(
override suspend fun putObject(
request: GatewayOuterClass.PutObjectRequest,
responseObserver: StreamObserver<GatewayOuterClass.PutObjectResponse>
) {
objectService.putObject(request.`object`, publicKey(), request.additionalAudienceKeysList.map { it.toPublicKey() }, useRequesterKey = request.useRequesterKey).let {
responseObserver.onNext(
GatewayOuterClass.PutObjectResponse.newBuilder()
.setHash(it)
.build()
)
}
responseObserver.onCompleted()
}
): PutObjectResponse = objectService
.putObject(request.`object`, publicKey(), request.additionalAudienceKeysList.map { it.toPublicKey() }, useRequesterKey = request.useRequesterKey)
.let { PutObjectResponse.newBuilder().setHash(it).build() }

override fun registerExistingObject(
override suspend fun registerExistingObject(
request: GatewayOuterClass.RegisterExistingObjectRequest,
responseObserver: StreamObserver<GatewayOuterClass.RegisterExistingObjectResponse>
) {
objectService.registerExistingObject(request.hash, publicKey(), request.granteeAddressList).let {
responseObserver.onNext(
GatewayOuterClass.RegisterExistingObjectResponse.newBuilder()
.setRequest(request)
.build()
)
}
responseObserver.onCompleted()
}
): RegisterExistingObjectResponse = objectService
.registerExistingObject(request.hash, publicKey(), request.granteeAddressList)
.let { RegisterExistingObjectResponse.newBuilder().setRequest(request).build() }

override fun fetchObjectByHash(
override suspend fun fetchObjectByHash(
request: GatewayOuterClass.FetchObjectByHashRequest,
responseObserver: StreamObserver<GatewayOuterClass.FetchObjectByHashResponse>
) {
objectService.getObject(request.hash, address()).let { obj ->
responseObserver.onNext(
GatewayOuterClass.FetchObjectByHashResponse.newBuilder()
.setObject(obj)
.build()
)
responseObserver.onCompleted()
}
}
): FetchObjectByHashResponse = objectService.getObject(request.hash, address())
.let { obj -> FetchObjectByHashResponse.newBuilder().setObject(obj).build() }

override fun grantObjectPermissions(
override suspend fun grantObjectPermissions(
request: GrantObjectPermissionsRequest,
responseObserver: StreamObserver<GrantObjectPermissionsResponse>,
) {
objectService.grantAccess(request.hash, request.granteeAddress, address()).let {
responseObserver.onNext(
GrantObjectPermissionsResponse.newBuilder()
.setHash(request.hash)
.setGranteeAddress(request.granteeAddress)
.build()
)
responseObserver.onCompleted()
): GrantObjectPermissionsResponse = objectService
.grantAccess(request.hash, request.granteeAddress, address())
.let {
GrantObjectPermissionsResponse.newBuilder()
.setHash(request.hash)
.setGranteeAddress(request.granteeAddress)
.build()
}
}

override fun revokeObjectPermissions(
request: GatewayOuterClass.RevokeObjectPermissionsRequest,
responseObserver: StreamObserver<GatewayOuterClass.RevokeObjectPermissionsResponse>
) {
objectService.revokeAccess(request.hash, address(), request.granteeAddressList).let {
responseObserver.onNext(
GatewayOuterClass.RevokeObjectPermissionsResponse.newBuilder()
.setRequest(request)
.build()
)
override fun batchGrantObjectPermissions(request: BatchGrantObjectPermissionsRequest): Flow<GrantObjectPermissionsResponse> {
val (granteeAddress, targetHashes) = when (request.grantTargetCase) {
GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null
GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList }
else -> throw InvalidInputException("A grant target must be supplied")
}
responseObserver.onCompleted()
return objectService.batchGrantAccess(
granteeAddress = granteeAddress,
granterAddress = address(),
targetHashes = targetHashes,
)
}

override fun grantScopePermission(
override suspend fun revokeObjectPermissions(
request: GatewayOuterClass.RevokeObjectPermissionsRequest,
): RevokeObjectPermissionsResponse = objectService
.revokeAccess(request.hash, address(), request.granteeAddressList)
.let { RevokeObjectPermissionsResponse.newBuilder().setRequest(request).build() }

override suspend fun grantScopePermission(
request: GatewayOuterClass.GrantScopePermissionRequest,
responseObserver: StreamObserver<GrantScopePermissionResponse>,
) {
): GrantScopePermissionResponse {
val (grantResponse, sourceDetails) = processScopeGrant(
requesterAddress = address(),
scopeAddress = request.scopeAddress,
granteeAddress = request.granteeAddress,
grantId = request.grantId,
requestType = "Manual Grant",
)
val respond: (accepted: Boolean, granterAddress: String?) -> Unit = { accepted, granterAddress ->
responseObserver.onNext(
GrantScopePermissionResponse.newBuilder().also { rpcResp ->
rpcResp.request = request
rpcResp.grantAccepted = accepted
granterAddress?.also { rpcResp.granterAddress = it }
}.build()
)
responseObserver.onCompleted()
val getResponse: (accepted: Boolean, granterAddress: String?) -> GrantScopePermissionResponse = { accepted, granterAddress ->
GrantScopePermissionResponse.newBuilder().also { rpcResp ->
rpcResp.request = request
rpcResp.grantAccepted = accepted
granterAddress?.also { rpcResp.granterAddress = it }
}.build()
}
when (grantResponse) {
is GrantResponse.Accepted -> respond(true, grantResponse.granterAddress)
return when (grantResponse) {
is GrantResponse.Accepted -> getResponse(true, grantResponse.granterAddress)
is GrantResponse.Rejected -> {
logger.warn("REJECTED $sourceDetails: ${grantResponse.message}")
respond(false, null)
getResponse(false, null)
}
is GrantResponse.Error -> {
logger.error("ERROR $sourceDetails", grantResponse.cause)
responseObserver.onError(StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION)))
throw StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION))
}
}
}

override fun batchGrantScopePermission(
request: BatchGrantScopePermissionRequest,
responseObserver: StreamObserver<BatchGrantScopePermissionResponse>,
) {
override suspend fun batchGrantScopePermission(request: BatchGrantScopePermissionRequest): BatchGrantScopePermissionResponse {
if (request.granteesList.isEmpty()) {
responseObserver.onError(StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("At least one grantee is required")))
return
throw StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("At least one grantee is required"))
}
val requesterAddress = address()
val completedGrantDetails = request.granteesList.mapNotNull { grantee ->
Expand All @@ -179,7 +150,7 @@ class ObjectStoreGatewayServer(
requestType = "Batch Grant",
)
// The response should only include grantee information when the grantees did not throw exceptions. The
// caller can cross-reference the sent request versus what was produced in that list to determine any failures,
// caller can cross-reference the intercepted request versus what was produced in that list to determine any failures,
// if they care.
when (grantResponse) {
is GrantResponse.Accepted -> grantee to grantResponse.granterAddress
Expand All @@ -193,27 +164,23 @@ class ObjectStoreGatewayServer(
}
}
}
responseObserver.onNext(
BatchGrantScopePermissionResponse.newBuilder().also { batchResponse ->
batchResponse.request = request
completedGrantDetails.map { (grantee, granterAddress) ->
GrantScopePermissionResponse.newBuilder().also { grantResponse ->
grantResponse.requestBuilder.scopeAddress = request.scopeAddress
grantResponse.requestBuilder.granteeAddress = grantee.granteeAddress
grantResponse.requestBuilder.grantId = grantee.grantId
granterAddress?.also { grantResponse.granterAddress = it }
grantResponse.grantAccepted = granterAddress != null
}.build()
}.also(batchResponse::addAllGrantResponses)
}.build()
)
responseObserver.onCompleted()
return BatchGrantScopePermissionResponse.newBuilder().also { batchResponse ->
batchResponse.request = request
completedGrantDetails.map { (grantee, granterAddress) ->
GrantScopePermissionResponse.newBuilder().also { grantResponse ->
grantResponse.requestBuilder.scopeAddress = request.scopeAddress
grantResponse.requestBuilder.granteeAddress = grantee.granteeAddress
grantResponse.requestBuilder.grantId = grantee.grantId
granterAddress?.also { grantResponse.granterAddress = it }
grantResponse.grantAccepted = granterAddress != null
}.build()
}.also(batchResponse::addAllGrantResponses)
}.build()
}

override fun revokeScopePermission(
override suspend fun revokeScopePermission(
request: GatewayOuterClass.RevokeScopePermissionRequest,
responseObserver: StreamObserver<RevokeScopePermissionResponse>,
) {
): RevokeScopePermissionResponse {
val requesterAddress = address()
val grantId = request.grantId.takeIf { it.isNotBlank() }
val sourceDetails = "Main revoke request by $requesterAddress for scope ${request.scopeAddress}, grantee ${request.granteeAddress}${if (grantId != null) ", grantId $grantId" else ""}"
Expand All @@ -230,25 +197,22 @@ class ObjectStoreGatewayServer(
grantId = grantId,
sourceDetails = sourceDetails,
)
val respond: (accepted: Boolean, revokedGrants: Int?) -> Unit = { accepted, revokedGrants ->
responseObserver.onNext(
RevokeScopePermissionResponse.newBuilder().also { rpcResp ->
rpcResp.request = request
rpcResp.revokeAccepted = accepted
revokedGrants?.also { rpcResp.revokedGrantsCount = it }
}.build()
)
responseObserver.onCompleted()
val getResponse: (accepted: Boolean, revokedGrants: Int?) -> RevokeScopePermissionResponse = { accepted, revokedGrants ->
RevokeScopePermissionResponse.newBuilder().also { rpcResp ->
rpcResp.request = request
rpcResp.revokeAccepted = accepted
revokedGrants?.also { rpcResp.revokedGrantsCount = it }
}.build()
}
when (revokeResponse) {
is RevokeResponse.Accepted -> respond(true, revokeResponse.revokedGrantsCount)
return when (revokeResponse) {
is RevokeResponse.Accepted -> getResponse(true, revokeResponse.revokedGrantsCount)
is RevokeResponse.Rejected -> {
logger.warn("REJECTED $sourceDetails: ${revokeResponse.message}")
respond(false, null)
getResponse(false, null)
}
is RevokeResponse.Error -> {
logger.error("ERROR $sourceDetails", revokeResponse.cause)
responseObserver.onError(StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION)))
throw StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION))
}
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit f35eff9

Please sign in to comment.