Skip to content

Commit

Permalink
Merge pull request #275 from emeraldpay/feat/compression
Browse files Browse the repository at this point in the history
solution: an option to use HTTP compression
  • Loading branch information
splix authored Nov 23, 2023
2 parents 5c6499f + 98859b2 commit dbb06d3
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 3 deletions.
22 changes: 22 additions & 0 deletions docs/reference-configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ cluster:
| `2449`
| Port to bind gRPC server

| `compress`
| `true`
| Enable support for gRPC compression (i.e., `gzip`).


| `tls`
|
| Setup TLS configuration for the gRPC server.
Expand Down Expand Up @@ -898,6 +903,10 @@ rpc:
password: "${ETH_PASSWORD}"
----

| `rpc.compress`
| Enable compression for HTTP connection (i.e., `gzip`). May not work with some servers.
Defaults is `false`

| `ws.url`
| WebSocket URL to connect to.
Optional, but optimizes performance if it's available.
Expand All @@ -922,6 +931,10 @@ Default is 15Mb
| How many concurrent connection to make. If more than one, each used in a robin-round fashion.
Defaults is `1`

| `ws.compress`
| Enable compression for WebSocket connection (i.e., `permessage-deflate`). May not work with some servers.
Defaults is `false`

|===

==== Bitcoin Connection Options
Expand All @@ -948,6 +961,10 @@ rpc:
password: "${NODE_PASSWORD}"
----

| `rpc.compress`
| Enable compression for HTTP connection (i.e., `gzip`). May not work with some servers.
Defaults is `false`

| `zeromq.address`
a| Set up an additional connection via ZeroMQ protocol to subscribe to the new blocks.
The node must be launched with the same address specified as `-zmqpubhashblock="tcp://${HOST}:${POST}"` or in `bitcoin.conf`
Expand Down Expand Up @@ -1011,4 +1028,9 @@ See link:08-authentication.adoc[Authentication] docs and <<tls>>.
| `tls.certificate` + `tls.key`
| Client certificate (x509) and its private key (PKCS 8) used for authentication on the remote server.

| `compress`
| Enable compression for gRPC requests (i.e., `gzip`).
Defaults is `true`


