Skip to content

Commit

Permalink
Query chain directly when fetching latest scope for IndexGrpc (#96)
Browse files Browse the repository at this point in the history
* Query chain directly when fetching latest scope for IndexGrpc

- Allows scopes that were created by new scope sdk to be queried properly via the old p8e
- This is an important piece of the los transition to the new sdk (since the final AssignToServicer contract requires scope sdk multiparty support to move that to the new sdk)
- Also prevent error resulting from incoming events that have no `lastEvent.executionUuid` set (i.e. scopes written by something other than the legacy p8e system, such as the new scope sdk, p8e doesn't need to do anything with these scopes anyways)

* only fetch scope from chain when not found in the database

- attempting to preserve full backwards compatibility with regular old p8e scope use cases

* remove unused import

* use mapNotNull
  • Loading branch information
piercetrey-figure authored Nov 22, 2021
1 parent 1bc7323 commit d99f90c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
55 changes: 52 additions & 3 deletions p8e-api/src/main/kotlin/io/provenance/engine/grpc/v1/IndexGrpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.grpc.stub.StreamObserver
import io.p8e.definition.DefinitionService
import io.p8e.grpc.complete
import io.p8e.grpc.publicKey
import io.p8e.proto.ContractScope
import io.p8e.proto.Index
import io.p8e.proto.Index.ElasticSearchQueryRequest
import io.p8e.proto.Index.FactHistoryRequest
Expand Down Expand Up @@ -35,6 +36,7 @@ import io.provenance.os.client.OsClient
import io.provenance.p8e.shared.extension.logger
import io.provenance.p8e.shared.util.P8eMDC
import io.p8e.proto.Util.UUID
import io.provenance.engine.service.ProvenanceGrpcService
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.index.query.QueryBuilders
import org.jetbrains.exposed.sql.transactions.transaction
Expand All @@ -45,7 +47,8 @@ class IndexGrpc(
private val affiliateService: AffiliateService,
iOsClient: OsClient,
private val indexService: IndexService,
private val objectMapper: ObjectMapper
private val objectMapper: ObjectMapper,
private val provenanceGrpcService: ProvenanceGrpcService,
): IndexServiceImplBase() {
private val log = logger()

Expand Down Expand Up @@ -122,6 +125,15 @@ class IndexGrpc(

indexService.findLatestByScopeUuid(scopeUuid.toUuidProv())
?.toScopeWrapper()
.or {
log.debug("Scope not found in database, attempting to fetch from chain")
try {
provenanceGrpcService.retrieveScope(scopeUuid.toUuidProv()).toScopeWrapper()
} catch (e: Exception) {
log.debug("Failed to fetch scope from chain")
ScopeWrapper.getDefaultInstance()
}
}
.or { ScopeWrapper.getDefaultInstance() }
.complete(responseObserver)
}
Expand All @@ -132,9 +144,38 @@ class IndexGrpc(
) {
P8eMDC.set(publicKey(), clear = true)

indexService.findLatestByScopeUuids(request.uuidsList.map { it.toUuidProv() })
val indexedScopes = indexService.findLatestByScopeUuids(request.uuidsList.map { it.toUuidProv() })
.toScopeWrappers()
.complete(responseObserver)

log.debug("Fetched ${indexedScopes.scopesCount}/${request.uuidsCount} scopes from db")

var allScopes = indexedScopes

if (indexedScopes.scopesCount != request.uuidsCount) {
val fetchedUuids = indexedScopes.scopesList.map { it.scope.uuid.toUuidProv() }.toSet()


val chainScopes = request.uuidsList
.map { it.toUuidProv() }
.filterNot { fetchedUuids.contains(it) }
.also { log.debug("Attempting to fetch an additional ${it.count()} scopes from chain") }
.mapNotNull {
try {
provenanceGrpcService.retrieveScope(it)
} catch (e: Exception) {
null
}
}
.contractScopesToScopeWrappers()

log.debug("Fetched additional ${chainScopes.scopesCount} scopes from chain")

allScopes = allScopes.toBuilder()
.addAllScopes(chainScopes.scopesList)
.build()
}

allScopes.complete(responseObserver)
}

override fun queryCount(
Expand Down Expand Up @@ -244,6 +285,14 @@ fun List<IndexScopeRecord>.toScopeWrappers(): ScopeWrappers {
.build()
}

fun ContractScope.Scope.toScopeWrapper(): ScopeWrapper = ScopeWrapper.newBuilder()
.setScope(this)
.build()

fun List<ContractScope.Scope>.contractScopesToScopeWrappers(): ScopeWrappers = ScopeWrappers.newBuilder()
.addAllScopes(map { it.toScopeWrapper() })
.build()

fun SearchResponse.toRawQueryResults() = Index.RawQueryResults.newBuilder()
.addAllResults(hits.map { hit ->
Index.RawQueryResult.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class ProvenanceGrpcService(
)
}

fun retrieveScope(uuid: UUID): ContractScope.Scope = retrieveScope(uuid.toString())

fun retrieveScope(address: String): ContractScope.Scope {
val (scopeResponse, contractSpecHashLookup, scopeSpecificationName) = try {
val scopeResponse = metadataQueryService.scope(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class ScopeStream(
log.info("Received event stream block!")

transaction {
val uuids = IndexScopeRecord.batchInsert(blockHeight, events).flatMap { it.value }.toSet()
val filteredEvents = events.filter { it.scope.lastEvent.executionUuid.value.isNotBlank() } // p8e events will have this set, any other scope creates/updates aren't from p8e
val uuids = IndexScopeRecord.batchInsert(blockHeight, filteredEvents).flatMap { it.value }.toSet()
uuids.forEach {
// TODO: write a test for and possibly fix the case where multiple parties on multiparty contract share same node, but have different index names
// may need to index each separately to ensure both indexes populated https://github.com/FigureTechnologies/p8e/pull/444/files#r557465484
Expand All @@ -127,15 +128,15 @@ class ScopeStream(
}
}

events.map { it.txHash }
filteredEvents.map { it.txHash }
.toSet()
.also { txHashes -> log.debug("Received the following TXs $txHashes at height $blockHeight") }
.forEach { txHash -> TransactionStatusRecord.setSuccess(txHash) }

// Mark that we've stored up to the given block height for indexing.
EventStreamRecord.update(eventStreamId, blockHeight)

events.forEach {
filteredEvents.forEach {
ChaincodeInvokeService.unlockScope(it.scope.uuid.value)
}
}
Expand Down

0 comments on commit d99f90c

Please sign in to comment.