Skip to content

Commit

Permalink
Merge branch 'master' into jord/5828-allow-service-events-default
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanschalm authored May 10, 2024
2 parents bef5cfa + dceead4 commit a8e1e6b
Show file tree
Hide file tree
Showing 79 changed files with 2,807 additions and 1,964 deletions.
69 changes: 50 additions & 19 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
Expand Down Expand Up @@ -594,6 +595,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
),
}

if !builder.BitswapReprovideEnabled {
opts = append(opts, blob.WithReprovideInterval(-1))
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, ds, opts...)
if err != nil {
Expand Down Expand Up @@ -694,6 +699,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
blob.NewTracer(node.Logger.With().Str("public_blob_service", channels.PublicExecutionDataService.String()).Logger()),
),
),
blob.WithParentBlobService(bs),
}

net := builder.AccessNodeConfig.PublicNetworkConfig.Network
Expand Down Expand Up @@ -1439,12 +1445,15 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedBlockHeight storage.ConsumerProgress

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}

ingestionDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(ingestionDependable)
var lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

builder.
BuildConsensusFollower().
Expand Down Expand Up @@ -1619,6 +1628,24 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Storage.LightTransactionResults)
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
rootBlockHeight := node.State.Params().FinalizedRoot().Height

var err error
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
rootBlockHeight,
)
if err != nil {
return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err)
}

return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1717,9 +1744,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down Expand Up @@ -1785,6 +1813,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.Storage.Receipts,
builder.collectionExecutedMetric,
processedBlockHeight,
lastFullBlockHeight,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1836,23 +1866,24 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
node.IdentityProvider,
node.IDTranslator,
node.Me,
builder.PingMetrics,
builder.pingEnabled,
builder.nodeInfoFile,
node.PingService,
)
if err != nil {
return nil, fmt.Errorf("could not create ping engine: %w", err)
}
if builder.pingEnabled {
builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
node.IdentityProvider,
node.IDTranslator,
node.Me,
builder.PingMetrics,
builder.nodeInfoFile,
node.PingService,
)
if err != nil {
return nil, fmt.Errorf("could not create ping engine: %w", err)
}

return ping, nil
})
return ping, nil
})
}

return builder.FlowNodeBuilder.Build()
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (exeNode *ExecutionNode) LoadBlobService(
),
}

if !node.BitswapReprovideEnabled {
opts = append(opts, blob.WithReprovideInterval(-1))
}

if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 {
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}
Expand Down Expand Up @@ -1323,7 +1327,6 @@ func (exeNode *ExecutionNode) LoadSynchronizationEngine(
error,
) {
// initialize the synchronization engine
//var err error
spamConfig, err := synchronization.NewSpamDetectionConfig()
if err != nil {
return nil, fmt.Errorf("could not initialize spam detection config: %w", err)
Expand Down
19 changes: 14 additions & 5 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ type BaseConfig struct {

// FlowConfig Flow configuration.
FlowConfig config.FlowConfig

// DhtSystemEnabled configures whether the DHT system is enabled on Access and Execution nodes.
DhtSystemEnabled bool

// BitswapReprovideEnabled configures whether the Bitswap reprovide mechanism is enabled.
// This is only meaningful to Access and Execution nodes.
BitswapReprovideEnabled bool
}

// NodeConfig contains all the derived parameters such the NodeID, private keys etc. and initialized instances of
Expand Down Expand Up @@ -235,7 +242,7 @@ type StateExcerptAtBoot struct {
SealedRootBlock *flow.Block // The last sealed block when bootstrapped.
RootQC *flow.QuorumCertificate // QC for Finalized Root Block
RootResult *flow.ExecutionResult // Result for SealedRootBlock
RootSeal *flow.Seal //Seal for RootResult
RootSeal *flow.Seal // Seal for RootResult
RootChainID flow.ChainID
SporkID flow.Identifier
LastFinalizedHeader *flow.Header // last finalized header when the node boots up
Expand Down Expand Up @@ -280,10 +287,12 @@ func DefaultBaseConfig() *BaseConfig {
Duration: 10 * time.Second,
},

HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
}
}

Expand Down
26 changes: 18 additions & 8 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
fnb.flags.UintVar(&fnb.BaseConfig.guaranteesCacheSize, "guarantees-cache-size", bstorage.DefaultCacheSize, "collection guarantees cache size")
fnb.flags.UintVar(&fnb.BaseConfig.receiptsCacheSize, "receipts-cache-size", bstorage.DefaultCacheSize, "receipts cache size")

fnb.flags.BoolVar(&fnb.BaseConfig.DhtSystemEnabled,
"dht-enabled",
defaultConfig.DhtSystemEnabled,
"[experimental] whether to enable dht system. This is an experimental feature. Use with caution.")
fnb.flags.BoolVar(&fnb.BaseConfig.BitswapReprovideEnabled,
"bitswap-reprovide-enabled",
defaultConfig.BitswapReprovideEnabled,
"[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.")

// dynamic node startup flags
fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey,
"dynamic-startup-access-publickey",
Expand Down Expand Up @@ -414,7 +423,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
return publicLibp2pNode, nil
}

