Skip to content

Commit

Permalink
lnd: start blockbeatDispatcher and register consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
yyforyongyu committed Nov 12, 2024
1 parent b8625eb commit a37be9c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
Expand Down Expand Up @@ -192,6 +193,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
AddSubLogger(
root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger,
)
AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger)
}

// AddSubLogger is a helper method to conveniently create and register the
Expand Down
43 changes: 43 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ type server struct {
// txPublisher is a publisher with fee-bumping capability.
txPublisher *sweep.TxPublisher

// blockbeatDispatcher is a block dispatcher that notifies subscribers
// of new blocks.
blockbeatDispatcher *chainio.BlockbeatDispatcher

quit chan struct{}

wg sync.WaitGroup
Expand Down Expand Up @@ -605,6 +609,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,

blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
cc.ChainNotifier,
),
channelNotifier: channelnotifier.New(
dbs.ChanStateDB.ChannelStateDB(),
),
Expand Down Expand Up @@ -1791,6 +1798,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.connMgr = cmgr

// Finally, register the subsystems in blockbeat.
s.registerBlockConsumers()

return s, nil
}

Expand Down Expand Up @@ -1823,6 +1833,25 @@ func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
routerCfg.MaxMcHistory = cfg.MaxMcHistory
}

// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems are registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
// In this queue, when a new block arrives, it will be received and
// processed in this order: chainArb -> sweeper -> txPublisher.
consumers := []chainio.Consumer{
s.chainArb,
s.sweeper,
s.txPublisher,
}
s.blockbeatDispatcher.RegisterQueue(consumers)
}

// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
Expand Down Expand Up @@ -2460,6 +2489,17 @@ func (s *server) Start() error {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}

// Start the blockbeat after all other subsystems have been
// started so they are ready to receive new blocks.
cleanup = cleanup.add(func() error {
s.blockbeatDispatcher.Stop()
return nil
})
if err := s.blockbeatDispatcher.Start(); err != nil {
startErr = err
return
}

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&s.active, 1)
Expand All @@ -2484,6 +2524,9 @@ func (s *server) Stop() error {
// Shutdown connMgr first to prevent conns during shutdown.
s.connMgr.Stop()

// Stop dispatching blocks to other systems immediately.
s.blockbeatDispatcher.Stop()

// Shutdown the wallet, funding manager, and the rpc server.
if err := s.chanStatusMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
Expand Down

0 comments on commit a37be9c

Please sign in to comment.