Skip to content

Commit

Permalink
support capacities in stream sync
Browse files Browse the repository at this point in the history
  • Loading branch information
GheisMohammadi committed Oct 24, 2024
1 parent bcf05fc commit 4e5cbc0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
3 changes: 2 additions & 1 deletion api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type syncProtocol interface {
RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription
Streams() []sttypes.Stream
NumStreams() int
AvailableCapacity() int
}

type blockChain interface {
engine.ChainReader
Engine() engine.Engine
Expand Down
10 changes: 8 additions & 2 deletions api/service/stagedstreamsync/short_range_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ type srHelper struct {
func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64) ([]common.Hash, []sttypes.StreamID, error) {
results := newBlockHashResults(bns)

concurrency := sh.config.Concurrency
availableCapacity := sh.syncProtocol.AvailableCapacity()
if concurrency > availableCapacity {
concurrency = availableCapacity
}

var wg sync.WaitGroup
wg.Add(sh.config.Concurrency)
wg.Add(concurrency)

for i := 0; i != sh.config.Concurrency; i++ {
for i := 0; i != concurrency; i++ {
go func(index int) {
defer wg.Done()

Expand Down

0 comments on commit 4e5cbc0

Please sign in to comment.