Skip to content

Commit

Permalink
send an immediate event on client watch connect to force grpc headers…
Browse files Browse the repository at this point in the history
… through (#100)

* send an immediate event on client watch connect to force grpc headers through

- prevents ConnectionTimeoutInterceptor from timing out when an envelope isn't executed within the timeout window

* move version string to constant
  • Loading branch information
celloman authored Dec 6, 2021
1 parent 3dac4e8 commit 4c2af7a
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 3 deletions.
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ allprojects {
)
}

processResources {
def properties = ['version': version]
inputs.properties(properties)
filesMatching('version.properties') {
expand(properties)
}
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
Expand Down
3 changes: 3 additions & 0 deletions p8e-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ dependencies {
// DataDog
implementation "com.datadoghq:java-dogstatsd-client:$datadog_version"

// SemVer parsing utility
implementation "net.swiftzer.semver:semver:$semver_version"

// Test Things
testImplementation 'io.grpc:grpc-testing:1.35.0'
testImplementation 'com.h2database:h2:1.4.200'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.provenance.engine.const

enum class P8eVersions(val version: String) {
V0_8_22("0.8.22"), // client version sending supported after this version https://github.com/provenance-io/p8e/pull/100
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ class JwtServerInterceptor(
}

val clientIp = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)
val clientVersion = headers[Constant.CLIENT_VERSION_KEY] ?: "unknown"

val context = Context.current()
.withValue(Constant.PUBLIC_KEY_CTX, publicKey)
.withValue(Constant.CLIENT_IP_CTX, clientIp.toString())
.withValue(Constant.CLIENT_VERSION_CTX, clientVersion)

return Contexts.interceptCall(context, call, headers, next)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.provenance.engine.grpc.observers

import io.grpc.stub.StreamObserver
import io.p8e.grpc.clientIp
import io.p8e.grpc.clientVersion
import io.p8e.grpc.observers.CompleteState
import io.p8e.grpc.observers.EndState
import io.p8e.grpc.observers.ExceptionState
Expand All @@ -14,6 +15,7 @@ import io.p8e.proto.Envelope.EnvelopeEvent.Action.*
import io.p8e.proto.Envelope.EnvelopeEvent.EventType
import io.p8e.proto.PK
import io.p8e.util.*
import io.provenance.engine.const.P8eVersions
import io.provenance.p8e.shared.extension.logger
import io.provenance.engine.domain.AffiliateConnectionRecord
import io.provenance.engine.domain.ConnectionStatus.CONNECTED
Expand All @@ -23,6 +25,7 @@ import io.provenance.engine.grpc.interceptors.statusRuntimeException
import io.provenance.engine.grpc.v1.*
import io.provenance.engine.service.EventService
import io.provenance.p8e.shared.service.AffiliateService
import net.swiftzer.semver.SemVer
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -136,9 +139,31 @@ class EnvelopeEventObserver(
}
else -> queuer.error(IllegalStateException("Unknown action received by server ${value.action.name}").statusRuntimeException())
}
} else {
// send a dummy event back on connect to force headers through in order to satisfy the ConnectionTimeoutInterceptor
if (clientSupportsEnvelopeWatchConnected()) {
EnvelopeEvent.newBuilder()
.setEvent(EventType.ENVELOPE_WATCH_CONNECTED)
.setAction(CONNECT)
.setPublicKey(
PK.SigningAndEncryptionPublicKeys.newBuilder()
.setSigningPublicKey(queuerKey!!.publicKey.toPublicKeyProto())
.setEncryptionPublicKey(queuerKey!!.publicKey.toPublicKeyProto())
).build()
.let(queuer::queue)
} else {
logger().info("Skipping ENVELOPE_WATCH_CONNECTED event for unsupported client (version ${clientVersion()})")
}
}
}

private fun clientSupportsEnvelopeWatchConnected(): Boolean = try {
SemVer.parse(clientVersion()).compareTo(SemVer.parse(P8eVersions.V0_8_22.version)) > 0
} catch (e: Exception) {
logger().info("Error parsing version ${e.message}")
false
}

