Skip to content

Commit

Permalink
Merge pull request #5045 from onflow/yahya/6870-fix-memory-intensive-…
Browse files Browse the repository at this point in the history
…issue-part-3

[Networking] Caching application specific score of GossipSub
  • Loading branch information
yhassanzadeh13 authored Dec 15, 2023
2 parents 0fd744c + d2ece43 commit 02bb636
Show file tree
Hide file tree
Showing 62 changed files with 2,638 additions and 1,313 deletions.
231 changes: 166 additions & 65 deletions cmd/access/node_builder/access_node_builder.go

Large diffs are not rendered by default.

99 changes: 64 additions & 35 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ import (
"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/network/p2p/p2pnet"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
"github.com/onflow/flow-go/network/p2p/utils"
Expand Down Expand Up @@ -470,24 +469,70 @@ func (builder *ObserverServiceBuilder) extraFlags() {
builder.ExtraFlags(func(flags *pflag.FlagSet) {
defaultConfig := DefaultObserverServiceConfig()

flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr, "rpc-addr", "r", defaultConfig.rpcConf.UnsecureGRPCListenAddr, "the address the unsecured gRPC server listens on")
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr, "secure-rpc-addr", defaultConfig.rpcConf.SecureGRPCListenAddr, "the address the secure gRPC server listens on")
flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr,
"rpc-addr",
"r",
defaultConfig.rpcConf.UnsecureGRPCListenAddr,
"the address the unsecured gRPC server listens on")
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr,
"secure-rpc-addr",
defaultConfig.rpcConf.SecureGRPCListenAddr,
"the address the secure gRPC server listens on")
flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on")
flags.StringVar(&builder.rpcConf.RestConfig.ListenAddress, "rest-addr", defaultConfig.rpcConf.RestConfig.ListenAddress, "the address the REST server listens on (if empty the REST server will not be started)")
flags.DurationVar(&builder.rpcConf.RestConfig.WriteTimeout, "rest-write-timeout", defaultConfig.rpcConf.RestConfig.WriteTimeout, "timeout to use when writing REST response")
flags.DurationVar(&builder.rpcConf.RestConfig.ReadTimeout, "rest-read-timeout", defaultConfig.rpcConf.RestConfig.ReadTimeout, "timeout to use when reading REST request headers")
flags.StringVar(&builder.rpcConf.RestConfig.ListenAddress,
"rest-addr",
defaultConfig.rpcConf.RestConfig.ListenAddress,
"the address the REST server listens on (if empty the REST server will not be started)")
flags.DurationVar(&builder.rpcConf.RestConfig.WriteTimeout,
"rest-write-timeout",
defaultConfig.rpcConf.RestConfig.WriteTimeout,
"timeout to use when writing REST response")
flags.DurationVar(&builder.rpcConf.RestConfig.ReadTimeout,
"rest-read-timeout",
defaultConfig.rpcConf.RestConfig.ReadTimeout,
"timeout to use when reading REST request headers")
flags.DurationVar(&builder.rpcConf.RestConfig.IdleTimeout, "rest-idle-timeout", defaultConfig.rpcConf.RestConfig.IdleTimeout, "idle timeout for REST connections")
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", defaultConfig.rpcConf.MaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringToIntVar(&builder.apiRatelimits, "api-rate-limits", defaultConfig.apiRatelimits, "per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.StringVar(&builder.observerNetworkingKeyPath, "observer-networking-key-path", defaultConfig.observerNetworkingKeyPath, "path to the networking key for observer")
flags.StringSliceVar(&builder.bootstrapNodeAddresses, "bootstrap-node-addresses", defaultConfig.bootstrapNodeAddresses, "the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys, "bootstrap-node-public-keys", defaultConfig.bootstrapNodePublicKeys, "the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.UintVar(&builder.rpcConf.MaxMsgSize,
"rpc-max-message-size",
defaultConfig.rpcConf.MaxMsgSize,
"the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize,
"connection-pool-size",
defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize,
"maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange,
"rpc-max-height-range",
defaultConfig.rpcConf.BackendConfig.MaxHeightRange,
"maximum size for height range requests")
flags.StringToIntVar(&builder.apiRatelimits,
"api-rate-limits",
defaultConfig.apiRatelimits,
"per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits,
"api-burst-limits",
defaultConfig.apiBurstlimits,
"burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.StringVar(&builder.observerNetworkingKeyPath,
"observer-networking-key-path",
defaultConfig.observerNetworkingKeyPath,
"path to the networking key for observer")
flags.StringSliceVar(&builder.bootstrapNodeAddresses,
"bootstrap-node-addresses",
defaultConfig.bootstrapNodeAddresses,
"the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys,
"bootstrap-node-public-keys",
defaultConfig.bootstrapNodePublicKeys,
"the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.DurationVar(&builder.apiTimeout, "upstream-api-timeout", defaultConfig.apiTimeout, "tcp timeout for Flow API gRPC sockets to upstrem nodes")
flags.StringSliceVar(&builder.upstreamNodeAddresses, "upstream-node-addresses", defaultConfig.upstreamNodeAddresses, "the gRPC network addresses of the upstream access node. e.g. access-001.mainnet.flow.org:9000,access-002.mainnet.flow.org:9000")
flags.StringSliceVar(&builder.upstreamNodePublicKeys, "upstream-node-public-keys", defaultConfig.upstreamNodePublicKeys, "the networking public key of the upstream access node (in the same order as the upstream node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.StringSliceVar(&builder.upstreamNodeAddresses,
"upstream-node-addresses",
defaultConfig.upstreamNodeAddresses,
"the gRPC network addresses of the upstream access node. e.g. access-001.mainnet.flow.org:9000,access-002.mainnet.flow.org:9000")
flags.StringSliceVar(&builder.upstreamNodePublicKeys,
"upstream-node-public-keys",
defaultConfig.upstreamNodePublicKeys,
"the networking public key of the upstream access node (in the same order as the upstream node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
})
}
Expand Down Expand Up @@ -697,20 +742,9 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
pis = append(pis, pi)
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

node, err := p2pbuilder.NewNodeBuilder(builder.Logger,
node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
&builder.FlowConfig.NetworkConfig.GossipSub,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
Expand All @@ -720,15 +754,12 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
networkKey,
builder.SporkID,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubScoringRegistryConfig,
&builder.FlowConfig.NetworkConfig.ResourceManager,
&builder.FlowConfig.NetworkConfig.GossipSubConfig,
p2pconfig.PeerManagerDisableConfig(), // disable peer manager for observer node.
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
},
meshTracer,
&p2pconfig.UnicastConfig{
UnicastConfig: builder.FlowConfig.NetworkConfig.UnicastConfig,
}).
Expand All @@ -745,8 +776,6 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
dht.BootstrapPeers(pis...),
)
}).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
Build()

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
fnb.BaseConfig.NodeRole,
connGaterCfg,
peerManagerCfg,
&fnb.FlowConfig.NetworkConfig.GossipSubConfig,
&fnb.FlowConfig.NetworkConfig.GossipSub,
&fnb.FlowConfig.NetworkConfig.ResourceManager,
uniCfg,
&fnb.FlowConfig.NetworkConfig.ConnectionManagerConfig,
Expand Down
Loading

0 comments on commit 02bb636

Please sign in to comment.