diff --git a/api/service/stagedstreamsync/adapter.go b/api/service/stagedstreamsync/adapter.go index 56c42b661c..41ab8f9569 100644 --- a/api/service/stagedstreamsync/adapter.go +++ b/api/service/stagedstreamsync/adapter.go @@ -29,9 +29,10 @@ type syncProtocol interface { RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream StreamFailed(stID sttypes.StreamID, reason string) SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription + Streams() []sttypes.Stream NumStreams() int + AvailableCapacity() int } - type blockChain interface { engine.ChainReader Engine() engine.Engine diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index 64ff694432..ad4b184f73 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -23,10 +23,16 @@ type srHelper struct { func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64) ([]common.Hash, []sttypes.StreamID, error) { results := newBlockHashResults(bns) + concurrency := sh.config.Concurrency + availableCapacity := sh.syncProtocol.AvailableCapacity() + if concurrency > availableCapacity { + concurrency = availableCapacity + } + var wg sync.WaitGroup - wg.Add(sh.config.Concurrency) + wg.Add(concurrency) - for i := 0; i != sh.config.Concurrency; i++ { + for i := 0; i != concurrency; i++ { go func(index int) { defer wg.Done()