Skip to content

Commit

Permalink
chainio: use errgroup to limit num of goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Nov 12, 2024
1 parent a3bff00 commit 032866b
Showing 1 changed file with 30 additions and 40 deletions.
70 changes: 30 additions & 40 deletions chainio/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
"golang.org/x/sync/errgroup"
)

// DefaultProcessBlockTimeout is the timeout value used when waiting for one
Expand Down Expand Up @@ -170,41 +171,34 @@ func (b *BlockbeatDispatcher) dispatchBlocks(

// notifyQueues notifies each queue concurrently about the latest block epoch.
func (b *BlockbeatDispatcher) notifyQueues() error {
// errChans is a map of channels that will be used to receive errors
// returned from notifying the consumers.
errChans := make(map[uint32]chan error, len(b.consumerQueues))
eg := &errgroup.Group{}

// Notify each queue in goroutines.
for qid, consumers := range b.consumerQueues {
b.log().Debugf("Notifying queue=%d with %d consumers", qid,
len(consumers))

// Create a signal chan.
errChan := make(chan error, 1)
errChans[qid] = errChan

// Notify each queue concurrently.
go func(qid uint32, c []Consumer, beat Blockbeat) {
eg.Go(func() error {
// Notify each consumer in this queue sequentially.
errChan <- DispatchSequential(beat, c)
}(qid, consumers, b.beat)
}
err := DispatchSequential(b.beat, consumers)

// Wait for all consumers in each queue to finish.
for qid, errChan := range errChans {
select {
case err := <-errChan:
if err != nil {
return fmt.Errorf("queue=%d got err: %w", qid,
err)
// Exit early if there's no error.
if err == nil {
return nil
}

b.log().Debugf("Notified queue=%d", qid)
return fmt.Errorf("queue=%d got err: %w", qid, err)
})
}

case <-b.quit:
}
// Wait for all consumers in each queue to finish.
if err := eg.Wait(); err != nil {
return err
}

b.log().Debugf("Notified all queues")

return nil
}

Expand All @@ -228,36 +222,32 @@ func DispatchSequential(beat Blockbeat, consumers []Consumer) error {

// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
func DispatchConcurrent(beat Blockbeat, consumers []Consumer) error {
// errChans is a map of channels that will be used to receive errors
// returned from notifying the consumers.
errChans := make(map[string]chan error, len(consumers))
eg := &errgroup.Group{}

// Notify each queue in goroutines.
for _, c := range consumers {
// Create a signal chan.
errChan := make(chan error, 1)
errChans[c.Name()] = errChan

// Notify each consumer concurrently.
go func(c Consumer, beat Blockbeat) {
eg.Go(func() error {
b := beat.copy()

// Send the copy of the beat to the consumer.
errChan <- notifyAndWait(
b, c, DefaultProcessBlockTimeout,
)
}(c, beat)
}
err := notifyAndWait(b, c, DefaultProcessBlockTimeout)

// Exit early if there's no error.
if err == nil {
return nil
}

// Wait for all consumers in each queue to finish.
for name, errChan := range errChans {
err := <-errChan
if err != nil {
beat.logger().Errorf("Consumer=%v failed to process "+
"block: %v", name, err)
"block: %v", c.Name(), err)

return err
}
})
}

// Wait for all consumers in each queue to finish.
if err := eg.Wait(); err != nil {
return err
}

return nil
Expand Down

0 comments on commit 032866b

Please sign in to comment.