Skip to content

Commit

Permalink
naming, comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mitjat committed Nov 4, 2023
1 parent a8b719e commit 25b25c2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
42 changes: 23 additions & 19 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func wipeStorage(cfg *config.StorageConfig) error {

// Service is Oasis Nexus's analysis service.
type Service struct {
analyzers []SyncedA
fastSyncAnalyzers []SyncedA
analyzers []SyncedAnalyzer
fastSyncAnalyzers []SyncedAnalyzer

sources *sourceFactory
target storage.TargetStorage
Expand Down Expand Up @@ -225,31 +225,35 @@ func (s *sourceFactory) IPFS(_ context.Context) (ipfsclient.Client, error) {
return s.ipfs, nil
}

type (
A = analyzer.Analyzer
SyncedA struct {
Analyzer analyzer.Analyzer
SyncTag string
}
)
// Shorthand for use within this file.
type A = analyzer.Analyzer

// An Analyzer that is tagged with a `syncTag`.
// The `syncTag` is used for sequencing analyzers: For any non-empty tag, nexus will
// first run all fast-sync analyzers with that tag to completion, and only then start
// other analyzers with the same tag. The empty tag "" is special; it can be used
// by slow-sync analyzers that don't need to wait for any fast-sync analyzers to complete.
// This mechanism is a simple(ish) alternative to supporting a full-blown execution/dependency graph between analyzers.
type SyncedAnalyzer struct {
Analyzer analyzer.Analyzer
SyncTag string
}

// addAnalyzer adds the analyzer produced by `analyzerGenerator()` to `analyzers`.
// It expects an initial state (analyzers, errSoFar) and returns the updated state, which
// should be fed into subsequent call to the function.
// As soon as an analyzerGenerator returns an error, all subsequent calls will
// short-circuit and return the same error, leaving `analyzers` unchanged.
// The `syncTag` is used for sequencing analyzers: For any non-empty tag, nexus will
// first run all fast-sync analyzers with that tag to completion, and only then start
// other analyzers with the same tag.
func addAnalyzer(analyzers []SyncedA, errSoFar error, syncTag string, analyzerGenerator func() (A, error)) ([]SyncedA, error) {
// See `SyncedAnalyzer` for more info on `syncTag`.
func addAnalyzer(analyzers []SyncedAnalyzer, errSoFar error, syncTag string, analyzerGenerator func() (A, error)) ([]SyncedAnalyzer, error) {
if errSoFar != nil {
return analyzers, errSoFar
}
a, errSoFar := analyzerGenerator()
if errSoFar != nil {
return analyzers, errSoFar
}
analyzers = append(analyzers, SyncedA{Analyzer: a, SyncTag: syncTag})
analyzers = append(analyzers, SyncedAnalyzer{Analyzer: a, SyncTag: syncTag})
return analyzers, nil
}

Expand All @@ -276,7 +280,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
}

// Initialize fast-sync analyzers.
fastSyncAnalyzers := []SyncedA{}
fastSyncAnalyzers := []SyncedAnalyzer{}
if cfg.Analyzers.Consensus != nil {
if fastRange := cfg.Analyzers.Consensus.FastSyncRange(); fastRange != nil {
for i := 0; i < cfg.Analyzers.Consensus.FastSync.Parallelism; i++ {
Expand Down Expand Up @@ -314,7 +318,7 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
addFastSyncRuntimeAnalyzers(common.RuntimeCipher, cfg.Analyzers.Cipher)

// Initialize slow-sync analyzers.
analyzers := []SyncedA{}
analyzers := []SyncedAnalyzer{}
if cfg.Analyzers.Consensus != nil {
analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
Expand Down Expand Up @@ -499,7 +503,7 @@ func (a *Service) Start() {
fastSyncWg[an.SyncTag] = wg
}
wg.Add(1)
go func(an SyncedA) {
go func(an SyncedAnalyzer) {
defer wg.Done()
an.Analyzer.Start(ctx)
}(an)
Expand All @@ -509,7 +513,7 @@ func (a *Service) Start() {
var slowSyncWg sync.WaitGroup
for _, an := range a.analyzers {
slowSyncWg.Add(1)
go func(an SyncedA) {
go func(an SyncedAnalyzer) {
defer slowSyncWg.Done()

// Find the wait group for this analyzer's sync tag.
Expand Down Expand Up @@ -588,7 +592,7 @@ type ServiceTester struct {
Service
}

func (a *ServiceTester) SetAnalyzers(fastSyncAnalyzers []SyncedA, analyzers []SyncedA) {
func (a *ServiceTester) SetAnalyzers(fastSyncAnalyzers []SyncedAnalyzer, analyzers []SyncedAnalyzer) {
a.fastSyncAnalyzers = fastSyncAnalyzers
a.analyzers = analyzers
a.logger = log.NewDefaultLogger("analyzer")
Expand Down
8 changes: 4 additions & 4 deletions cmd/analyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (a *DummyAnalyzer) Name() string {
return a.name
}

func dummyAnalyzer(syncTag string, name string, duration time.Duration, finishLog *[]string) cmdAnalyzer.SyncedA {
return cmdAnalyzer.SyncedA{
func dummyAnalyzer(syncTag string, name string, duration time.Duration, finishLog *[]string) cmdAnalyzer.SyncedAnalyzer {
return cmdAnalyzer.SyncedAnalyzer{
SyncTag: syncTag,
Analyzer: &DummyAnalyzer{
name: name,
Expand All @@ -78,8 +78,8 @@ func TestSequencing(t *testing.T) {
slowB := dummyAnalyzer("b", "slowB", 1*time.Second, &finishLog)
slowX := dummyAnalyzer("", "slowX", 0*time.Second, &finishLog)
s.SetAnalyzers(
[]cmdAnalyzer.SyncedA{fastA, fastB1, fastB2},
[]cmdAnalyzer.SyncedA{slowA, slowB, slowX},
[]cmdAnalyzer.SyncedAnalyzer{fastA, fastB1, fastB2},
[]cmdAnalyzer.SyncedAnalyzer{slowA, slowB, slowX},
)

s.Start()
Expand Down

0 comments on commit 25b25c2

Please sign in to comment.