contractcourt: fix linter funlen
Refactor the `Start` method to fix the linter error:
```
contractcourt/chain_arbitrator.go:568: Function 'Start' is too long (242 > 200) (funlen)
```
yyforyongyu committed Nov 12, 2024
1 parent a37be9c commit 255b40a
Showing 2 changed files with 151 additions and 123 deletions.
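In short, `Start` now delegates the channel-loading work to two new helpers, `loadOpenChannels` and `loadPendingCloseChannels`. A minimal sketch of the resulting method shape, reconstructed from the hunks below (the rest of `Start` is unchanged and elided):

```go
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
	// First, we'll fetch all the channels that are still open, in order
	// to collect them within our set of active contracts.
	if err := c.loadOpenChannels(); err != nil {
		return err
	}

	// In addition to the channels that we know to be open, we'll also
	// launch arbitrators to finish resolving any channels that are in
	// the pending close state.
	if err := c.loadPendingCloseChannels(); err != nil {
		return err
	}

	// The real method continues here: it starts all chain watchers in
	// parallel and restores each arbitrator's start state. That tail is
	// untouched by this commit and elided from this sketch.
	return nil
}
```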
270 changes: 147 additions & 123 deletions contractcourt/chain_arbitrator.go
@@ -580,137 +580,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
 
 	// First, we'll fetch all the channels that are still open, in order to
 	// collect them within our set of active contracts.
-	openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
-	if err != nil {
+	if err := c.loadOpenChannels(); err != nil {
 		return err
 	}
 
-	if len(openChannels) > 0 {
-		log.Infof("Creating ChannelArbitrators for %v active channels",
-			len(openChannels))
-	}
-
-	// For each open channel, we'll configure then launch a corresponding
-	// ChannelArbitrator.
-	for _, channel := range openChannels {
-		chanPoint := channel.FundingOutpoint
-		channel := channel
-
-		// First, we'll create an active chainWatcher for this channel
-		// to ensure that we detect any relevant on chain events.
-		breachClosure := func(ret *lnwallet.BreachRetribution) error {
-			return c.cfg.ContractBreach(chanPoint, ret)
-		}
-
-		chainWatcher, err := newChainWatcher(
-			chainWatcherConfig{
-				chanState:           channel,
-				notifier:            c.cfg.Notifier,
-				signer:              c.cfg.Signer,
-				isOurAddr:           c.cfg.IsOurAddress,
-				contractBreach:      breachClosure,
-				extractStateNumHint: lnwallet.GetStateNumHint,
-				auxLeafStore:        c.cfg.AuxLeafStore,
-				auxResolver:         c.cfg.AuxResolver,
-			},
-		)
-		if err != nil {
-			return err
-		}
-
-		c.activeWatchers[chanPoint] = chainWatcher
-		channelArb, err := newActiveChannelArbitrator(
-			channel, c, chainWatcher.SubscribeChannelEvents(),
-		)
-		if err != nil {
-			return err
-		}
-
-		c.activeChannels[chanPoint] = channelArb
-
-		// Republish any closing transactions for this channel.
-		err = c.republishClosingTxs(channel)
-		if err != nil {
-			log.Errorf("Failed to republish closing txs for "+
-				"channel %v", chanPoint)
-		}
-	}
-
 	// In addition to the channels that we know to be open, we'll also
 	// launch arbitrators to finish resolving any channels that are in
 	// the pending close state.
-	closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
-		true,
-	)
-	if err != nil {
+	if err := c.loadPendingCloseChannels(); err != nil {
 		return err
 	}
 
-	if len(closingChannels) > 0 {
-		log.Infof("Creating ChannelArbitrators for %v closing channels",
-			len(closingChannels))
-	}
-
-	// Next, for each channel in the closing state, we'll launch a
-	// corresponding more restricted resolver, as we don't have to watch
-	// the chain any longer, only resolve the contracts on the confirmed
-	// commitment.
-	//nolint:lll
-	for _, closeChanInfo := range closingChannels {
-		// We can leave off the CloseContract and ForceCloseChan
-		// methods as the channel is already closed at this point.
-		chanPoint := closeChanInfo.ChanPoint
-		arbCfg := ChannelArbitratorConfig{
-			ChanPoint:             chanPoint,
-			ShortChanID:           closeChanInfo.ShortChanID,
-			ChainArbitratorConfig: c.cfg,
-			ChainEvents:           &ChainEventSubscription{},
-			IsPendingClose:        true,
-			ClosingHeight:         closeChanInfo.CloseHeight,
-			CloseType:             closeChanInfo.CloseType,
-			PutResolverReport: func(tx kvdb.RwTx,
-				report *channeldb.ResolverReport) error {
-
-				return c.chanSource.PutResolverReport(
-					tx, c.cfg.ChainHash, &chanPoint, report,
-				)
-			},
-			FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
-				chanStateDB := c.chanSource.ChannelStateDB()
-				return chanStateDB.FetchHistoricalChannel(&chanPoint)
-			},
-			FindOutgoingHTLCDeadline: func(
-				htlc channeldb.HTLC) fn.Option[int32] {
-
-				return c.FindOutgoingHTLCDeadline(
-					closeChanInfo.ShortChanID, htlc,
-				)
-			},
-		}
-		chanLog, err := newBoltArbitratorLog(
-			c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
-		)
-		if err != nil {
-			return err
-		}
-		arbCfg.MarkChannelResolved = func() error {
-			if c.cfg.NotifyFullyResolvedChannel != nil {
-				c.cfg.NotifyFullyResolvedChannel(chanPoint)
-			}
-
-			return c.ResolveContract(chanPoint)
-		}
-
-		// We create an empty map of HTLCs here since it's possible
-		// that the channel is in StateDefault and updateActiveHTLCs is
-		// called. We want to avoid writing to an empty map. Since the
-		// channel is already in the process of being resolved, no new
-		// HTLCs will be added.
-		c.activeChannels[chanPoint] = NewChannelArbitrator(
-			arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
-		)
-	}
-
 	// Now, we'll start all chain watchers in parallel to shorten start up
 	// duration. In neutrino mode, this allows spend registrations to take
 	// advantage of batch spend reporting, instead of doing a single rescan
@@ -762,7 +642,7 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
 	// transaction.
 	var startStates map[wire.OutPoint]*chanArbStartState
 
-	err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
+	err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
 		for _, arbitrator := range c.activeChannels {
 			startState, err := arbitrator.getStartState(tx)
 			if err != nil {
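Note that with the fetch logic moved into the helpers, `err` is no longer declared earlier in `Start`, which is why the `kvdb.View` assignment above changes from `=` to `:=`.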
@@ -1336,3 +1216,147 @@ func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
 func (c *ChainArbitrator) Name() string {
 	return "ChainArbitrator"
 }
+
+// loadOpenChannels loads all channels that are currently open in the database
+// and registers them with the chainWatcher for future notification.
+func (c *ChainArbitrator) loadOpenChannels() error {
+	openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
+	if err != nil {
+		return err
+	}
+
+	if len(openChannels) == 0 {
+		return nil
+	}
+
+	log.Infof("Creating ChannelArbitrators for %v active channels",
+		len(openChannels))
+
+	// For each open channel, we'll configure then launch a corresponding
+	// ChannelArbitrator.
+	for _, channel := range openChannels {
+		chanPoint := channel.FundingOutpoint
+		channel := channel
+
+		// First, we'll create an active chainWatcher for this channel
+		// to ensure that we detect any relevant on chain events.
+		breachClosure := func(ret *lnwallet.BreachRetribution) error {
+			return c.cfg.ContractBreach(chanPoint, ret)
+		}
+
+		chainWatcher, err := newChainWatcher(
+			chainWatcherConfig{
+				chanState:           channel,
+				notifier:            c.cfg.Notifier,
+				signer:              c.cfg.Signer,
+				isOurAddr:           c.cfg.IsOurAddress,
+				contractBreach:      breachClosure,
+				extractStateNumHint: lnwallet.GetStateNumHint,
+				auxLeafStore:        c.cfg.AuxLeafStore,
+				auxResolver:         c.cfg.AuxResolver,
+			},
+		)
+		if err != nil {
+			return err
+		}
+
+		c.activeWatchers[chanPoint] = chainWatcher
+		channelArb, err := newActiveChannelArbitrator(
+			channel, c, chainWatcher.SubscribeChannelEvents(),
+		)
+		if err != nil {
+			return err
+		}
+
+		c.activeChannels[chanPoint] = channelArb
+
+		// Republish any closing transactions for this channel.
+		err = c.republishClosingTxs(channel)
+		if err != nil {
+			log.Errorf("Failed to republish closing txs for "+
+				"channel %v", chanPoint)
+		}
+	}
+
+	return nil
+}
+
+// loadPendingCloseChannels loads all channels that are currently pending
+// closure in the database and registers them with the ChannelArbitrator to
+// continue the resolution process.
+func (c *ChainArbitrator) loadPendingCloseChannels() error {
+	chanStateDB := c.chanSource.ChannelStateDB()
+
+	closingChannels, err := chanStateDB.FetchClosedChannels(true)
+	if err != nil {
+		return err
+	}
+
+	if len(closingChannels) == 0 {
+		return nil
+	}
+
+	log.Infof("Creating ChannelArbitrators for %v closing channels",
+		len(closingChannels))
+
+	// Next, for each channel in the closing state, we'll launch a
+	// corresponding more restricted resolver, as we don't have to watch
+	// the chain any longer, only resolve the contracts on the confirmed
+	// commitment.
+	//nolint:lll
+	for _, closeChanInfo := range closingChannels {
+		// We can leave off the CloseContract and ForceCloseChan
+		// methods as the channel is already closed at this point.
+		chanPoint := closeChanInfo.ChanPoint
+		arbCfg := ChannelArbitratorConfig{
+			ChanPoint:             chanPoint,
+			ShortChanID:           closeChanInfo.ShortChanID,
+			ChainArbitratorConfig: c.cfg,
+			ChainEvents:           &ChainEventSubscription{},
+			IsPendingClose:        true,
+			ClosingHeight:         closeChanInfo.CloseHeight,
+			CloseType:             closeChanInfo.CloseType,
+			PutResolverReport: func(tx kvdb.RwTx,
+				report *channeldb.ResolverReport) error {
+
+				return c.chanSource.PutResolverReport(
+					tx, c.cfg.ChainHash, &chanPoint, report,
+				)
+			},
+			FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
+				return chanStateDB.FetchHistoricalChannel(&chanPoint)
+			},
+			FindOutgoingHTLCDeadline: func(
+				htlc channeldb.HTLC) fn.Option[int32] {
+
+				return c.FindOutgoingHTLCDeadline(
+					closeChanInfo.ShortChanID, htlc,
+				)
+			},
+		}
+		chanLog, err := newBoltArbitratorLog(
+			c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
+		)
+		if err != nil {
+			return err
+		}
+		arbCfg.MarkChannelResolved = func() error {
+			if c.cfg.NotifyFullyResolvedChannel != nil {
+				c.cfg.NotifyFullyResolvedChannel(chanPoint)
+			}
+
+			return c.ResolveContract(chanPoint)
+		}
+
+		// We create an empty map of HTLCs here since it's possible
+		// that the channel is in StateDefault and updateActiveHTLCs is
+		// called. We want to avoid writing to an empty map. Since the
+		// channel is already in the process of being resolved, no new
+		// HTLCs will be added.
+		c.activeChannels[chanPoint] = NewChannelArbitrator(
+			arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
+		)
+	}
+
+	return nil
+}
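Both helpers keep the `channel := channel` shadowing inside their range loops. That is the standard pre-Go-1.22 guard against loop-variable capture: closures created in the loop body (such as `breachClosure` and the config callbacks) outlive the iteration, and without a per-iteration copy they would all observe the final iteration's value. A minimal, self-contained illustration of the pattern (not lnd code):

```go
package main

import "fmt"

func main() {
	values := []int{1, 2, 3}
	var callbacks []func()
	for _, v := range values {
		v := v // shadow: give each closure its own per-iteration copy
		callbacks = append(callbacks, func() { fmt.Println(v) })
	}

	// Prints 1, 2, 3. Before Go 1.22, omitting the shadow would make
	// every closure share a single loop variable and print 3, 3, 3.
	for _, cb := range callbacks {
		cb()
	}
}
```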
4 changes: 4 additions & 0 deletions contractcourt/commit_sweep_resolver.go
@@ -165,6 +165,10 @@ func (c *commitSweepResolver) getCommitTxConfHeight() (uint32, error) {
 // returned.
 //
 // NOTE: This function MUST be run as a goroutine.
+//
+// TODO(yy): fix the funlen in the next PR.
+//
+//nolint:funlen
 func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
 	// If we're already resolved, then we can exit early.
 	if c.resolved {
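For context on this second change: golangci-lint honors machine-readable `//nolint` comments, and a directive placed in the comment block directly above a declaration suppresses only the named linter for that declaration, which is why the TODO can defer the real refactor to a later PR. A minimal sketch of the directive's scope (the function is illustrative, not from lnd):

```go
// doEverything exceeds the configured funlen limit; suppress just that
// one linter for this declaration instead of disabling it project-wide.
//
//nolint:funlen
func doEverything() {
	// ... a deliberately long body ...
}
```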
