From 98859b2ef086a0d37c87939d12c216394c0e2817 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Thu, 23 Nov 2023 18:41:28 -0500 Subject: [PATCH] solution: an option to use HTTP compression --- docs/reference-configuration.adoc | 22 +++++++++++++++++++ .../io/emeraldpay/dshackle/GrpcServer.kt | 9 ++++++++ .../emeraldpay/dshackle/config/MainConfig.kt | 1 + .../dshackle/config/MainConfigReader.kt | 3 +++ .../dshackle/config/UpstreamsConfig.kt | 3 +++ .../dshackle/config/UpstreamsConfigReader.kt | 7 ++++++ .../dshackle/startup/ConfiguredUpstreams.kt | 9 +++++++- .../upstream/ethereum/EthereumWsFactory.kt | 3 +++ .../upstream/ethereum/WsConnectionImpl.kt | 5 ++++- .../dshackle/upstream/grpc/GrpcUpstreams.kt | 14 +++++++++++- .../upstream/rpcclient/JsonRpcHttpClient.kt | 3 +++ 11 files changed, 76 insertions(+), 3 deletions(-) diff --git a/docs/reference-configuration.adoc b/docs/reference-configuration.adoc index b98c22efd..881ab1121 100644 --- a/docs/reference-configuration.adoc +++ b/docs/reference-configuration.adoc @@ -172,6 +172,11 @@ cluster: | `2449` | Port to bind gRPC server +| `compress` +| `true` +| Enable support for gRPC compression (i.e., `gzip`). + + | `tls` | | Setup TLS configuration for the gRPC server. @@ -898,6 +903,10 @@ rpc: password: "${ETH_PASSWORD}" ---- +| `rpc.compress` +| Enable compression for HTTP connection (i.e., `gzip`). May not work with some servers. +Defaults is `false` + | `ws.url` | WebSocket URL to connect to. Optional, but optimizes performance if it's available. @@ -922,6 +931,10 @@ Default is 15Mb | How many concurrent connection to make. If more than one, each used in a robin-round fashion. Defaults is `1` +| `ws.compress` +| Enable compression for WebSocket connection (i.e., `permessage-deflate`). May not work with some servers. +Defaults is `false` + |=== ==== Bitcoin Connection Options @@ -948,6 +961,10 @@ rpc: password: "${NODE_PASSWORD}" ---- +| `rpc.compress` +| Enable compression for HTTP connection (i.e., `gzip`). May not work with some servers. +Defaults is `false` + | `zeromq.address` a| Set up an additional connection via ZeroMQ protocol to subscribe to the new blocks. The node must be launched with the same address specified as `-zmqpubhashblock="tcp://${HOST}:${POST}"` or in `bitcoin.conf` @@ -1011,4 +1028,9 @@ See link:08-authentication.adoc[Authentication] docs and <>. | `tls.certificate` + `tls.key` | Client certificate (x509) and its private key (PKCS 8) used for authentication on the remote server. +| `compress` +| Enable compression for gRPC requests (i.e., `gzip`). +Defaults is `true` + + |=== diff --git a/src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt b/src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt index 598479703..fc4a9c9c8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt @@ -19,6 +19,8 @@ package io.emeraldpay.dshackle import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.monitoring.accesslog.AccessLogHandlerGrpc import io.emeraldpay.dshackle.monitoring.accesslog.GenerateRequestId +import io.grpc.CompressorRegistry +import io.grpc.DecompressorRegistry import io.grpc.Server import io.grpc.netty.NettyServerBuilder import org.slf4j.LoggerFactory @@ -53,6 +55,13 @@ open class GrpcServer( } else { it } + if (mainConfig.compress) { + // standard registry supports only GZip compression + it.compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + } else { + it + } } .intercept(GenerateRequestId()) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfig.kt index 3761f584f..a1ef0e056 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfig.kt @@ -19,6 +19,7 @@ class MainConfig { var host = "127.0.0.1" var port = 2449 var tls: AuthConfig.ServerTlsAuth? = null + var compress: Boolean = true var cache: CacheConfig? = null var proxy: ProxyConfig? = null var upstreams: UpstreamsConfig? = null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfigReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfigReader.kt index 51a4f9390..b35582f8d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfigReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/MainConfigReader.kt @@ -52,6 +52,9 @@ class MainConfigReader( getValueAsInt(input, "port")?.let { config.port = it } + getValueAsBool(input, "compress")?.let { + config.compress = it + } authConfigReader.readServerTls(input)?.let { config.tls = it diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt index 918221c84..7b34e518b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt @@ -157,6 +157,7 @@ open class UpstreamsConfig { var port: Int = 0 var auth: AuthConfig.ClientTlsAuth? = null var autoTls: Boolean? = null + var compress: Boolean? = null } class EthereumConnection : RpcConnection() { @@ -178,6 +179,7 @@ open class UpstreamsConfig { class HttpEndpoint(val url: URI) { var basicAuth: AuthConfig.ClientBasicAuth? = null var tls: AuthConfig.ClientTlsAuth? = null + var compress: Boolean? = null } class WsEndpoint(val url: URI) { @@ -186,6 +188,7 @@ open class UpstreamsConfig { var frameSize: Int? = null var msgSize: Int? = null var connections: Int = 1 + var compress: Boolean? = null } // TODO make it unmodifiable after initial load diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt index 5e40366d3..6ff050864 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt @@ -104,6 +104,10 @@ class UpstreamsConfigReader( connection.rpc = http http.basicAuth = authConfigReader.readClientBasicAuth(node) http.tls = authConfigReader.readClientTls(node) + + getValueAsBool(node, "compress")?.let { + http.compress = it + } } } getMapping(connConfigNode, "ws")?.let { node -> @@ -133,6 +137,9 @@ class UpstreamsConfigReader( } ws.connections = it } + getValueAsBool(node, "compress")?.let { + ws.compress = it + } } } } else { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 11b1c2b2a..6ea49bf98 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -352,6 +352,9 @@ open class ConfiguredUpstreams( currentRequestLogWriter, ).apply { this.options = options + endpoint.compress?.let { value -> + this.compress = value + } } log.info("Using ALL CHAINS (gRPC) upstream, at ${endpoint.host}:${endpoint.port}") ds.subscribeUpstreamChanges() @@ -395,12 +398,16 @@ open class ConfiguredUpstreams( connectionMetrics = ConnectionMetrics(metricsTags) ) urls.add(endpoint.url) - JsonRpcHttpClient( + val client = JsonRpcHttpClient( endpoint.url.toString(), metrics, conn.rpc?.basicAuth, tls ) + conn.rpc?.compress?.let { value -> + client.compress = value + } + client } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt index bf9830541..d057e06b3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt @@ -74,6 +74,9 @@ open class EthereumWsFactory( config?.msgSize?.let { ws.msgSizeLimit = it } + config?.compress?.let { + ws.compress = it + } } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt index 59ad9cbd2..8352ca434 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt @@ -90,6 +90,9 @@ open class WsConnectionImpl( var frameSize: Int = DEFAULT_FRAME_SIZE var msgSizeLimit: Int = DEFAULT_MSG_SIZE + // Some upstreams fail to connect if compression is enabled + var compress: Boolean = false + private var reconnectBackoff: BackOff = ExponentialBackOff().also { it.initialInterval = Duration.ofMillis(100).toMillis() it.maxInterval = Duration.ofMinutes(5).toMillis() @@ -245,7 +248,7 @@ open class WsConnectionImpl( .websocket( WebsocketClientSpec.builder() .handlePing(true) - .compress(false) + .compress(compress) .maxFramePayloadLength(frameSize) .build() ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index fa4116b6c..21acdda3a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -33,6 +33,8 @@ import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.ethereum.ConnectionMetrics import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics +import io.grpc.CompressorRegistry +import io.grpc.DecompressorRegistry import io.grpc.ManagedChannelBuilder import io.grpc.netty.NettyChannelBuilder import io.micrometer.core.instrument.Counter @@ -74,12 +76,16 @@ class GrpcUpstreams( private val known = HashMap() private val lock = ReentrantLock() + // for gRCP it's just a suggestion and works only if both sides support the same compression algorithm + // so, it's enabled by default because it shouldn't harm if the server doesn't support any compression + var compress = true + private val client: ReactorBlockchainGrpc.ReactorBlockchainStub get() { if (clientValue != null) { return clientValue!! } - val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) { + var channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) { NettyChannelBuilder.forAddress(conn.host, conn.port) // some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces) .maxInboundMessageSize(Int.MAX_VALUE) @@ -99,6 +105,12 @@ class GrpcUpstreams( } } + if (compress) { + // standard registry supports only GZip compression + channel = channel.compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + } + this.clientValue = ReactorBlockchainGrpc.newReactorStub(channel.build()) return this.clientValue!! } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt index e795cf7e8..1934536bc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt @@ -56,11 +56,14 @@ class JsonRpcHttpClient( private val parser = ResponseRpcParser() private val httpClient: HttpClient + // some nodes respond with a corrupted response if compression is enabled, so disable it by default + var compress: Boolean = false override var onHttpError: Consumer? = null init { var build = HttpClient.create() .resolver(DefaultAddressResolverGroup.INSTANCE) + .compress(compress) .doOnChannelInit(metrics.onChannelInit) build = build.headers { h ->