Skip to content

Commit

Permalink
Merge branch 'master' into yurii/6863-receipt-validator-checks-number…
Browse files Browse the repository at this point in the history
…-of-receipts
  • Loading branch information
durkmurder authored Dec 19, 2023
2 parents ca26248 + 9296205 commit d99b7b9
Show file tree
Hide file tree
Showing 168 changed files with 7,621 additions and 2,367 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
steps:
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: "1.20"
- name: Checkout repo
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand All @@ -63,6 +64,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand All @@ -81,6 +83,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand All @@ -102,6 +105,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand Down Expand Up @@ -145,6 +149,7 @@ jobs:
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand Down Expand Up @@ -177,6 +182,7 @@ jobs:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand Down Expand Up @@ -267,6 +273,7 @@ jobs:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v4
timeout-minutes: 10 # fail fast. sometimes this step takes an extremely long time
with:
go-version: ${{ env.GO_VERSION }}
cache: true
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ ci: install-tools test
# Runs integration tests
.PHONY: ci-integration
ci-integration:
$(MAKE) -C integration ci-integration-test
$(MAKE) -C integration integration-test

# Runs benchmark tests
# NOTE: we do not need `docker-build-flow` as this is run as a separate step
Expand Down
261 changes: 189 additions & 72 deletions cmd/access/node_builder/access_node_builder.go

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,12 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
}

fetcher := fetcher.NewCollectionFetcher(node.Logger, exeNode.collectionRequester, node.State, exeNode.exeConf.onflowOnlyLNs)
loader := loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
var blockLoader ingestion.BlockLoader
if exeNode.exeConf.enableStorehouse {
blockLoader = loader.NewUnfinalizedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
} else {
blockLoader = loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
}

exeNode.ingestionEng, err = ingestion.New(
exeNode.ingestionUnit,
Expand All @@ -978,7 +983,7 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
exeNode.executionDataPruner,
exeNode.blockDataUploader,
exeNode.stopControl,
loader,
blockLoader,
)

