diff --git a/build/version.go b/build/version.go index 632ebafce4..c3db5dc703 100644 --- a/build/version.go +++ b/build/version.go @@ -122,11 +122,5 @@ func WithBuildInfo(ctx context.Context, cfg *LogConfig) (context.Context, return nil, fmt.Errorf("unable to decode commit hash: %w", err) } - // Include the first 3 bytes of the commit hash in the context as an - // slog attribute. - if len(commitHash) > 3 { - commitHash = commitHash[:3] - } - - return btclog.WithCtx(ctx, btclog.Hex("rev", commitHash)), nil + return btclog.WithCtx(ctx, btclog.Hex3("rev", commitHash)), nil } diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 0b576fc7de..8dc8b4e25e 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -226,6 +226,9 @@ The underlying functionality between those two options remain the same. [GoroutineManager](https://github.com/lightningnetwork/lnd/pull/9141) so that its constructor does not take a context. +* [Update protofsm StateMachine] to use the new GoroutineManager API along with + structured logging. + ## Tooling and Documentation * [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077) diff --git a/go.mod b/go.mod index 36766d0d81..bebbf447c9 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn/v2 v2.0.2 + github.com/lightningnetwork/lnd/fn/v2 v2.0.5 github.com/lightningnetwork/lnd/healthcheck v1.2.6 github.com/lightningnetwork/lnd/kvdb v1.4.11 github.com/lightningnetwork/lnd/queue v1.1.1 @@ -213,3 +213,5 @@ replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-d go 1.22.6 retract v0.0.2 + +replace github.com/btcsuite/btclog/v2 => github.com/ellemouton/btclog/v2 v2.0.0-20241210110018-997ee6596623 diff --git a/go.sum b/go.sum index 69c57c20e8..2d20938076 100644 --- a/go.sum +++ b/go.sum @@ -92,8 +92,6 @@ github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtyd github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c h1:4HxD1lBUGUddhzgaNgrCPsFWd7cGYNpeFUgd9ZIgyM0= github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c/go.mod h1:w7xnGOhwT3lmrS4H3b/D1XAXxvh+tbhUm8xeHN2y3TQ= -github.com/btcsuite/btclog/v2 v2.0.0 h1:ZfOBItEeLWfU0voi88K72j8vtxP4/dHhxRFf2bxZkVo= -github.com/btcsuite/btclog/v2 v2.0.0/go.mod h1:XItGUfVOxotJL8kkuk2Hj3EVow5KCugXl3wWfQ6K0AE= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcwallet v0.16.10-0.20241113134707-b4ff60753aaa h1:x7vYpwkPL5zeJEWPPaRunybH9ERRMGWeNf7x/0aU/38= github.com/btcsuite/btcwallet v0.16.10-0.20241113134707-b4ff60753aaa/go.mod h1:1HJXYbjJzgumlnxOC2+ViR1U+gnHWoOn7WeK5OfY1eU= @@ -188,6 +186,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/ellemouton/btclog/v2 v2.0.0-20241210110018-997ee6596623 
h1:qcTPGQ0m2YL+GkptbgDnICKcNYGfVy0Oy9WTnC9BosA= +github.com/ellemouton/btclog/v2 v2.0.0-20241210110018-997ee6596623/go.mod h1:XItGUfVOxotJL8kkuk2Hj3EVow5KCugXl3wWfQ6K0AE= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/lightningnetwork/lnd/fn/v2 v2.0.5 h1:2IrZ9r+HIKNYTryZWEssqlwGmPPZMJsQChyn/Wm68sI= +github.com/lightningnetwork/lnd/fn/v2 v2.0.5/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= github.com/lightningnetwork/lnd/kvdb v1.4.11 h1:fk1HMVFrsVK3xqU7q+JWHRgBltw/a2qIg1E3zazMb/8= diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 6e67e87def..b42a8bbc6e 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2,6 +2,7 @@ package htlcswitch import ( "bytes" + "context" crand "crypto/rand" "crypto/sha256" "errors" @@ -408,10 +409,10 @@ type channelLink struct { // the result. quiescenceReqs chan StfuReq - // ContextGuard is a helper that encapsulates a wait group and quit - // channel and allows contexts that either block or cancel on those - // depending on the use case. - *fn.ContextGuard + // cg is a helper that encapsulates a wait group and quit channel and + // allows contexts that either block or cancel on those depending on + // the use case. + cg *fn.ContextGuard } // hookMap is a data structure that is used to track the hooks that need to be @@ -517,7 +518,7 @@ func NewChannelLink(cfg ChannelLinkConfig, incomingCommitHooks: newHookMap(), quiescer: qsm, quiescenceReqs: quiescenceReqs, - ContextGuard: fn.NewContextGuard(), + cg: fn.NewContextGuard(), } } @@ -596,8 +597,8 @@ func (l *channelLink) Start() error { l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) - l.Wg.Add(1) - go l.htlcManager() + l.cg.WgAdd(1) + go l.htlcManager(context.TODO()) return nil } @@ -636,8 +637,8 @@ func (l *channelLink) Stop() { l.hodlQueue.Stop() } - close(l.Quit) - l.Wg.Wait() + l.cg.Quit() + l.cg.WgWait() // Now that the htlcManager has completely exited, reset the packet // courier. This allows the mailbox to revaluate any lingering Adds that @@ -662,7 +663,7 @@ func (l *channelLink) Stop() { // WaitForShutdown blocks until the link finishes shutting down, which includes // termination of all dependent goroutines. 
func (l *channelLink) WaitForShutdown() { - l.Wg.Wait() + l.cg.WgWait() } // EligibleToForward returns a bool indicating if the channel is able to @@ -740,7 +741,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool { func (l *channelLink) OnFlushedOnce(hook func()) { select { case l.flushHooks.newTransients <- hook: - case <-l.Quit: + case <-l.cg.Done(): } } @@ -759,7 +760,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) { select { case queue <- hook: - case <-l.Quit: + case <-l.cg.Done(): } } @@ -777,7 +778,7 @@ func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { select { case l.quiescenceReqs <- req: - case <-l.Quit: + case <-l.cg.Done(): req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown)) } @@ -887,7 +888,7 @@ func (l *channelLink) createFailureWithUpdate(incoming bool, // This method is to be called upon reconnection after the initial funding // flow. We'll compare out commitment chains with the remote party, and re-send // either a danging commit signature, a revocation, or both. -func (l *channelLink) syncChanStates() error { +func (l *channelLink) syncChanStates(ctx context.Context) error { chanState := l.channel.State() l.log.Infof("Attempting to re-synchronize channel: %v", chanState) @@ -989,7 +990,7 @@ func (l *channelLink) syncChanStates() error { // We've just received a ChanSync message from the remote // party, so we'll process the message in order to determine // if we need to re-transmit any messages to the remote party. - ctx, cancel := l.WithCtxQuitNoTimeout() + ctx, cancel := l.cg.Create(ctx) defer cancel() msgsToReSend, openedCircuits, closedCircuits, err = l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg) @@ -1021,7 +1022,7 @@ func (l *channelLink) syncChanStates() error { l.cfg.Peer.SendMessage(false, msg) } - case <-l.Quit: + case <-l.cg.Done(): return ErrLinkShuttingDown } @@ -1033,7 +1034,7 @@ func (l *channelLink) syncChanStates() error { // we previously received are reinstated in memory, and forwarded to the switch // if necessary. After a restart, this will also delete any previously // completed packages. -func (l *channelLink) resolveFwdPkgs() error { +func (l *channelLink) resolveFwdPkgs(ctx context.Context) error { fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { return err @@ -1050,7 +1051,7 @@ func (l *channelLink) resolveFwdPkgs() error { // If any of our reprocessing steps require an update to the commitment // txn, we initiate a state transition to capture all relevant changes. if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 { - return l.updateCommitTx() + return l.updateCommitTx(ctx) } return nil @@ -1111,7 +1112,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { // // NOTE: This MUST be run as a goroutine. func (l *channelLink) fwdPkgGarbager() { - defer l.Wg.Done() + defer l.cg.WgDone() l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() @@ -1128,7 +1129,7 @@ func (l *channelLink) fwdPkgGarbager() { err) continue } - case <-l.Quit: + case <-l.cg.Done(): return } } @@ -1248,10 +1249,10 @@ func (l *channelLink) handleChanSyncErr(err error) { // and also the htlc trickle queue+timer for this active channels. // // NOTE: This MUST be run as a goroutine. 
-func (l *channelLink) htlcManager() { +func (l *channelLink) htlcManager(ctx context.Context) { defer func() { l.cfg.BatchTicker.Stop() - l.Wg.Done() + l.cg.WgDone() l.log.Infof("exited") }() @@ -1271,7 +1272,7 @@ func (l *channelLink) htlcManager() { // re-synchronize state with the remote peer. settledHtlcs is a map of // HTLC's that we re-settled as part of the channel state sync. if l.cfg.SyncStates { - err := l.syncChanStates() + err := l.syncChanStates(ctx) if err != nil { l.handleChanSyncErr(err) return @@ -1322,7 +1323,7 @@ func (l *channelLink) htlcManager() { // the channel is not pending, otherwise we should have no htlcs to // reforward. if l.ShortChanID() != hop.Source { - err := l.resolveFwdPkgs() + err := l.resolveFwdPkgs(ctx) switch err { // No error was encountered, success. case nil: @@ -1345,7 +1346,7 @@ func (l *channelLink) htlcManager() { // With our link's in-memory state fully reconstructed, spawn a // goroutine to manage the reclamation of disk space occupied by // completed forwarding packages. - l.Wg.Add(1) + l.cg.WgAdd(1) go l.fwdPkgGarbager() } @@ -1447,7 +1448,8 @@ func (l *channelLink) htlcManager() { // If we do, then we'll send a new UpdateFee message to // the remote party, to be locked in with a new update. - if err := l.updateChannelFee(newCommitFee); err != nil { + err = l.updateChannelFee(ctx, newCommitFee) + if err != nil { l.log.Errorf("unable to update fee rate: %v", err) continue @@ -1475,7 +1477,7 @@ func (l *channelLink) htlcManager() { // including all the currently pending entries. If the // send was unsuccessful, then abandon the update, // waiting for the revocation window to open up. - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } @@ -1493,19 +1495,19 @@ func (l *channelLink) htlcManager() { // that the link is an intermediate hop in a multi-hop HTLC // circuit. case pkt := <-l.downstream: - l.handleDownstreamPkt(pkt) + l.handleDownstreamPkt(ctx, pkt) // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly // for us, or part of a multi-hop HTLC circuit. case msg := <-l.upstream: - l.handleUpstreamMsg(msg) + l.handleUpstreamMsg(ctx, msg) // A htlc resolution is received. This means that we now have a // resolution for a previously accepted htlc. case hodlItem := <-l.hodlQueue.ChanOut(): htlcResolution := hodlItem.(invoices.HtlcResolution) - err := l.processHodlQueue(htlcResolution) + err := l.processHodlQueue(ctx, htlcResolution) switch err { // No error, success. case nil: @@ -1543,7 +1545,7 @@ func (l *channelLink) htlcManager() { } } - case <-l.Quit: + case <-l.cg.Done(): return } } @@ -1552,7 +1554,7 @@ func (l *channelLink) htlcManager() { // processHodlQueue processes a received htlc resolution and continues reading // from the hodl queue until no more resolutions remain. When this function // returns without an error, the commit tx should be updated. -func (l *channelLink) processHodlQueue( +func (l *channelLink) processHodlQueue(ctx context.Context, firstResolution invoices.HtlcResolution) error { // Try to read all waiting resolution messages, so that they can all be @@ -1584,7 +1586,7 @@ loop: } // Update the commitment tx. - if err := l.updateCommitTx(); err != nil { + if err := l.updateCommitTx(ctx); err != nil { return err } @@ -1671,7 +1673,9 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the // downstream HTLC Switch. 
-func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { +func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context, + pkt *htlcPacket) error { + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) if !ok { return errors.New("not an UpdateAddHTLC packet") @@ -1775,7 +1779,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { getEventType(pkt), ) - l.tryBatchUpdateCommitTx() + l.tryBatchUpdateCommitTx(ctx) return nil } @@ -1786,7 +1790,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { // cleared HTLCs with the upstream peer. // // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view? -func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { +func (l *channelLink) handleDownstreamPkt(ctx context.Context, + pkt *htlcPacket) { + if pkt.htlc.MsgType().IsChannelUpdate() && !l.quiescer.CanSendUpdates() { @@ -1800,7 +1806,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { case *lnwire.UpdateAddHTLC: // Handle add message. The returned error can be ignored, // because it is also sent through the mailbox. - _ = l.handleDownstreamUpdateAdd(pkt) + _ = l.handleDownstreamUpdateAdd(ctx, pkt) case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to @@ -1867,7 +1873,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { ) // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) case *lnwire.UpdateFailHTLC: // If hodl.FailOutgoing mode is active, we exit early to @@ -1957,19 +1963,19 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { } // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) } } // tryBatchUpdateCommitTx updates the commitment transaction if the batch is // full. -func (l *channelLink) tryBatchUpdateCommitTx() { +func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) { pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) if pending < uint64(l.cfg.BatchSize) { return } - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) } // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef @@ -2039,7 +2045,7 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. -func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { +func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg lnwire.Message) { // First check if the message is an update and we are capable of // receiving updates right now. if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() { @@ -2418,7 +2424,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.cg.Done(): return default: } @@ -2430,7 +2436,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // reply with a signature as both sides already have a // commitment with the latest accepted. 
if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } } @@ -2488,7 +2494,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.cg.Done(): return default: } @@ -2542,7 +2548,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // but there are still remote updates that are not in the remote // commit tx yet, send out an update. if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } } @@ -2732,8 +2738,8 @@ func (l *channelLink) ackDownStreamPackets() error { // updateCommitTxOrFail updates the commitment tx and if that fails, it fails // the link. -func (l *channelLink) updateCommitTxOrFail() bool { - err := l.updateCommitTx() +func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool { + err := l.updateCommitTx(ctx) switch err { // No error encountered, success. case nil: @@ -2759,7 +2765,7 @@ func (l *channelLink) updateCommitTxOrFail() bool { // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. -func (l *channelLink) updateCommitTx() error { +func (l *channelLink) updateCommitTx(ctx context.Context) error { // Preemptively write all pending keystones to disk, just in case the // HTLCs we have in memory are included in the subsequent attempt to // sign a commitment state. @@ -2782,7 +2788,7 @@ func (l *channelLink) updateCommitTx() error { return nil } - ctx, done := l.WithCtxQuitNoTimeout() + ctx, done := l.cg.Create(ctx) defer done() newCommit, err := l.channel.SignNextCommitment(ctx) @@ -2822,7 +2828,7 @@ func (l *channelLink) updateCommitTx() error { } select { - case <-l.Quit: + case <-l.cg.Done(): return ErrLinkShuttingDown default: } @@ -3529,7 +3535,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error { // NOTE: Part of the ChannelLink interface. func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { select { - case <-l.Quit: + case <-l.cg.Done(): // Return early if the link is already in the process of // quitting. It doesn't make sense to hand the message to the // mailbox here. @@ -3545,7 +3551,9 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { // updateChannelFee updates the commitment fee-per-kw on this channel by // committing to an update_fee message. -func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error { +func (l *channelLink) updateChannelFee(ctx context.Context, + feePerKw chainfee.SatPerKWeight) error { + l.log.Infof("updating commit fee to %v", feePerKw) // We skip sending the UpdateFee message if the channel is not @@ -3583,7 +3591,8 @@ func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error { if err := l.cfg.Peer.SendMessage(false, msg); err != nil { return err } - return l.updateCommitTx() + + return l.updateCommitTx(ctx) } // processRemoteSettleFails accepts a batch of settle/fail payment descriptors @@ -4290,7 +4299,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...) + err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...) 
if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1747105597..cdeae6d812 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2257,7 +2257,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } @@ -2326,7 +2326,7 @@ func handleStateUpdate(link *channelLink, } link.HandleChannelUpdate(remoteRev) - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.cg.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -2372,7 +2372,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // Trigger update by ticking the batchTicker. select { case batchTick <- time.Now(): - case <-link.Quit: + case <-link.cg.Done(): return fmt.Errorf("link shutting down") } return handleStateUpdate(link, remoteChannel) @@ -2380,7 +2380,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // The remote is triggering the state update, emulate this by // signing and sending CommitSig to the link. - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.cg.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -4946,7 +4946,7 @@ func (h *persistentLinkHarness) restartLink( for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } @@ -5932,7 +5932,9 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.cg.Create( + context.Background(), + ) defer done() sigs, err := remoteChannel.SignNextCommitment( @@ -5979,7 +5981,9 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.cg.Create( + context.Background(), + ) defer done() sigs, err := remoteChannel.SignNextCommitment( diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 2123465884..43902dc336 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1190,7 +1190,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } diff --git a/lnwallet/chancloser/rbf_coop_test.go b/lnwallet/chancloser/rbf_coop_test.go index 07bc32e3dd..17d783eae6 100644 --- a/lnwallet/chancloser/rbf_coop_test.go +++ b/lnwallet/chancloser/rbf_coop_test.go @@ -2,6 +2,7 @@ package chancloser import ( "bytes" + "context" "encoding/hex" "errors" "fmt" @@ -144,7 +145,9 @@ func assertUnknownEventFail(t *testing.T, startingState ProtocolState) { closeHarness.expectFailure(ErrInvalidStateTransition) - closeHarness.chanCloser.SendEvent(&unknownEvent{}) + closeHarness.chanCloser.SendEvent( + context.Background(), &unknownEvent{}, + ) // There should be no further state transitions. 
closeHarness.assertNoStateTransitions() @@ -481,6 +484,7 @@ func (r *rbfCloserTestHarness) expectHalfSignerIteration( initEvent ProtocolEvent, balanceAfterClose, absoluteFee btcutil.Amount, dustExpect dustExpectation) { + ctx := context.Background() numFeeCalls := 2 // If we're using the SendOfferEvent as a trigger, we only need to call @@ -527,7 +531,7 @@ func (r *rbfCloserTestHarness) expectHalfSignerIteration( }) r.expectMsgSent(msgExpect) - r.chanCloser.SendEvent(initEvent) + r.chanCloser.SendEvent(ctx, initEvent) // Based on the init event, we'll either just go to the closing // negotiation state, or go through the channel flushing state first. @@ -582,6 +586,8 @@ func (r *rbfCloserTestHarness) assertSingleRbfIteration( initEvent ProtocolEvent, balanceAfterClose, absoluteFee btcutil.Amount, dustExpect dustExpectation) { + ctx := context.Background() + // We'll now send in the send offer event, which should trigger 1/2 of // the RBF loop, ending us in the LocalOfferSent state. r.expectHalfSignerIteration( @@ -607,7 +613,7 @@ func (r *rbfCloserTestHarness) assertSingleRbfIteration( balanceAfterClose, true, ) - r.chanCloser.SendEvent(localSigEvent) + r.chanCloser.SendEvent(ctx, localSigEvent) // We should transition to the pending closing state now. r.assertLocalClosePending() @@ -617,6 +623,8 @@ func (r *rbfCloserTestHarness) assertSingleRemoteRbfIteration( initEvent ProtocolEvent, balanceAfterClose, absoluteFee btcutil.Amount, sequence uint32, iteration bool) { + ctx := context.Background() + // If this is an iteration, then we expect some intermediate states, // before we enter the main RBF/sign loop. if iteration { @@ -635,7 +643,7 @@ func (r *rbfCloserTestHarness) assertSingleRemoteRbfIteration( absoluteFee, balanceAfterClose, false, ) - r.chanCloser.SendEvent(initEvent) + r.chanCloser.SendEvent(ctx, initEvent) // Our outer state should transition to ClosingNegotiation state. r.assertStateTransitions(&ClosingNegotiation{}) @@ -668,6 +676,8 @@ func assertStateT[T ProtocolState](h *rbfCloserTestHarness) T { func newRbfCloserTestHarness(t *testing.T, cfg *harnessCfg) *rbfCloserTestHarness { + ctx := context.Background() + startingHeight := 200 chanPoint := randOutPoint(t) @@ -747,7 +757,7 @@ func newRbfCloserTestHarness(t *testing.T, ).Return(nil) chanCloser := protofsm.NewStateMachine(protoCfg) - chanCloser.Start() + chanCloser.Start(ctx) harness.stateSub = chanCloser.RegisterStateEvents() @@ -769,6 +779,7 @@ func newCloser(t *testing.T, cfg *harnessCfg) *rbfCloserTestHarness { // TestRbfChannelActiveTransitions tests the transitions of from the // ChannelActive state. func TestRbfChannelActiveTransitions(t *testing.T) { + ctx := context.Background() localAddr := lnwire.DeliveryAddress(bytes.Repeat([]byte{0x01}, 20)) remoteAddr := lnwire.DeliveryAddress(bytes.Repeat([]byte{0x02}, 20)) @@ -782,7 +793,7 @@ func TestRbfChannelActiveTransitions(t *testing.T) { }) defer closeHarness.stopAndAssert() - closeHarness.chanCloser.SendEvent(&SpendEvent{}) + closeHarness.chanCloser.SendEvent(ctx, &SpendEvent{}) closeHarness.assertStateTransitions(&CloseFin{}) }) @@ -799,7 +810,7 @@ func TestRbfChannelActiveTransitions(t *testing.T) { // We don't specify an upfront shutdown addr, and don't specify // on here in the vent, so we should call new addr, but then // fail. - closeHarness.chanCloser.SendEvent(&SendShutdown{}) + closeHarness.chanCloser.SendEvent(ctx, &SendShutdown{}) // We shouldn't have transitioned to a new state. 
closeHarness.assertNoStateTransitions() @@ -824,9 +835,9 @@ func TestRbfChannelActiveTransitions(t *testing.T) { // If we send the shutdown event, we should transition to the // shutdown pending state. - closeHarness.chanCloser.SendEvent(&SendShutdown{ - IdealFeeRate: feeRate, - }) + closeHarness.chanCloser.SendEvent( + ctx, &SendShutdown{IdealFeeRate: feeRate}, + ) closeHarness.assertStateTransitions(&ShutdownPending{}) // If we examine the internal state, it should be consistent @@ -869,9 +880,9 @@ func TestRbfChannelActiveTransitions(t *testing.T) { // Next, we'll emit the recv event, with the addr of the remote // party. - closeHarness.chanCloser.SendEvent(&ShutdownReceived{ - ShutdownScript: remoteAddr, - }) + closeHarness.chanCloser.SendEvent( + ctx, &ShutdownReceived{ShutdownScript: remoteAddr}, + ) // We should transition to the shutdown pending state. closeHarness.assertStateTransitions(&ShutdownPending{}) @@ -899,6 +910,7 @@ func TestRbfChannelActiveTransitions(t *testing.T) { // shutdown ourselves. func TestRbfShutdownPendingTransitions(t *testing.T) { t.Parallel() + ctx := context.Background() startingState := &ShutdownPending{} @@ -913,7 +925,7 @@ func TestRbfShutdownPendingTransitions(t *testing.T) { }) defer closeHarness.stopAndAssert() - closeHarness.chanCloser.SendEvent(&SpendEvent{}) + closeHarness.chanCloser.SendEvent(ctx, &SpendEvent{}) closeHarness.assertStateTransitions(&CloseFin{}) }) @@ -936,7 +948,7 @@ func TestRbfShutdownPendingTransitions(t *testing.T) { // We'll now send in a ShutdownReceived event, but with a // different address provided in the shutdown message. This // should result in an error. - closeHarness.chanCloser.SendEvent(&ShutdownReceived{ + closeHarness.chanCloser.SendEvent(ctx, &ShutdownReceived{ ShutdownScript: localAddr, }) @@ -972,9 +984,9 @@ func TestRbfShutdownPendingTransitions(t *testing.T) { // We'll send in a shutdown received event, with the expected // co-op close addr. - closeHarness.chanCloser.SendEvent(&ShutdownReceived{ - ShutdownScript: remoteAddr, - }) + closeHarness.chanCloser.SendEvent( + ctx, &ShutdownReceived{ShutdownScript: remoteAddr}, + ) // We should transition to the channel flushing state. closeHarness.assertStateTransitions(&ChannelFlushing{}) @@ -1015,7 +1027,7 @@ func TestRbfShutdownPendingTransitions(t *testing.T) { closeHarness.expectFinalBalances(fn.None[ShutdownBalances]()) // We'll send in a shutdown received event. - closeHarness.chanCloser.SendEvent(&ShutdownComplete{}) + closeHarness.chanCloser.SendEvent(ctx, &ShutdownComplete{}) // We should transition to the channel flushing state. closeHarness.assertStateTransitions(&ChannelFlushing{}) @@ -1030,6 +1042,7 @@ func TestRbfShutdownPendingTransitions(t *testing.T) { // transition to the negotiation state. func TestRbfChannelFlushingTransitions(t *testing.T) { t.Parallel() + ctx := context.Background() localBalance := lnwire.NewMSatFromSatoshis(10_000) remoteBalance := lnwire.NewMSatFromSatoshis(50_000) @@ -1082,7 +1095,9 @@ func TestRbfChannelFlushingTransitions(t *testing.T) { // We'll now send in the event which should trigger // this code path. - closeHarness.chanCloser.SendEvent(&chanFlushedEvent) + closeHarness.chanCloser.SendEvent( + ctx, &chanFlushedEvent, + ) // With the event sent, we should now transition // straight to the ClosingNegotiation state, with no @@ -1149,6 +1164,7 @@ func TestRbfChannelFlushingTransitions(t *testing.T) { // rate. 
func TestRbfCloseClosingNegotiationLocal(t *testing.T) { t.Parallel() + ctx := context.Background() localBalance := lnwire.NewMSatFromSatoshis(40_000) remoteBalance := lnwire.NewMSatFromSatoshis(50_000) @@ -1232,7 +1248,7 @@ func TestRbfCloseClosingNegotiationLocal(t *testing.T) { // We should fail as the remote party sent us more than one // signature. - closeHarness.chanCloser.SendEvent(localSigEvent) + closeHarness.chanCloser.SendEvent(ctx, localSigEvent) }) // Next, we'll verify that if the balance of the remote party is dust, @@ -1333,7 +1349,7 @@ func TestRbfCloseClosingNegotiationLocal(t *testing.T) { singleMsgMatcher[*lnwire.Shutdown](nil), ) - closeHarness.chanCloser.SendEvent(sendShutdown) + closeHarness.chanCloser.SendEvent(ctx, sendShutdown) // We should first transition to the Channel Active state // momentarily, before transitioning to the shutdown pending @@ -1367,6 +1383,7 @@ func TestRbfCloseClosingNegotiationLocal(t *testing.T) { // party. func TestRbfCloseClosingNegotiationRemote(t *testing.T) { t.Parallel() + ctx := context.Background() localBalance := lnwire.NewMSatFromSatoshis(40_000) remoteBalance := lnwire.NewMSatFromSatoshis(50_000) @@ -1416,7 +1433,7 @@ func TestRbfCloseClosingNegotiationRemote(t *testing.T) { FeeSatoshis: absoluteFee * 10, }, } - closeHarness.chanCloser.SendEvent(feeOffer) + closeHarness.chanCloser.SendEvent(ctx, feeOffer) // We shouldn't have transitioned to a new state. closeHarness.assertNoStateTransitions() @@ -1460,7 +1477,7 @@ func TestRbfCloseClosingNegotiationRemote(t *testing.T) { }, }, } - closeHarness.chanCloser.SendEvent(feeOffer) + closeHarness.chanCloser.SendEvent(ctx, feeOffer) // We shouldn't have transitioned to a new state. closeHarness.assertNoStateTransitions() @@ -1489,7 +1506,7 @@ func TestRbfCloseClosingNegotiationRemote(t *testing.T) { }, }, } - closeHarness.chanCloser.SendEvent(feeOffer) + closeHarness.chanCloser.SendEvent(ctx, feeOffer) // We shouldn't have transitioned to a new state. closeHarness.assertNoStateTransitions() @@ -1561,9 +1578,9 @@ func TestRbfCloseClosingNegotiationRemote(t *testing.T) { // We'll now simulate the start of the RBF loop, by receiving a // new Shutdown message from the remote party. This signals // that they want to obtain a new commit sig. - closeHarness.chanCloser.SendEvent(&ShutdownReceived{ - ShutdownScript: remoteAddr, - }) + closeHarness.chanCloser.SendEvent( + ctx, &ShutdownReceived{ShutdownScript: remoteAddr}, + ) // Next, we'll receive an offer from the remote party, and // drive another RBF iteration. 
This time, we'll increase the diff --git a/protofsm/log.go b/protofsm/log.go index 8ff9c1b62f..6978f1e890 100644 --- a/protofsm/log.go +++ b/protofsm/log.go @@ -1,7 +1,7 @@ package protofsm import ( - "github.com/btcsuite/btclog" + "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/build" ) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 2cc1219022..49917bbe21 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnutils" @@ -130,6 +131,8 @@ type stateQuery[Event any, Env Environment] struct { type StateMachine[Event any, Env Environment] struct { cfg StateMachineCfg[Event, Env] + log btclog.Logger + // events is the channel that will be used to send new events to the // FSM. events chan Event @@ -142,7 +145,7 @@ type StateMachine[Event any, Env Environment] struct { // query the internal state machine state. stateQuery chan stateQuery[Event, Env] - wg fn.GoroutineManager + wg *fn.GoroutineManager quit chan struct{} startOnce sync.Once @@ -193,14 +196,17 @@ type StateMachineCfg[Event any, Env Environment] struct { // an initial state, an environment, and an event to process as if emitted at // the onset of the state machine. Such an event can be used to set up tracking // state such as a txid confirmation event. -func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:ll -) StateMachine[Event, Env] { +func NewStateMachine[Event any, Env Environment]( + cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] { return StateMachine[Event, Env]{ - cfg: cfg, + cfg: cfg, + log: log.WithPrefix( + fmt.Sprintf("FSM(%v)", cfg.Env.Name()), + ), events: make(chan Event, 1), stateQuery: make(chan stateQuery[Event, Env]), - wg: *fn.NewGoroutineManager(context.Background()), + wg: fn.NewGoroutineManager(), newStateEvents: fn.NewEventDistributor[State[Event, Env]](), quit: make(chan struct{}), } @@ -208,10 +214,10 @@ func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env] // Start starts the state machine. This will spawn a goroutine that will drive // the state machine to completion. -func (s *StateMachine[Event, Env]) Start() { +func (s *StateMachine[Event, Env]) Start(ctx context.Context) { s.startOnce.Do(func() { - _ = s.wg.Go(func(ctx context.Context) { - s.driveMachine() + _ = s.wg.Go(ctx, func(ctx context.Context) { + s.driveMachine(ctx) }) }) } @@ -228,13 +234,14 @@ func (s *StateMachine[Event, Env]) Stop() { // SendEvent sends a new event to the state machine. // // TODO(roasbeef): bool if processed? -func (s *StateMachine[Event, Env]) SendEvent(event Event) { - log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(), - lnutils.SpewLogClosure(event), - ) +func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { + s.log.DebugS(ctx, "Sending event", + "event", lnutils.SpewLogClosure(event)) select { case s.events <- event: + case <-ctx.Done(): + return case <-s.quit: return } @@ -258,16 +265,16 @@ func (s *StateMachine[Event, Env]) Name() string { // message can be mapped using the default message mapper, then true is // returned indicating that the message was processed. Otherwise, false is // returned. 
-func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool { +func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context, + msg lnwire.Message) bool { + // If we have no message mapper, then return false as we can't process // this message. if !s.cfg.MsgMapper.IsSome() { return false } - log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(), - lnutils.SpewLogClosure(msg), - ) + s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg)) // Otherwise, try to map the message using the default message mapper. // If we can't extract an event, then we'll return false to indicate @@ -277,7 +284,7 @@ func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool { event := mapper.MapMsg(msg) event.WhenSome(func(event Event) { - s.SendEvent(event) + s.SendEvent(ctx, event) processed = true }) @@ -330,7 +337,7 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[ // machine. An error is returned if the type of event is unknown. // //nolint:funlen -func (s *StateMachine[Event, Env]) executeDaemonEvent( +func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, event DaemonEvent) error { switch daemonEvent := event.(type) { @@ -338,11 +345,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { - log.Debugf("FSM(%v): sending message to target(%x): "+ - "%v", s.cfg.Env.Name(), - daemonEvent.TargetPeer.SerializeCompressed(), - lnutils.SpewLogClosure(daemonEvent.Msgs), - ) + s.log.DebugS(ctx, "Sending message to target", + btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()), + "messages", lnutils.SpewLogClosure(daemonEvent.Msgs)) err := s.cfg.Daemon.SendMessages( daemonEvent.TargetPeer, daemonEvent.Msgs, @@ -355,15 +360,14 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // If a post-send event was specified, then we'll funnel // that back into the main state machine now as well. return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll - launched := s.wg.Go(func(ctx context.Context) { - log.Debugf("FSM(%v): sending "+ - "post-send event: %v", - s.cfg.Env.Name(), - lnutils.SpewLogClosure(event), - ) + launched := s.wg.Go( + ctx, func(ctx context.Context) { + s.log.DebugS(ctx, "Sending post-send event", + "event", lnutils.SpewLogClosure(event)) - s.SendEvent(event) - }) + s.SendEvent(ctx, event) + }, + ) if !launched { return ErrStateMachineShutdown @@ -382,14 +386,13 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // Otherwise, this has a SendWhen predicate, so we'll need // launch a goroutine to poll the SendWhen, then send only once // the predicate is true. 
- launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(ctx, func(ctx context.Context) { predicateTicker := time.NewTicker( s.cfg.CustomPollInterval.UnwrapOr(pollInterval), ) defer predicateTicker.Stop() - log.Infof("FSM(%v): waiting for send predicate to "+ - "be true", s.cfg.Env.Name()) + s.log.InfoS(ctx, "Waiting for send predicate to be true") for { select { @@ -402,14 +405,11 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( ) if canSend { - log.Infof("FSM(%v): send "+ - "active predicate", - s.cfg.Env.Name()) + s.log.InfoS(ctx, "Send active predicate") err := sendAndCleanUp() if err != nil { - //nolint:ll - log.Errorf("FSM(%v): unable to send message: %v", err) + s.log.ErrorS(ctx, "Unable to send message", err) } return @@ -430,8 +430,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // If this is a broadcast transaction event, then we'll broadcast with // the label attached. case *BroadcastTxn: - log.Debugf("FSM(%v): broadcasting txn, txid=%v", - s.cfg.Env.Name(), daemonEvent.Tx.TxHash()) + s.log.DebugS(ctx, "Broadcasting txn", + "txid", daemonEvent.Tx.TxHash()) err := s.cfg.Daemon.BroadcastTransaction( daemonEvent.Tx, daemonEvent.Label, @@ -445,8 +445,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // The state machine has requested a new event to be sent once a // transaction spending a specified outpoint has confirmed. case *RegisterSpend[Event]: - log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(), - daemonEvent.OutPoint) + s.log.DebugS(ctx, "Registering spend", + "outpoint", daemonEvent.OutPoint) spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn( &daemonEvent.OutPoint, daemonEvent.PkScript, @@ -456,7 +456,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( return fmt.Errorf("unable to register spend: %w", err) } - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(ctx, func(ctx context.Context) { for { select { case spend, ok := <-spendEvent.Spend: @@ -470,7 +470,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( postSpend := daemonEvent.PostSpendEvent postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll customEvent := f(spend) - s.SendEvent(customEvent) + s.SendEvent(ctx, customEvent) }) return @@ -490,8 +490,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // The state machine has requested a new event to be sent once a // specified txid+pkScript pair has confirmed. case *RegisterConf[Event]: - log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(), - daemonEvent.Txid) + s.log.DebugS(ctx, "Registering conf", + "txid", daemonEvent.Txid) numConfs := daemonEvent.NumConfs.UnwrapOr(1) confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn( @@ -502,7 +502,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( return fmt.Errorf("unable to register conf: %w", err) } - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(ctx, func(ctx context.Context) { for { select { case <-confEvent.Confirmed: @@ -514,7 +514,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // dispatchAfterRecv w/ above postConf := daemonEvent.PostConfEvent postConf.WhenSome(func(e Event) { - s.SendEvent(e) + s.SendEvent(ctx, e) }) return @@ -538,12 +538,13 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // applyEvents applies a new event to the state machine. This will continue // until no further events are emitted by the state machine. Along the way, // we'll also ensure to execute any daemon events that are emitted. 
-func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], - newEvent Event) (State[Event, Env], error) { +func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, + currentState State[Event, Env], newEvent Event) (State[Event, Env], + error) { + + s.log.DebugS(ctx, "Applying new event", + "event", lnutils.SpewLogClosure(newEvent)) - log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(), - lnutils.SpewLogClosure(newEvent), - ) eventQueue := fn.NewQueue(newEvent) // Given the next event to handle, we'll process the event, then add @@ -554,10 +555,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], //nolint:ll for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { err := fn.MapOptionZ(nextEvent, func(event Event) error { - log.Debugf("FSM(%v): processing event: %v", - s.cfg.Env.Name(), - lnutils.SpewLogClosure(event), - ) + s.log.DebugS(ctx, "Processing event", + "event", lnutils.SpewLogClosure(event)) // Apply the state transition function of the current // state given this new event and our existing env. @@ -575,7 +574,7 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // of this new state transition. for _, dEvent := range events.ExternalEvents { err := s.executeDaemonEvent( - dEvent, + ctx, dEvent, ) if err != nil { return err @@ -587,14 +586,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // //nolint:ll for _, inEvent := range events.InternalEvent { - log.Debugf("FSM(%v): adding "+ - "new internal event "+ - "to queue: %v", - s.cfg.Env.Name(), - lnutils.SpewLogClosure( - inEvent, - ), - ) + s.log.DebugS(ctx, "Adding new internal event to queue", + "event", lnutils.SpewLogClosure(inEvent)) eventQueue.Enqueue(inEvent) } @@ -605,10 +598,9 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], return err } - log.Infof("FSM(%v): state transition: from_state=%T, "+ - "to_state=%T", - s.cfg.Env.Name(), currentState, - transition.NextState) + s.log.InfoS(ctx, "State transition", + btclog.Fmt("from_state", "%T", currentState), + btclog.Fmt("to_state", "%T", transition.NextState)) // With our events processed, we'll now update our // internal state. @@ -633,18 +625,18 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // driveMachine is the main event loop of the state machine. It accepts any new // incoming events, and then drives the state machine forward until it reaches // a terminal state. -func (s *StateMachine[Event, Env]) driveMachine() { - log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name()) +func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { + s.log.DebugS(ctx, "Starting state machine") currentState := s.cfg.InitialState // Before we start, if we have an init daemon event specified, then // we'll handle that now. err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error { - return s.executeDaemonEvent(event) + return s.executeDaemonEvent(ctx, event) }) if err != nil { - log.Errorf("unable to execute init event: %w", err) + s.log.ErrorS(ctx, "Unable to execute init event", err) return } @@ -658,11 +650,13 @@ func (s *StateMachine[Event, Env]) driveMachine() { // machine forward until we either run out of internal events, // or we reach a terminal state. 
case newEvent := <-s.events: - newState, err := s.applyEvents(currentState, newEvent) + newState, err := s.applyEvents( + ctx, currentState, newEvent, + ) if err != nil { s.cfg.ErrorReporter.ReportError(err) - log.Errorf("unable to apply event: %v", err) + s.log.ErrorS(ctx, "Unable to apply event", err) // An error occurred, so we'll tear down the // entire state machine as we can't proceed. diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index ea596dc250..1ff0217d9f 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -14,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "golang.org/x/net/context" ) type dummyEvents interface { @@ -223,6 +224,8 @@ func (d *dummyAdapters) RegisterSpendNtfn(outpoint *wire.OutPoint, // TestStateMachineOnInitDaemonEvent tests that the state machine will properly // execute any init-level daemon events passed into it. func TestStateMachineOnInitDaemonEvent(t *testing.T) { + ctx := context.Background() + // First, we'll create our state machine given the env, and our // starting state. env := &dummyEnv{} @@ -254,7 +257,7 @@ func TestStateMachineOnInitDaemonEvent(t *testing.T) { stateSub := stateMachine.RegisterStateEvents() defer stateMachine.RemoveStateSub(stateSub) - stateMachine.Start() + stateMachine.Start(ctx) defer stateMachine.Stop() // Assert that we go from the starting state to the final state. The @@ -275,6 +278,7 @@ func TestStateMachineOnInitDaemonEvent(t *testing.T) { // transition. func TestStateMachineInternalEvents(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create our state machine given the env, and our // starting state. @@ -296,12 +300,12 @@ func TestStateMachineInternalEvents(t *testing.T) { stateSub := stateMachine.RegisterStateEvents() defer stateMachine.RemoveStateSub(stateSub) - stateMachine.Start() + stateMachine.Start(ctx) defer stateMachine.Stop() // For this transition, we'll send in the emitInternal event, which'll // send us back to the starting event, but emit an internal event. - stateMachine.SendEvent(&emitInternal{}) + stateMachine.SendEvent(ctx, &emitInternal{}) // We'll now also assert the path we took to get here to ensure the // internal events were processed. @@ -323,6 +327,7 @@ func TestStateMachineInternalEvents(t *testing.T) { // daemon emitted as part of the state transition process. func TestStateMachineDaemonEvents(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create our state machine given the env, and our // starting state. @@ -348,7 +353,7 @@ func TestStateMachineDaemonEvents(t *testing.T) { stateSub := stateMachine.RegisterStateEvents() defer stateMachine.RemoveStateSub(stateSub) - stateMachine.Start() + stateMachine.Start(ctx) defer stateMachine.Stop() // As soon as we send in the daemon event, we expect the @@ -360,7 +365,7 @@ func TestStateMachineDaemonEvents(t *testing.T) { // We'll start off by sending in the daemon event, which'll trigger the // state machine to execute the series of daemon events. - stateMachine.SendEvent(&daemonEvents{}) + stateMachine.SendEvent(ctx, &daemonEvents{}) // We should transition back to the starting state now, after we // started from the very same state. 
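The link and state-machine changes above follow one pattern: the embedded wait-group/quit-channel pair is replaced by a single fn.ContextGuard (or a context-aware GoroutineManager), and shutdown-aware contexts are derived from the caller's context instead of a bare quit channel. Below is a minimal, self-contained sketch of that pattern. It is not code from this PR; it assumes the fn/v2 v2.0.5 ContextGuard API behaves as exercised in this diff (NewContextGuard, WgAdd/WgDone/WgWait, Quit, Done, and Create).

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/fn/v2"
)

// worker mirrors the channelLink change above: one ContextGuard replaces
// the separate wait group and quit channel.
type worker struct {
	cg *fn.ContextGuard
}

func newWorker() *worker {
	return &worker{cg: fn.NewContextGuard()}
}

// Start launches the main goroutine and threads the caller's context
// through, as Start/htlcManager now do in link.go.
func (w *worker) Start(ctx context.Context) {
	w.cg.WgAdd(1)
	go w.run(ctx)
}

func (w *worker) run(ctx context.Context) {
	defer w.cg.WgDone()

	// Derive a context that the guard cancels on shutdown; this stands
	// in for the old WithCtxQuitNoTimeout helper.
	ctx, cancel := w.cg.Create(ctx)
	defer cancel()

	for {
		select {
		case <-time.After(time.Second):
			fmt.Println("tick")

		// Done() plays the role of the old quit channel.
		case <-w.cg.Done():
			return
		}
	}
}

// Stop mirrors channelLink.Stop: signal quit, then wait for every
// goroutine tracked by the guard to exit.
func (w *worker) Stop() {
	w.cg.Quit()
	w.cg.WgWait()
}

func main() {
	w := newWorker()
	w.Start(context.Background())
	time.Sleep(1500 * time.Millisecond)
	w.Stop()
}
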
@@ -402,6 +407,8 @@ func (d *dummyMsgMapper) MapMsg(wireMsg lnwire.Message) fn.Option[dummyEvents] { // TestStateMachineMsgMapper tests that given a message mapper, we can properly // send in wire messages get mapped to FSM events. func TestStateMachineMsgMapper(t *testing.T) { + ctx := context.Background() + // First, we'll create our state machine given the env, and our // starting state. env := &dummyEnv{} @@ -436,7 +443,7 @@ func TestStateMachineMsgMapper(t *testing.T) { stateSub := stateMachine.RegisterStateEvents() defer stateMachine.RemoveStateSub(stateSub) - stateMachine.Start() + stateMachine.Start(ctx) defer stateMachine.Stop() // First, we'll verify that the CanHandle method works as expected. @@ -445,7 +452,7 @@ func TestStateMachineMsgMapper(t *testing.T) { // Next, we'll attempt to send the wire message into the state machine. // We should transition to the final state. - require.True(t, stateMachine.SendMessage(wireError)) + require.True(t, stateMachine.SendMessage(ctx, wireError)) // We should transition to the final state. expectedStates := []State[dummyEvents, *dummyEnv]{