From 4c2af7abe725a4cf103def8e0bfedbe0cc678ce2 Mon Sep 17 00:00:00 2001 From: piercetrey-figure Date: Mon, 6 Dec 2021 15:15:24 -0700 Subject: [PATCH] send an immediate event on client watch connect to force grpc headers through (#100) * send an immediate event on client watch connect to force grpc headers through - prevents ConnectionTimeoutInterceptor from timing out when an envelope isn't executed within the timeout window * move version string to constant --- build.gradle | 8 ++++++ p8e-api/build.gradle | 3 +++ .../io/provenance/engine/const/Versions.kt | 5 ++++ .../grpc/interceptors/JwtServerInterceptor.kt | 2 ++ .../grpc/observers/EnvelopeEventObserver.kt | 25 +++++++++++++++++++ .../src/main/kotlin/io/p8e/grpc/Constant.kt | 3 +++ .../client/ChallengeResponseInterceptor.kt | 17 ++++++++++++- .../src/main/proto/p8e/envelope.proto | 1 + .../src/main/kotlin/io/p8e/ContractManager.kt | 4 +-- p8e-sdk/src/main/resources/version.properties | 1 + versions.gradle | 1 + 11 files changed, 67 insertions(+), 3 deletions(-) create mode 100644 p8e-api/src/main/kotlin/io/provenance/engine/const/Versions.kt create mode 100644 p8e-sdk/src/main/resources/version.properties diff --git a/build.gradle b/build.gradle index bf9ed63f..22fca248 100644 --- a/build.gradle +++ b/build.gradle @@ -64,6 +64,14 @@ allprojects { ) } + processResources { + def properties = ['version': version] + inputs.properties(properties) + filesMatching('version.properties') { + expand(properties) + } + } + task sourcesJar(type: Jar, dependsOn: classes) { classifier = 'sources' from sourceSets.main.allSource diff --git a/p8e-api/build.gradle b/p8e-api/build.gradle index 392d1586..7de738e6 100644 --- a/p8e-api/build.gradle +++ b/p8e-api/build.gradle @@ -133,6 +133,9 @@ dependencies { // DataDog implementation "com.datadoghq:java-dogstatsd-client:$datadog_version" + // SemVer parsing utility + implementation "net.swiftzer.semver:semver:$semver_version" + // Test Things testImplementation 'io.grpc:grpc-testing:1.35.0' testImplementation 'com.h2database:h2:1.4.200' diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/const/Versions.kt b/p8e-api/src/main/kotlin/io/provenance/engine/const/Versions.kt new file mode 100644 index 00000000..8606f4af --- /dev/null +++ b/p8e-api/src/main/kotlin/io/provenance/engine/const/Versions.kt @@ -0,0 +1,5 @@ +package io.provenance.engine.const + +enum class P8eVersions(val version: String) { + V0_8_22("0.8.22"), // client version sending supported after this version https://github.com/provenance-io/p8e/pull/100 +} diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/grpc/interceptors/JwtServerInterceptor.kt b/p8e-api/src/main/kotlin/io/provenance/engine/grpc/interceptors/JwtServerInterceptor.kt index 8b3a0b12..daf86e59 100644 --- a/p8e-api/src/main/kotlin/io/provenance/engine/grpc/interceptors/JwtServerInterceptor.kt +++ b/p8e-api/src/main/kotlin/io/provenance/engine/grpc/interceptors/JwtServerInterceptor.kt @@ -31,10 +31,12 @@ class JwtServerInterceptor( } val clientIp = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) + val clientVersion = headers[Constant.CLIENT_VERSION_KEY] ?: "unknown" val context = Context.current() .withValue(Constant.PUBLIC_KEY_CTX, publicKey) .withValue(Constant.CLIENT_IP_CTX, clientIp.toString()) + .withValue(Constant.CLIENT_VERSION_CTX, clientVersion) return Contexts.interceptCall(context, call, headers, next) } diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/grpc/observers/EnvelopeEventObserver.kt b/p8e-api/src/main/kotlin/io/provenance/engine/grpc/observers/EnvelopeEventObserver.kt index 98583c3a..44e487d9 100644 --- a/p8e-api/src/main/kotlin/io/provenance/engine/grpc/observers/EnvelopeEventObserver.kt +++ b/p8e-api/src/main/kotlin/io/provenance/engine/grpc/observers/EnvelopeEventObserver.kt @@ -2,6 +2,7 @@ package io.provenance.engine.grpc.observers import io.grpc.stub.StreamObserver import io.p8e.grpc.clientIp +import io.p8e.grpc.clientVersion import io.p8e.grpc.observers.CompleteState import io.p8e.grpc.observers.EndState import io.p8e.grpc.observers.ExceptionState @@ -14,6 +15,7 @@ import io.p8e.proto.Envelope.EnvelopeEvent.Action.* import io.p8e.proto.Envelope.EnvelopeEvent.EventType import io.p8e.proto.PK import io.p8e.util.* +import io.provenance.engine.const.P8eVersions import io.provenance.p8e.shared.extension.logger import io.provenance.engine.domain.AffiliateConnectionRecord import io.provenance.engine.domain.ConnectionStatus.CONNECTED @@ -23,6 +25,7 @@ import io.provenance.engine.grpc.interceptors.statusRuntimeException import io.provenance.engine.grpc.v1.* import io.provenance.engine.service.EventService import io.provenance.p8e.shared.service.AffiliateService +import net.swiftzer.semver.SemVer import org.jetbrains.exposed.sql.transactions.transaction import java.time.OffsetDateTime import java.util.concurrent.ConcurrentHashMap @@ -136,9 +139,31 @@ class EnvelopeEventObserver( } else -> queuer.error(IllegalStateException("Unknown action received by server ${value.action.name}").statusRuntimeException()) } + } else { + // send a dummy event back on connect to force headers through in order to satisfy the ConnectionTimeoutInterceptor + if (clientSupportsEnvelopeWatchConnected()) { + EnvelopeEvent.newBuilder() + .setEvent(EventType.ENVELOPE_WATCH_CONNECTED) + .setAction(CONNECT) + .setPublicKey( + PK.SigningAndEncryptionPublicKeys.newBuilder() + .setSigningPublicKey(queuerKey!!.publicKey.toPublicKeyProto()) + .setEncryptionPublicKey(queuerKey!!.publicKey.toPublicKeyProto()) + ).build() + .let(queuer::queue) + } else { + logger().info("Skipping ENVELOPE_WATCH_CONNECTED event for unsupported client (version ${clientVersion()})") + } } } + private fun clientSupportsEnvelopeWatchConnected(): Boolean = try { + SemVer.parse(clientVersion()).compareTo(SemVer.parse(P8eVersions.V0_8_22.version)) > 0 + } catch (e: Exception) { + logger().info("Error parsing version ${e.message}") + false + } + override fun onError(t: Throwable) { disconnect(ExceptionState(t)) } diff --git a/p8e-common/src/main/kotlin/io/p8e/grpc/Constant.kt b/p8e-common/src/main/kotlin/io/p8e/grpc/Constant.kt index 7b51cc63..df7f1d6b 100644 --- a/p8e-common/src/main/kotlin/io/p8e/grpc/Constant.kt +++ b/p8e-common/src/main/kotlin/io/p8e/grpc/Constant.kt @@ -11,6 +11,8 @@ object Constant { val JWT_ALGORITHM = "SHA256withECDSA" val PUBLIC_KEY_CTX = Context.key("public-key") val CLIENT_IP_CTX = Context.key("client-ip") + val CLIENT_VERSION_CTX = Context.key("client-version") + val CLIENT_VERSION_KEY = Metadata.Key.of("p8e-client-version", Metadata.ASCII_STRING_MARSHALLER) val JWT_METADATA_KEY = Metadata.Key.of("jwt", Metadata.ASCII_STRING_MARSHALLER) val JWT_CTX_KEY = Context.key("jwt") val MAX_MESSAGE_SIZE = 200 * 1024 * 1024 @@ -18,6 +20,7 @@ object Constant { fun publicKey() = Constant.PUBLIC_KEY_CTX.get().toJavaPublicKey() fun clientIp() = Constant.CLIENT_IP_CTX.get() +fun clientVersion() = Constant.CLIENT_VERSION_CTX.get() fun T.complete(observer: StreamObserver) { observer.onNext(this) diff --git a/p8e-common/src/main/kotlin/io/p8e/grpc/client/ChallengeResponseInterceptor.kt b/p8e-common/src/main/kotlin/io/p8e/grpc/client/ChallengeResponseInterceptor.kt index a132095c..eb7cdf62 100644 --- a/p8e-common/src/main/kotlin/io/p8e/grpc/client/ChallengeResponseInterceptor.kt +++ b/p8e-common/src/main/kotlin/io/p8e/grpc/client/ChallengeResponseInterceptor.kt @@ -10,9 +10,9 @@ import io.grpc.Metadata import io.grpc.MethodDescriptor import io.p8e.grpc.Constant import io.p8e.proto.Authentication -import io.p8e.util.toPublicKeyProto import io.p8e.util.toByteString import io.p8e.util.toProtoTimestampProv +import io.p8e.util.toPublicKeyProto import java.security.KeyPair import java.security.Signature import java.time.OffsetDateTime @@ -20,11 +20,23 @@ import java.time.ZoneOffset import java.util.* import java.util.concurrent.atomic.AtomicReference + class ChallengeResponseInterceptor( private val keyPair: KeyPair, private val authenticationClient: AuthenticationClient, private val toleranceSeconds: Long = 3 ): ClientInterceptor { + companion object { + private val P8E_VERSION = p8eVersion() + + private fun p8eVersion(): String = Properties().let { properties -> + ChallengeResponseInterceptor::class.java.getResourceAsStream("/version.properties")?.use { + properties.load(it) + } + properties.getProperty("version", "") + } + } + private val jwt = AtomicReference("") override fun interceptCall( @@ -44,6 +56,9 @@ class ChallengeResponseInterceptor( .takeIf { it.isNotEmpty() && !it.isExpired() } ?: authenticate()) } + + headers.put(Constant.CLIENT_VERSION_KEY, P8E_VERSION) + super.start( responseListener, headers diff --git a/p8e-proto-internal/src/main/proto/p8e/envelope.proto b/p8e-proto-internal/src/main/proto/p8e/envelope.proto index f120e21c..9e27a354 100644 --- a/p8e-proto-internal/src/main/proto/p8e/envelope.proto +++ b/p8e-proto-internal/src/main/proto/p8e/envelope.proto @@ -55,6 +55,7 @@ message EnvelopeEvent { ENVELOPE_MAILBOX_OUTBOUND = 4; ENVELOPE_ACCEPTED = 5; ENVELOPE_EXECUTION_ERROR = 6; + ENVELOPE_WATCH_CONNECTED = 7; } enum Action { diff --git a/p8e-sdk/src/main/kotlin/io/p8e/ContractManager.kt b/p8e-sdk/src/main/kotlin/io/p8e/ContractManager.kt index 0f656fa6..787e389d 100644 --- a/p8e-sdk/src/main/kotlin/io/p8e/ContractManager.kt +++ b/p8e-sdk/src/main/kotlin/io/p8e/ContractManager.kt @@ -563,7 +563,7 @@ class ContractManager( .orThrow { IllegalStateException("Handlers not registered for ${clazz.name}") } val response = when (event.event) { - EventType.ENVELOPE_ACCEPTED -> false + EventType.ENVELOPE_ACCEPTED, EventType.ENVELOPE_WATCH_CONNECTED -> false EventType.ENVELOPE_RESPONSE -> try { classHandlers.stepCompletionHandler.cast().handle(constructContract(clazz, event)) } catch (t: Throwable) { @@ -656,7 +656,7 @@ class ContractManager( EventType.ENVELOPE_REQUEST, EventType.ENVELOPE_RESPONSE -> Either.Right(constructContract(clazz, response)) - EventType.UNRECOGNIZED, EventType.UNUSED_TYPE, null -> throw IllegalStateException("Invalid EventType of ${response.event}") + EventType.ENVELOPE_WATCH_CONNECTED, EventType.UNRECOGNIZED, EventType.UNUSED_TYPE, null -> throw IllegalStateException("Invalid EventType of ${response.event}") } } } catch (t: Throwable) { diff --git a/p8e-sdk/src/main/resources/version.properties b/p8e-sdk/src/main/resources/version.properties new file mode 100644 index 00000000..a50bf5c8 --- /dev/null +++ b/p8e-sdk/src/main/resources/version.properties @@ -0,0 +1 @@ +version=${version} diff --git a/versions.gradle b/versions.gradle index 875e02ec..befda317 100644 --- a/versions.gradle +++ b/versions.gradle @@ -20,6 +20,7 @@ ext { datadog_version = '2.11.0' logback_version = '1.2.3' javax_annotation_version = '1.3.2' + semver_version = '1.1.2' gradle_download = '4.1.1' provenance_version = '1.3.0'