// TODO: we should solve these mutual dependencies better
Expand Down
101 changes: 65 additions & 36 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ import (
"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/network/p2p/p2pnet"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/network/p2p/translator"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
"github.com/onflow/flow-go/network/p2p/utils"
Expand Down Expand Up @@ -470,24 +469,70 @@ func (builder *ObserverServiceBuilder) extraFlags() {
builder.ExtraFlags(func(flags *pflag.FlagSet) {
defaultConfig := DefaultObserverServiceConfig()

flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr, "rpc-addr", "r", defaultConfig.rpcConf.UnsecureGRPCListenAddr, "the address the unsecured gRPC server listens on")
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr, "secure-rpc-addr", defaultConfig.rpcConf.SecureGRPCListenAddr, "the address the secure gRPC server listens on")
flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr,
"rpc-addr",
"r",
defaultConfig.rpcConf.UnsecureGRPCListenAddr,
"the address the unsecured gRPC server listens on")
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr,
"secure-rpc-addr",
defaultConfig.rpcConf.SecureGRPCListenAddr,
"the address the secure gRPC server listens on")
flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on")
flags.StringVar(&builder.rpcConf.RestConfig.ListenAddress, "rest-addr", defaultConfig.rpcConf.RestConfig.ListenAddress, "the address the REST server listens on (if empty the REST server will not be started)")
flags.DurationVar(&builder.rpcConf.RestConfig.WriteTimeout, "rest-write-timeout", defaultConfig.rpcConf.RestConfig.WriteTimeout, "timeout to use when writing REST response")
flags.DurationVar(&builder.rpcConf.RestConfig.ReadTimeout, "rest-read-timeout", defaultConfig.rpcConf.RestConfig.ReadTimeout, "timeout to use when reading REST request headers")
flags.StringVar(&builder.rpcConf.RestConfig.ListenAddress,
"rest-addr",
defaultConfig.rpcConf.RestConfig.ListenAddress,
"the address the REST server listens on (if empty the REST server will not be started)")
flags.DurationVar(&builder.rpcConf.RestConfig.WriteTimeout,
"rest-write-timeout",
defaultConfig.rpcConf.RestConfig.WriteTimeout,
"timeout to use when writing REST response")
flags.DurationVar(&builder.rpcConf.RestConfig.ReadTimeout,
"rest-read-timeout",
defaultConfig.rpcConf.RestConfig.ReadTimeout,
"timeout to use when reading REST request headers")
flags.DurationVar(&builder.rpcConf.RestConfig.IdleTimeout, "rest-idle-timeout", defaultConfig.rpcConf.RestConfig.IdleTimeout, "idle timeout for REST connections")
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", defaultConfig.rpcConf.MaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringToIntVar(&builder.apiRatelimits, "api-rate-limits", defaultConfig.apiRatelimits, "per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.StringVar(&builder.observerNetworkingKeyPath, "observer-networking-key-path", defaultConfig.observerNetworkingKeyPath, "path to the networking key for observer")
flags.StringSliceVar(&builder.bootstrapNodeAddresses, "bootstrap-node-addresses", defaultConfig.bootstrapNodeAddresses, "the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys, "bootstrap-node-public-keys", defaultConfig.bootstrapNodePublicKeys, "the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.UintVar(&builder.rpcConf.MaxMsgSize,
"rpc-max-message-size",
defaultConfig.rpcConf.MaxMsgSize,
"the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize,
"connection-pool-size",
defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize,
"maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange,
"rpc-max-height-range",
defaultConfig.rpcConf.BackendConfig.MaxHeightRange,
"maximum size for height range requests")
flags.StringToIntVar(&builder.apiRatelimits,
"api-rate-limits",
defaultConfig.apiRatelimits,
"per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits,
"api-burst-limits",
defaultConfig.apiBurstlimits,
"burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.StringVar(&builder.observerNetworkingKeyPath,
"observer-networking-key-path",
defaultConfig.observerNetworkingKeyPath,
"path to the networking key for observer")
flags.StringSliceVar(&builder.bootstrapNodeAddresses,
"bootstrap-node-addresses",
defaultConfig.bootstrapNodeAddresses,
"the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys,
"bootstrap-node-public-keys",
defaultConfig.bootstrapNodePublicKeys,
"the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.DurationVar(&builder.apiTimeout, "upstream-api-timeout", defaultConfig.apiTimeout, "tcp timeout for Flow API gRPC sockets to upstrem nodes")
flags.StringSliceVar(&builder.upstreamNodeAddresses, "upstream-node-addresses", defaultConfig.upstreamNodeAddresses, "the gRPC network addresses of the upstream access node. e.g. access-001.mainnet.flow.org:9000,access-002.mainnet.flow.org:9000")
flags.StringSliceVar(&builder.upstreamNodePublicKeys, "upstream-node-public-keys", defaultConfig.upstreamNodePublicKeys, "the networking public key of the upstream access node (in the same order as the upstream node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.StringSliceVar(&builder.upstreamNodeAddresses,
"upstream-node-addresses",
defaultConfig.upstreamNodeAddresses,
"the gRPC network addresses of the upstream access node. e.g. access-001.mainnet.flow.org:9000,access-002.mainnet.flow.org:9000")
flags.StringSliceVar(&builder.upstreamNodePublicKeys,
"upstream-node-public-keys",
defaultConfig.upstreamNodePublicKeys,
"the networking public key of the upstream access node (in the same order as the upstream node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
})
}
Expand Down Expand Up @@ -697,20 +742,9 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
pis = append(pis, pi)
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

node, err := p2pbuilder.NewNodeBuilder(builder.Logger,
node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
&builder.FlowConfig.NetworkConfig.GossipSub,
&p2pconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
Expand All @@ -720,17 +754,14 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
networkKey,
builder.SporkID,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.GossipSubScoringRegistryConfig,
&builder.FlowConfig.NetworkConfig.ResourceManager,
&builder.FlowConfig.NetworkConfig.GossipSubConfig,
p2pconfig.PeerManagerDisableConfig(), // disable peer manager for observer node.
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
},
meshTracer,
&p2pconfig.UnicastConfig{
UnicastConfig: builder.FlowConfig.NetworkConfig.UnicastConfig,
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
Expand All @@ -745,8 +776,6 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
dht.BootstrapPeers(pis...),
)
}).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.FlowConfig.NetworkConfig.GossipSubConfig.ScoreTracerInterval).
Build()

