Skip to content

Commit

Permalink
multi: add new method ChainArbitrator.RedispatchBlockbeat
Browse files Browse the repository at this point in the history
This commit adds a new method to enable us resending the blockbeat in
`ChainArbitrator`, which is needed for the channel restore as the chain
watcher and channel arbitrator are added after the start of the chain
arbitrator.
  • Loading branch information
yyforyongyu committed Nov 21, 2024
1 parent 8441f3e commit c8b27b2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
13 changes: 12 additions & 1 deletion chanrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
Expand Down Expand Up @@ -286,6 +287,9 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e

ltndLog.Infof("Informing chain watchers of new restored channels")

// Create a slice of channel points.
chanPoints := make([]wire.OutPoint, 0, len(channelShells))

// Finally, we'll need to inform the chain arbitrator of these new
// channels so we'll properly watch for their ultimate closure on chain
// and sweep them via the DLP.
Expand All @@ -294,8 +298,15 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
if err != nil {
return err
}

chanPoints = append(
chanPoints, restoredChannel.Chan.FundingOutpoint,
)
}

// With all the channels restored, we'll now re-send the blockbeat.
c.chainArb.RedispatchBlockbeat(chanPoints)

return nil
}

Expand All @@ -314,7 +325,7 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
// to ensure the new connection is created after this new link/channel
// is known.
if err := s.DisconnectPeer(nodePub); err != nil {
ltndLog.Infof("Peer(%v) is already connected, proceeding "+
ltndLog.Infof("Peer(%x) is already connected, proceeding "+
"with chan restore", nodePub.SerializeCompressed())
}

Expand Down
39 changes: 39 additions & 0 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,3 +1376,42 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {

return nil
}

// RedispatchBlockbeat resends the current blockbeat to the channels specified
// by the chanPoints. It is used when a channel is added to the chain
// arbitrator after it has been started, e.g., during the channel restore
// process.
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
// Get the current blockbeat.
beat := c.beat

// Prepare two sets of consumers.
channels := make([]chainio.Consumer, 0, len(chanPoints))
watchers := make([]chainio.Consumer, 0, len(chanPoints))

// Read the active channels in a lock.
c.Lock()
for _, op := range chanPoints {
if channel, ok := c.activeChannels[op]; ok {
channels = append(channels, channel)
}

if watcher, ok := c.activeWatchers[op]; ok {
watchers = append(watchers, watcher)
}
}
c.Unlock()

// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat failed: %v", err)
}

// Iterate all the copied channels and send the blockbeat to them.
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
// Shutdown lnd if there's an error processing the block.
log.Errorf("Notify blockbeat failed: %v", err)
}
}

0 comments on commit c8b27b2

Please sign in to comment.