override fun onError(t: Throwable) {
disconnect(ExceptionState(t))
}
Expand Down
3 changes: 3 additions & 0 deletions p8e-common/src/main/kotlin/io/p8e/grpc/Constant.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ object Constant {
val JWT_ALGORITHM = "SHA256withECDSA"
val PUBLIC_KEY_CTX = Context.key<String>("public-key")
val CLIENT_IP_CTX = Context.key<String>("client-ip")
val CLIENT_VERSION_CTX = Context.key<String>("client-version")
val CLIENT_VERSION_KEY = Metadata.Key.of("p8e-client-version", Metadata.ASCII_STRING_MARSHALLER)
val JWT_METADATA_KEY = Metadata.Key.of("jwt", Metadata.ASCII_STRING_MARSHALLER)
val JWT_CTX_KEY = Context.key<String>("jwt")
val MAX_MESSAGE_SIZE = 200 * 1024 * 1024
}

fun publicKey() = Constant.PUBLIC_KEY_CTX.get().toJavaPublicKey()
fun clientIp() = Constant.CLIENT_IP_CTX.get()
fun clientVersion() = Constant.CLIENT_VERSION_CTX.get()

fun <T: Message> T.complete(observer: StreamObserver<T>) {
observer.onNext(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,33 @@ import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.p8e.grpc.Constant
import io.p8e.proto.Authentication
import io.p8e.util.toPublicKeyProto
import io.p8e.util.toByteString
import io.p8e.util.toProtoTimestampProv
import io.p8e.util.toPublicKeyProto
import java.security.KeyPair
import java.security.Signature
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.atomic.AtomicReference


class ChallengeResponseInterceptor(
private val keyPair: KeyPair,
private val authenticationClient: AuthenticationClient,
private val toleranceSeconds: Long = 3
): ClientInterceptor {
companion object {
private val P8E_VERSION = p8eVersion()

private fun p8eVersion(): String = Properties().let { properties ->
ChallengeResponseInterceptor::class.java.getResourceAsStream("/version.properties")?.use {
properties.load(it)
}
properties.getProperty("version", "")
}
}

private val jwt = AtomicReference("")

override fun <ReqT : Any, RespT : Any> interceptCall(
Expand All @@ -44,6 +56,9 @@ class ChallengeResponseInterceptor(
.takeIf { it.isNotEmpty() && !it.isExpired() }
?: authenticate())
}

headers.put(Constant.CLIENT_VERSION_KEY, P8E_VERSION)

super.start(
responseListener,
headers
Expand Down
1 change: 1 addition & 0 deletions p8e-proto-internal/src/main/proto/p8e/envelope.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message EnvelopeEvent {
ENVELOPE_MAILBOX_OUTBOUND = 4;
ENVELOPE_ACCEPTED = 5;
ENVELOPE_EXECUTION_ERROR = 6;
ENVELOPE_WATCH_CONNECTED = 7;
}

enum Action {
Expand Down
4 changes: 2 additions & 2 deletions p8e-sdk/src/main/kotlin/io/p8e/ContractManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class ContractManager(
.orThrow { IllegalStateException("Handlers not registered for ${clazz.name}") }

val response = when (event.event) {
EventType.ENVELOPE_ACCEPTED -> false
EventType.ENVELOPE_ACCEPTED, EventType.ENVELOPE_WATCH_CONNECTED -> false
EventType.ENVELOPE_RESPONSE -> try {
classHandlers.stepCompletionHandler.cast<T>().handle(constructContract(clazz, event))
} catch (t: Throwable) {
Expand Down Expand Up @@ -656,7 +656,7 @@ class ContractManager(
EventType.ENVELOPE_REQUEST,
EventType.ENVELOPE_RESPONSE -> Either.Right(constructContract(clazz, response))

EventType.UNRECOGNIZED, EventType.UNUSED_TYPE, null -> throw IllegalStateException("Invalid EventType of ${response.event}")
EventType.ENVELOPE_WATCH_CONNECTED, EventType.UNRECOGNIZED, EventType.UNUSED_TYPE, null -> throw IllegalStateException("Invalid EventType of ${response.event}")
}
}
} catch (t: Throwable) {
Expand Down
1 change: 1 addition & 0 deletions p8e-sdk/src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version=${version}
1 change: 1 addition & 0 deletions versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ext {
datadog_version = '2.11.0'
logback_version = '1.2.3'
javax_annotation_version = '1.3.2'
semver_version = '1.1.2'

gradle_download = '4.1.1'
provenance_version = '1.3.0'
Expand Down

0 comments on commit 4c2af7a

Please sign in to comment.