|===
9 changes: 9 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package io.emeraldpay.dshackle
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.monitoring.accesslog.AccessLogHandlerGrpc
import io.emeraldpay.dshackle.monitoring.accesslog.GenerateRequestId
import io.grpc.CompressorRegistry
import io.grpc.DecompressorRegistry
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -53,6 +55,13 @@ open class GrpcServer(
} else {
it
}
if (mainConfig.compress) {
// standard registry supports only GZip compression
it.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
} else {
it
}
}
.intercept(GenerateRequestId())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MainConfig {
var host = "127.0.0.1"
var port = 2449
var tls: AuthConfig.ServerTlsAuth? = null
var compress: Boolean = true
var cache: CacheConfig? = null
var proxy: ProxyConfig? = null
var upstreams: UpstreamsConfig? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class MainConfigReader(
getValueAsInt(input, "port")?.let {
config.port = it
}
getValueAsBool(input, "compress")?.let {
config.compress = it
}

authConfigReader.readServerTls(input)?.let {
config.tls = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ open class UpstreamsConfig {
var port: Int = 0
var auth: AuthConfig.ClientTlsAuth? = null
var autoTls: Boolean? = null
var compress: Boolean? = null
}

class EthereumConnection : RpcConnection() {
Expand All @@ -178,6 +179,7 @@ open class UpstreamsConfig {
class HttpEndpoint(val url: URI) {
var basicAuth: AuthConfig.ClientBasicAuth? = null
var tls: AuthConfig.ClientTlsAuth? = null
var compress: Boolean? = null
}

class WsEndpoint(val url: URI) {
Expand All @@ -186,6 +188,7 @@ open class UpstreamsConfig {
var frameSize: Int? = null
var msgSize: Int? = null
var connections: Int = 1
var compress: Boolean? = null
}

// TODO make it unmodifiable after initial load
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class UpstreamsConfigReader(
connection.rpc = http
http.basicAuth = authConfigReader.readClientBasicAuth(node)
http.tls = authConfigReader.readClientTls(node)

getValueAsBool(node, "compress")?.let {
http.compress = it
}
}
}
getMapping(connConfigNode, "ws")?.let { node ->
Expand Down Expand Up @@ -133,6 +137,9 @@ class UpstreamsConfigReader(
}
ws.connections = it
}
getValueAsBool(node, "compress")?.let {
ws.compress = it
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ open class ConfiguredUpstreams(
currentRequestLogWriter,
).apply {
this.options = options
endpoint.compress?.let { value ->
this.compress = value
}
}
log.info("Using ALL CHAINS (gRPC) upstream, at ${endpoint.host}:${endpoint.port}")
ds.subscribeUpstreamChanges()
Expand Down Expand Up @@ -395,12 +398,16 @@ open class ConfiguredUpstreams(
connectionMetrics = ConnectionMetrics(metricsTags)
)
urls.add(endpoint.url)
JsonRpcHttpClient(
val client = JsonRpcHttpClient(
endpoint.url.toString(),
metrics,
conn.rpc?.basicAuth,
tls
)
conn.rpc?.compress?.let { value ->
client.compress = value
}
client
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ open class EthereumWsFactory(
config?.msgSize?.let {
ws.msgSizeLimit = it
}
config?.compress?.let {
ws.compress = it
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ open class WsConnectionImpl(
var frameSize: Int = DEFAULT_FRAME_SIZE
var msgSizeLimit: Int = DEFAULT_MSG_SIZE

// Some upstreams fail to connect if compression is enabled
var compress: Boolean = false

private var reconnectBackoff: BackOff = ExponentialBackOff().also {
it.initialInterval = Duration.ofMillis(100).toMillis()
it.maxInterval = Duration.ofMinutes(5).toMillis()
Expand Down Expand Up @@ -245,7 +248,7 @@ open class WsConnectionImpl(
.websocket(
WebsocketClientSpec.builder()
.handlePing(true)
.compress(false)
.compress(compress)
.maxFramePayloadLength(frameSize)
.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.ethereum.ConnectionMetrics
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import io.grpc.CompressorRegistry
import io.grpc.DecompressorRegistry
import io.grpc.ManagedChannelBuilder
import io.grpc.netty.NettyChannelBuilder
import io.micrometer.core.instrument.Counter
Expand Down Expand Up @@ -74,12 +76,16 @@ class GrpcUpstreams(
private val known = HashMap<Chain, DefaultUpstream>()
private val lock = ReentrantLock()

// for gRCP it's just a suggestion and works only if both sides support the same compression algorithm
// so, it's enabled by default because it shouldn't harm if the server doesn't support any compression
var compress = true

private val client: ReactorBlockchainGrpc.ReactorBlockchainStub
get() {
if (clientValue != null) {
return clientValue!!
}
val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) {
var channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) {
NettyChannelBuilder.forAddress(conn.host, conn.port)
// some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces)
.maxInboundMessageSize(Int.MAX_VALUE)
Expand All @@ -99,6 +105,12 @@ class GrpcUpstreams(
}
}

if (compress) {
// standard registry supports only GZip compression
channel = channel.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
}

this.clientValue = ReactorBlockchainGrpc.newReactorStub(channel.build())
return this.clientValue!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ class JsonRpcHttpClient(
private val parser = ResponseRpcParser()
private val httpClient: HttpClient

// some nodes respond with a corrupted response if compression is enabled, so disable it by default
var compress: Boolean = false
override var onHttpError: Consumer<Int>? = null

init {
var build = HttpClient.create()
.resolver(DefaultAddressResolverGroup.INSTANCE)
.compress(compress)
.doOnChannelInit(metrics.onChannelInit)

build = build.headers { h ->
Expand Down

0 comments on commit dbb06d3

Please sign in to comment.