Skip to content

Commit

Permalink
Merge pull request #558 from oasisprotocol/mitjat/analyzer-sequencing
Browse files Browse the repository at this point in the history
analyzer: Slow-sync analyzers wait only for _relevant_ fast-sync analyzers to complete
  • Loading branch information
mitjat authored Nov 7, 2023
2 parents 02d919c + 1c01dc8 commit 0cb4781
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 41 deletions.
3 changes: 2 additions & 1 deletion analyzer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ var (

// Analyzer is a worker that analyzes a subset of the Oasis Network.
type Analyzer interface {
// Start starts the analyzer.
// Start starts the analyzer. The method should return once the analyzer
// is confident it has (and will have) no more work to do; that's possibly never.
Start(ctx context.Context)

// Name returns the name of the analyzer.
Expand Down
112 changes: 74 additions & 38 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func wipeStorage(cfg *config.StorageConfig) error {

// Service is Oasis Nexus's analysis service.
type Service struct {
analyzers []analyzer.Analyzer
fastSyncAnalyzers []analyzer.Analyzer
analyzers []SyncedAnalyzer
fastSyncAnalyzers []SyncedAnalyzer

sources *sourceFactory
target storage.TargetStorage
Expand Down Expand Up @@ -225,25 +225,45 @@ func (s *sourceFactory) IPFS(_ context.Context) (ipfsclient.Client, error) {
return s.ipfs, nil
}

// Shorthand for use within this file.
type A = analyzer.Analyzer

// An Analyzer that is tagged with a `syncTag`.
// The `syncTag` is used for sequencing analyzers: For any non-empty tag, nexus will
// first run all fast-sync analyzers with that tag to completion, and only then start
// other analyzers with the same tag. The empty tag "" is special; it can be used
// by slow-sync analyzers that don't need to wait for any fast-sync analyzers to complete.
// This mechanism is a simple(ish) alternative to supporting a full-blown execution/dependency graph between analyzers.
type SyncedAnalyzer struct {
Analyzer analyzer.Analyzer
SyncTag string
}

// addAnalyzer adds the analyzer produced by `analyzerGenerator()` to `analyzers`.
// It expects an initial state (analyzers, errSoFar) and returns the updated state, which
// should be fed into subsequent call to the function.
// As soon as an analyzerGenerator returns an error, all subsequent calls will
// short-circuit and return the same error, leaving `analyzers` unchanged.
func addAnalyzer(analyzers []A, errSoFar error, analyzerGenerator func() (A, error)) ([]A, error) {
// See `SyncedAnalyzer` for more info on `syncTag`.
func addAnalyzer(analyzers []SyncedAnalyzer, errSoFar error, syncTag string, analyzerGenerator func() (A, error)) ([]SyncedAnalyzer, error) {
if errSoFar != nil {
return analyzers, errSoFar
}
a, errSoFar := analyzerGenerator()
if errSoFar != nil {
return analyzers, errSoFar
}
analyzers = append(analyzers, a)
analyzers = append(analyzers, SyncedAnalyzer{Analyzer: a, SyncTag: syncTag})
return analyzers, nil
}

var (
syncTagConsensus = "consensus"
syncTagEmerald = string(common.RuntimeEmerald)
syncTagSapphire = string(common.RuntimeSapphire)
syncTagCipher = string(common.RuntimeCipher)
)

// NewService creates new Service.
func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
ctx := context.Background()
Expand All @@ -260,11 +280,11 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
}

// Initialize fast-sync analyzers.
fastSyncAnalyzers := []A{}
fastSyncAnalyzers := []SyncedAnalyzer{}
if cfg.Analyzers.Consensus != nil {
if fastRange := cfg.Analyzers.Consensus.FastSyncRange(); fastRange != nil {
for i := 0; i < cfg.Analyzers.Consensus.FastSync.Parallelism; i++ {
fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, func() (A, error) {
fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
Expand All @@ -281,7 +301,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
if config != nil {
if fastRange := config.FastSyncRange(); fastRange != nil {
for i := 0; i < config.FastSync.Parallelism; i++ {
fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, func() (A, error) {
fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, string(runtimeName), func() (A, error) {
sdkPT := cfg.Source.SDKParaTime(runtimeName)
sourceClient, err1 := sources.Runtime(ctx, runtimeName)
if err1 != nil {
Expand All @@ -298,9 +318,9 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
addFastSyncRuntimeAnalyzers(common.RuntimeCipher, cfg.Analyzers.Cipher)

// Initialize slow-sync analyzers.
analyzers := []A{}
analyzers := []SyncedAnalyzer{}
if cfg.Analyzers.Consensus != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
Expand All @@ -309,7 +329,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.Emerald != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
Expand All @@ -319,7 +339,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.Sapphire != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
Expand All @@ -329,7 +349,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.Cipher != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagCipher, func() (A, error) {
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeCipher)
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeCipher)
if err1 != nil {
Expand All @@ -339,7 +359,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.EmeraldEvmTokens != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
Expand All @@ -348,7 +368,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.SapphireEvmTokens != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
Expand All @@ -357,7 +377,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.EmeraldEvmNfts != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
Expand All @@ -370,7 +390,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.SapphireEvmNfts != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
Expand All @@ -384,7 +404,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
}
if cfg.Analyzers.EmeraldEvmTokenBalances != nil {
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald)
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
Expand All @@ -394,7 +414,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
}
if cfg.Analyzers.SapphireEvmTokenBalances != nil {
runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire)
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
Expand All @@ -403,7 +423,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.EmeraldContractCode != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
Expand All @@ -412,7 +432,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.SapphireContractCode != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
Expand All @@ -421,22 +441,22 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.EmeraldContractVerifier != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) {
return evmverifier.NewAnalyzer(cfg.Source.ChainName, common.RuntimeEmerald, cfg.Analyzers.EmeraldContractVerifier.ItemBasedAnalyzerConfig, cfg.Analyzers.EmeraldContractVerifier.SourcifyServerUrl, dbClient, logger)
})
}
if cfg.Analyzers.SapphireContractVerifier != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) {
return evmverifier.NewAnalyzer(cfg.Source.ChainName, common.RuntimeSapphire, cfg.Analyzers.SapphireContractVerifier.ItemBasedAnalyzerConfig, cfg.Analyzers.SapphireContractVerifier.SourcifyServerUrl, dbClient, logger)
})
}
if cfg.Analyzers.MetadataRegistry != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
return metadata_registry.NewAnalyzer(cfg.Analyzers.MetadataRegistry.ItemBasedAnalyzerConfig, dbClient, logger)
})
}
if cfg.Analyzers.NodeStats != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
Expand All @@ -445,7 +465,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
})
}
if cfg.Analyzers.AggregateStats != nil {
analyzers, err = addAnalyzer(analyzers, err, func() (A, error) {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
return aggregate_stats.NewAggregateStatsAnalyzer(dbClient, logger)
})
}
Expand Down Expand Up @@ -475,33 +495,45 @@ func (a *Service) Start() {
defer cancelAnalyzers() // Start() only returns when analyzers are done, so this should be a no-op, but it makes the compiler happier.

// Start fast-sync analyzers.
var fastSyncWg sync.WaitGroup
fastSyncWg := map[string]*sync.WaitGroup{} // syncTag -> wg with all fast-sync analyzers with that tag
for _, an := range a.fastSyncAnalyzers {
fastSyncWg.Add(1)
go func(an analyzer.Analyzer) {
defer fastSyncWg.Done()
an.Start(ctx)
wg, ok := fastSyncWg[an.SyncTag]
if !ok {
wg = &sync.WaitGroup{}
fastSyncWg[an.SyncTag] = wg
}
wg.Add(1)
go func(an SyncedAnalyzer) {
defer wg.Done()
an.Analyzer.Start(ctx)
}(an)
}
fastSyncAnalyzersDone := util.ClosingChannel(&fastSyncWg)

// Prepare slow-sync analyzers (to be started after fast-sync analyzers are done).
var wg sync.WaitGroup
var slowSyncWg sync.WaitGroup
for _, an := range a.analyzers {
wg.Add(1)
go func(an analyzer.Analyzer) {
defer wg.Done()
slowSyncWg.Add(1)
go func(an SyncedAnalyzer) {
defer slowSyncWg.Done()

// Find the wait group for this analyzer's sync tag.
prereqWg, ok := fastSyncWg[an.SyncTag]
if !ok || an.SyncTag == "" {
// No fast-sync analyzers with this tag, start the analyzer immediately.
prereqWg = &sync.WaitGroup{}
}

// Start the analyzer after fast-sync analyzers,
// unless the context is canceled first (e.g. by ctrl+C during fast-sync).
select {
case <-ctx.Done():
return
case <-fastSyncAnalyzersDone:
an.Start(ctx)
case <-util.ClosingChannel(prereqWg):
an.Analyzer.Start(ctx)
}
}(an)
}
analyzersDone := util.ClosingChannel(&wg)
analyzersDone := util.ClosingChannel(&slowSyncWg)

// Trap Ctrl+C and SIGTERM; the latter is issued by Kubernetes to request a shutdown.
signalChan := make(chan os.Signal, 1)
Expand Down Expand Up @@ -535,6 +567,10 @@ func (a *Service) Start() {

// cleanup cleans up resources used by the service.
func (a *Service) cleanup() {
if a.sources == nil {
return
}

if err := a.sources.Close(); err != nil {
a.logger.Error("failed to cleanly close data source",
"firstErr", err.Error(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/nexus/cmd/analyzer"
cmdAnalyzer "github.com/oasisprotocol/nexus/cmd/analyzer"
"github.com/oasisprotocol/nexus/storage/postgres/testutil"
"github.com/oasisprotocol/nexus/tests"
)
Expand All @@ -27,5 +27,5 @@ func TestMigrations(t *testing.T) {
require.NoError(t, client.Wipe(ctx), "failed to wipe database")

// Run migrations.
require.NoError(t, analyzer.RunMigrations(migrationsPath, os.Getenv("CI_TEST_CONN_STRING")), "failed to run migrations")
require.NoError(t, cmdAnalyzer.RunMigrations(migrationsPath, os.Getenv("CI_TEST_CONN_STRING")), "failed to run migrations")
}
Loading

0 comments on commit 0cb4781

Please sign in to comment.