if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,21 +309,21 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {

// setup default rate limiter options
unicastRateLimiterOpts := []ratelimit.RateLimitersOption{
ratelimit.WithDisabledRateLimiting(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun),
ratelimit.WithDisabledRateLimiting(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun),
ratelimit.WithNotifier(fnb.UnicastRateLimiterDistributor),
}

// override noop unicast message rate limiter
if fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit > 0 {
if fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit > 0 {
unicastMessageRateLimiter := ratelimiter.NewRateLimiter(
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration,
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.LockoutDuration,
)
unicastRateLimiterOpts = append(unicastRateLimiterOpts, ratelimit.WithMessageRateLimiter(unicastMessageRateLimiter))

// avoid connection gating and pruning during dry run
if !fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun {
if !fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun {
f := rateLimiterPeerFilter(unicastMessageRateLimiter)
// add IsRateLimited peerFilters to conn gater intercept secure peer and peer manager filters list
// don't allow rate limited peers to establishing incoming connections
Expand All @@ -334,16 +334,16 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
}

// override noop unicast bandwidth rate limiter
if fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit > 0 && fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthBurstLimit > 0 {
if fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthRateLimit > 0 && fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthBurstLimit > 0 {
unicastBandwidthRateLimiter := ratelimit.NewBandWidthRateLimiter(
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthBurstLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration,
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthBurstLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.LockoutDuration,
)
unicastRateLimiterOpts = append(unicastRateLimiterOpts, ratelimit.WithBandwidthRateLimiter(unicastBandwidthRateLimiter))

// avoid connection gating and pruning during dry run
if !fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun {
if !fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun {
f := rateLimiterPeerFilter(unicastBandwidthRateLimiter)
// add IsRateLimited peerFilters to conn gater intercept secure peer and peer manager filters list
connGaterInterceptSecureFilters = append(connGaterInterceptSecureFilters, f)
Expand All @@ -355,7 +355,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
unicastRateLimiters := ratelimit.NewRateLimiters(unicastRateLimiterOpts...)

uniCfg := &p2pconfig.UnicastConfig{
UnicastConfig: fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig,
Unicast: fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast,
RateLimiterDistributor: fnb.UnicastRateLimiterDistributor,
}

Expand Down Expand Up @@ -394,10 +394,10 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
fnb.BaseConfig.NodeRole,
connGaterCfg,
peerManagerCfg,
&fnb.FlowConfig.NetworkConfig.GossipSubConfig,
&fnb.FlowConfig.NetworkConfig.GossipSub,
&fnb.FlowConfig.NetworkConfig.ResourceManager,
uniCfg,
&fnb.FlowConfig.NetworkConfig.ConnectionManagerConfig,
&fnb.FlowConfig.NetworkConfig.ConnectionManager,
&p2p.DisallowListCacheConfig{
MaxSize: fnb.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(fnb.HeroCacheMetricsFactory(), network.PrivateNetwork),
Expand Down Expand Up @@ -492,7 +492,7 @@ func (fnb *FlowNodeBuilder) InitFlowNetworkWithConduitFactory(
IdentityProvider: fnb.IdentityProvider,
ReceiveCache: receiveCache,
ConduitFactory: cf,
UnicastMessageTimeout: fnb.FlowConfig.NetworkConfig.UnicastMessageTimeout,
UnicastMessageTimeout: fnb.FlowConfig.NetworkConfig.Unicast.MessageTimeout,
IdentityTranslator: fnb.IDTranslator,
AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{
Logger: fnb.Logger,
Expand Down
Loading

0 comments on commit d99b7b9

Please sign in to comment.