dhtActivationStatus, err := DhtSystemActivationStatus(fnb.NodeRole)
dhtActivationStatus, err := DhtSystemActivationStatus(fnb.NodeRole, fnb.DhtSystemEnabled)
if err != nil {
return nil, fmt.Errorf("could not determine dht activation status: %w", err)
}
Expand Down Expand Up @@ -2131,10 +2140,11 @@ func (fnb *FlowNodeBuilder) extraFlagsValidation() error {
// DhtSystemActivationStatus parses the given role string and returns the corresponding DHT system activation status.
// Args:
// - roleStr: the role string to parse.
// - enabled: whether the DHT system is configured to be enabled. Only meaningful for access and execution nodes.
// Returns:
// - DhtSystemActivation: the corresponding DHT system activation status.
// - error: if the role string is invalid, returns an error.
func DhtSystemActivationStatus(roleStr string) (p2pbuilder.DhtSystemActivation, error) {
func DhtSystemActivationStatus(roleStr string, enabled bool) (p2pbuilder.DhtSystemActivation, error) {
if roleStr == "ghost" {
// ghost node is not a valid role, so we don't need to parse it
return p2pbuilder.DhtSystemDisabled, nil
Expand All @@ -2145,12 +2155,12 @@ func DhtSystemActivationStatus(roleStr string) (p2pbuilder.DhtSystemActivation,
// ghost role is not a valid role, so we don't need to parse it
return p2pbuilder.DhtSystemDisabled, fmt.Errorf("could not parse node role: %w", err)
}
if role == flow.RoleAccess || role == flow.RoleExecution {
// Only access and execution nodes need to run DHT;
// Access nodes and execution nodes need DHT to run a blob service.
// Moreover, access nodes run a DHT to let un-staked (public) access nodes find each other on the public network.
return p2pbuilder.DhtSystemEnabled, nil

// Only access and execution nodes need to run DHT; which is used by bitswap.
// Access nodes also run a DHT on the public network for peer discovery of un-staked nodes.
if role != flow.RoleAccess && role != flow.RoleExecution {
return p2pbuilder.DhtSystemDisabled, nil
}

return p2pbuilder.DhtSystemDisabled, nil
return p2pbuilder.DhtSystemActivation(enabled), nil
}
24 changes: 23 additions & 1 deletion cmd/scaffold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,56 +738,78 @@ func TestDhtSystemActivationStatus(t *testing.T) {
tests := []struct {
name string
roleStr string
enabled bool
expected p2pbuilder.DhtSystemActivation
expectErr bool
}{
{
name: "ghost role returns disabled",
roleStr: "ghost",
enabled: true,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "access role returns enabled",
roleStr: "access",
enabled: true,
expected: p2pbuilder.DhtSystemEnabled,
expectErr: false,
},
{
name: "execution role returns enabled",
roleStr: "execution",
enabled: true,
expected: p2pbuilder.DhtSystemEnabled,
expectErr: false,
},
{
name: "access role with disabled returns disabled",
roleStr: "access",
enabled: false,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "execution role with disabled returns disabled",
roleStr: "execution",
enabled: false,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "collection role returns disabled",
roleStr: "collection",
enabled: true,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "consensus role returns disabled",
roleStr: "consensus",
enabled: true,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "verification nodes return disabled",
roleStr: "verification",
enabled: true,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: false,
},
{
name: "invalid role returns error",
roleStr: "invalidRole",
enabled: true,
expected: p2pbuilder.DhtSystemDisabled,
expectErr: true,
}, // Add more test cases for other roles, if needed.
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := DhtSystemActivationStatus(tt.roleStr)
result, err := DhtSystemActivationStatus(tt.roleStr, tt.enabled)
require.Equal(t, tt.expectErr, err != nil, "unexpected error status")
require.Equal(t, tt.expected, result, "unexpected activation status")
})
Expand Down
22 changes: 12 additions & 10 deletions cmd/testclient/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ require (
require (
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ethereum/go-ethereum v1.9.9 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/ethereum/go-ethereum v1.13.15 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/onflow/cadence v0.4.0 // indirect
github.com/onflow/flow/protobuf/go/flow v0.1.5-0.20200601215056-34a11def1d6b // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/raviqqe/hamt v0.0.0-20190615202029-864fb7caef85 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/segmentio/fasthash v1.0.2 // indirect
github.com/stretchr/testify v1.5.1 // indirect
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit a8e1e6b

Please sign in to comment.