diff --git a/analyzer/api.go b/analyzer/api.go index eae47199d..5edda3fa8 100644 --- a/analyzer/api.go +++ b/analyzer/api.go @@ -17,7 +17,8 @@ var ( // Analyzer is a worker that analyzes a subset of the Oasis Network. type Analyzer interface { - // Start starts the analyzer. + // Start starts the analyzer. The method should return once the analyzer + // is confident it has (and will have) no more work to do; that's possibly never. Start(ctx context.Context) // Name returns the name of the analyzer. diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go index c53833d6b..463070879 100644 --- a/cmd/analyzer/analyzer.go +++ b/cmd/analyzer/analyzer.go @@ -147,8 +147,8 @@ func wipeStorage(cfg *config.StorageConfig) error { // Service is Oasis Nexus's analysis service. type Service struct { - analyzers []analyzer.Analyzer - fastSyncAnalyzers []analyzer.Analyzer + analyzers []SyncedAnalyzer + fastSyncAnalyzers []SyncedAnalyzer sources *sourceFactory target storage.TargetStorage @@ -225,14 +225,27 @@ func (s *sourceFactory) IPFS(_ context.Context) (ipfsclient.Client, error) { return s.ipfs, nil } +// Shorthand for use within this file. type A = analyzer.Analyzer +// An Analyzer that is tagged with a `syncTag`. +// The `syncTag` is used for sequencing analyzers: For any non-empty tag, nexus will +// first run all fast-sync analyzers with that tag to completion, and only then start +// other analyzers with the same tag. The empty tag "" is special; it can be used +// by slow-sync analyzers that don't need to wait for any fast-sync analyzers to complete. +// This mechanism is a simple(ish) alternative to supporting a full-blown execution/dependency graph between analyzers. +type SyncedAnalyzer struct { + Analyzer analyzer.Analyzer + SyncTag string +} + // addAnalyzer adds the analyzer produced by `analyzerGenerator()` to `analyzers`. 
// It expects an initial state (analyzers, errSoFar) and returns the updated state, which // should be fed into subsequent call to the function. // As soon as an analyzerGenerator returns an error, all subsequent calls will // short-circuit and return the same error, leaving `analyzers` unchanged. -func addAnalyzer(analyzers []A, errSoFar error, analyzerGenerator func() (A, error)) ([]A, error) { +// See `SyncedAnalyzer` for more info on `syncTag`. +func addAnalyzer(analyzers []SyncedAnalyzer, errSoFar error, syncTag string, analyzerGenerator func() (A, error)) ([]SyncedAnalyzer, error) { if errSoFar != nil { return analyzers, errSoFar } @@ -240,10 +253,17 @@ func addAnalyzer(analyzers []A, errSoFar error, analyzerGenerator func() (A, err if errSoFar != nil { return analyzers, errSoFar } - analyzers = append(analyzers, a) + analyzers = append(analyzers, SyncedAnalyzer{Analyzer: a, SyncTag: syncTag}) return analyzers, nil } +var ( + syncTagConsensus = "consensus" + syncTagEmerald = string(common.RuntimeEmerald) + syncTagSapphire = string(common.RuntimeSapphire) + syncTagCipher = string(common.RuntimeCipher) +) + // NewService creates new Service. func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo ctx := context.Background() @@ -260,11 +280,11 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo } // Initialize fast-sync analyzers. 
- fastSyncAnalyzers := []A{} + fastSyncAnalyzers := []SyncedAnalyzer{} if cfg.Analyzers.Consensus != nil { if fastRange := cfg.Analyzers.Consensus.FastSyncRange(); fastRange != nil { for i := 0; i < cfg.Analyzers.Consensus.FastSync.Parallelism; i++ { - fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, func() (A, error) { + fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, syncTagConsensus, func() (A, error) { sourceClient, err1 := sources.Consensus(ctx) if err1 != nil { return nil, err1 @@ -281,7 +301,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo if config != nil { if fastRange := config.FastSyncRange(); fastRange != nil { for i := 0; i < config.FastSync.Parallelism; i++ { - fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, func() (A, error) { + fastSyncAnalyzers, err = addAnalyzer(fastSyncAnalyzers, err, string(runtimeName), func() (A, error) { sdkPT := cfg.Source.SDKParaTime(runtimeName) sourceClient, err1 := sources.Runtime(ctx, runtimeName) if err1 != nil { @@ -298,9 +318,9 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo addFastSyncRuntimeAnalyzers(common.RuntimeCipher, cfg.Analyzers.Cipher) // Initialize slow-sync analyzers. 
- analyzers := []A{} + analyzers := []SyncedAnalyzer{} if cfg.Analyzers.Consensus != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) { sourceClient, err1 := sources.Consensus(ctx) if err1 != nil { return nil, err1 @@ -309,7 +329,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.Emerald != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald) sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) if err1 != nil { @@ -319,7 +339,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.Sapphire != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire) sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) if err1 != nil { @@ -329,7 +349,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.Cipher != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagCipher, func() (A, error) { runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeCipher) sourceClient, err1 := sources.Runtime(ctx, common.RuntimeCipher) if err1 != nil { @@ -339,7 +359,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.EmeraldEvmTokens != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, 
common.RuntimeEmerald) if err1 != nil { return nil, err1 @@ -348,7 +368,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.SapphireEvmTokens != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) if err1 != nil { return nil, err1 @@ -357,7 +377,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.EmeraldEvmNfts != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) if err1 != nil { return nil, err1 @@ -370,7 +390,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.SapphireEvmNfts != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) if err1 != nil { return nil, err1 @@ -384,7 +404,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo } if cfg.Analyzers.EmeraldEvmTokenBalances != nil { runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeEmerald) - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) if err1 != nil { return nil, err1 @@ -394,7 +414,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo } if cfg.Analyzers.SapphireEvmTokenBalances != nil { runtimeMetadata := cfg.Source.SDKParaTime(common.RuntimeSapphire) - analyzers, err = addAnalyzer(analyzers, err, func() 
(A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) if err1 != nil { return nil, err1 @@ -403,7 +423,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.EmeraldContractCode != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald) if err1 != nil { return nil, err1 @@ -412,7 +432,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.SapphireContractCode != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { sourceClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire) if err1 != nil { return nil, err1 @@ -421,22 +441,22 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.EmeraldContractVerifier != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagEmerald, func() (A, error) { return evmverifier.NewAnalyzer(cfg.Source.ChainName, common.RuntimeEmerald, cfg.Analyzers.EmeraldContractVerifier.ItemBasedAnalyzerConfig, cfg.Analyzers.EmeraldContractVerifier.SourcifyServerUrl, dbClient, logger) }) } if cfg.Analyzers.SapphireContractVerifier != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, syncTagSapphire, func() (A, error) { return evmverifier.NewAnalyzer(cfg.Source.ChainName, common.RuntimeSapphire, cfg.Analyzers.SapphireContractVerifier.ItemBasedAnalyzerConfig, cfg.Analyzers.SapphireContractVerifier.SourcifyServerUrl, dbClient, logger) }) } if cfg.Analyzers.MetadataRegistry != nil { 
- analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) { return metadata_registry.NewAnalyzer(cfg.Analyzers.MetadataRegistry.ItemBasedAnalyzerConfig, dbClient, logger) }) } if cfg.Analyzers.NodeStats != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) { sourceClient, err1 := sources.Consensus(ctx) if err1 != nil { return nil, err1 @@ -445,7 +465,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo }) } if cfg.Analyzers.AggregateStats != nil { - analyzers, err = addAnalyzer(analyzers, err, func() (A, error) { + analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) { return aggregate_stats.NewAggregateStatsAnalyzer(dbClient, logger) }) } @@ -475,33 +495,45 @@ func (a *Service) Start() { defer cancelAnalyzers() // Start() only returns when analyzers are done, so this should be a no-op, but it makes the compiler happier. // Start fast-sync analyzers. - var fastSyncWg sync.WaitGroup + fastSyncWg := map[string]*sync.WaitGroup{} // syncTag -> wg with all fast-sync analyzers with that tag for _, an := range a.fastSyncAnalyzers { - fastSyncWg.Add(1) - go func(an analyzer.Analyzer) { - defer fastSyncWg.Done() - an.Start(ctx) + wg, ok := fastSyncWg[an.SyncTag] + if !ok { + wg = &sync.WaitGroup{} + fastSyncWg[an.SyncTag] = wg + } + wg.Add(1) + go func(an SyncedAnalyzer) { + defer wg.Done() + an.Analyzer.Start(ctx) }(an) } - fastSyncAnalyzersDone := util.ClosingChannel(&fastSyncWg) // Prepare slow-sync analyzers (to be started after fast-sync analyzers are done). 
- var wg sync.WaitGroup + var slowSyncWg sync.WaitGroup for _, an := range a.analyzers { - wg.Add(1) - go func(an analyzer.Analyzer) { - defer wg.Done() + slowSyncWg.Add(1) + go func(an SyncedAnalyzer) { + defer slowSyncWg.Done() + + // Find the wait group for this analyzer's sync tag. + prereqWg, ok := fastSyncWg[an.SyncTag] + if !ok || an.SyncTag == "" { + // No fast-sync prerequisites (empty tag, or no fast-sync analyzers with this tag); start the analyzer immediately. + prereqWg = &sync.WaitGroup{} + } + // Start the analyzer after fast-sync analyzers, // unless the context is canceled first (e.g. by ctrl+C during fast-sync). select { case <-ctx.Done(): return - case <-fastSyncAnalyzersDone: - an.Start(ctx) + case <-util.ClosingChannel(prereqWg): + an.Analyzer.Start(ctx) } }(an) } - analyzersDone := util.ClosingChannel(&wg) + analyzersDone := util.ClosingChannel(&slowSyncWg) // Trap Ctrl+C and SIGTERM; the latter is issued by Kubernetes to request a shutdown. signalChan := make(chan os.Signal, 1) @@ -535,6 +567,10 @@ func (a *Service) Start() { // cleanup cleans up resources used by the service. func (a *Service) cleanup() { + if a.sources == nil { + return + } + if err := a.sources.Close(); err != nil { a.logger.Error("failed to cleanly close data source", "firstErr", err.Error(), diff --git a/cmd/analyzer/analyzer_test.go b/cmd/analyzer/analyzer_migrations_test.go similarity index 80% rename from cmd/analyzer/analyzer_test.go rename to cmd/analyzer/analyzer_migrations_test.go index f0fbbfedd..6d9ed67d7 100644 --- a/cmd/analyzer/analyzer_test.go +++ b/cmd/analyzer/analyzer_migrations_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/oasisprotocol/nexus/cmd/analyzer" + cmdAnalyzer "github.com/oasisprotocol/nexus/cmd/analyzer" "github.com/oasisprotocol/nexus/storage/postgres/testutil" "github.com/oasisprotocol/nexus/tests" ) @@ -27,5 +27,5 @@ func TestMigrations(t *testing.T) { require.NoError(t, client.Wipe(ctx), "failed to wipe database") // Run migrations. 
- require.NoError(t, analyzer.RunMigrations(migrationsPath, os.Getenv("CI_TEST_CONN_STRING")), "failed to run migrations") + require.NoError(t, cmdAnalyzer.RunMigrations(migrationsPath, os.Getenv("CI_TEST_CONN_STRING")), "failed to run migrations") } diff --git a/cmd/analyzer/analyzer_sequencing_test.go b/cmd/analyzer/analyzer_sequencing_test.go new file mode 100644 index 000000000..8256e54ea --- /dev/null +++ b/cmd/analyzer/analyzer_sequencing_test.go @@ -0,0 +1,75 @@ +package analyzer + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/oasisprotocol/nexus/analyzer" + "github.com/oasisprotocol/nexus/log" +) + +// A trivial analyzer that runs for `duration`, then appends its `name` to `finishLog`. +type DummyAnalyzer struct { + name string + duration time.Duration + finishLog *[]string +} + +var _ analyzer.Analyzer = (*DummyAnalyzer)(nil) + +var finishLogLock = &sync.Mutex{} // for use by all DummyAnalyzer instances + +func (a *DummyAnalyzer) Start(ctx context.Context) { + time.Sleep(a.duration) + finishLogLock.Lock() + defer finishLogLock.Unlock() + *a.finishLog = append(*a.finishLog, a.name) +} + +func (a *DummyAnalyzer) Name() string { + return a.name +} + +func dummyAnalyzer(syncTag string, name string, duration time.Duration, finishLog *[]string) SyncedAnalyzer { + return SyncedAnalyzer{ + SyncTag: syncTag, + Analyzer: &DummyAnalyzer{ + name: name, + duration: duration, + finishLog: finishLog, + }, + } +} + +func TestSequencing(t *testing.T) { + // Log of analyzer completions. Each analyzer, when it completes, will append its name to this list. + finishLog := []string{} + // Fast analyzers: Tag "a" finishes after 1 second, tag "b" finishes after 3 seconds. 
+ fastA := dummyAnalyzer("a", "fastA", 1*time.Second, &finishLog) + fastB1 := dummyAnalyzer("b", "fastB1", 500*time.Millisecond, &finishLog) + fastB2 := dummyAnalyzer("b", "fastB2", 3*time.Second, &finishLog) + // Slow analyzers + slowA := dummyAnalyzer("a", "slowA", 1*time.Second, &finishLog) + slowB := dummyAnalyzer("b", "slowB", 1*time.Second, &finishLog) + slowX := dummyAnalyzer("", "slowX", 0*time.Second, &finishLog) + + s := Service{ + fastSyncAnalyzers: []SyncedAnalyzer{fastA, fastB1, fastB2}, + analyzers: []SyncedAnalyzer{slowA, slowB, slowX}, + logger: log.NewDefaultLogger("analyzer"), + } + + s.Start() + require.Equal(t, []string{ + "slowX", // finishes immediately, at t=0s, because it depends on no fast analyzers + "fastB1", // finishes at t=0.5s + "fastA", // finishes at t=1s + "slowA", // finishes at t=2s because it waits for fastA (1s), then runs for 1s + "fastB2", // finishes at t=3s + "slowB", // finishes at t=4s because it waits for fastB1+fastB2 (3s), then runs for 1s + }, finishLog, "analyzers did not finish in the expected order") +}