Skip to content

Commit

Permalink
protofsm: update to take ctx and use structured logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Dec 10, 2024
1 parent f6dccd8 commit 8219f86
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 58 deletions.
2 changes: 1 addition & 1 deletion protofsm/log.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package protofsm

import (
"github.com/btcsuite/btclog"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
)

Expand Down
103 changes: 48 additions & 55 deletions protofsm/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnutils"
Expand Down Expand Up @@ -101,8 +102,8 @@ type DaemonAdapters interface {
// TODO(roasbeef): could abstract further?
RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte,
numConfs, heightHint uint32,
opts ...chainntnfs.NotifierOption,
) (*chainntnfs.ConfirmationEvent, error)
opts ...chainntnfs.NotifierOption) (
*chainntnfs.ConfirmationEvent, error)

// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint is successfully spent within a transaction. The script that
Expand Down Expand Up @@ -211,7 +212,7 @@ func NewStateMachine[Event any, Env Environment](
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
s.startOnce.Do(func() {
_ = s.wg.Go(ctx, func(ctx context.Context) {
s.driveMachine(ctx)
s.driveMachine(s.decorateCtx(ctx))
})
})
}
Expand All @@ -229,17 +230,18 @@ func (s *StateMachine[Event, Env]) Stop() {
//
// TODO(roasbeef): bool if processed?
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
s.sendEvent(ctx, event)
s.sendEvent(s.decorateCtx(ctx), event)
}

// sendEvent sends a new event to the state machine.
func (s *StateMachine[Event, Env]) sendEvent(ctx context.Context, event Event) {
log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(),
lnutils.SpewLogClosure(event))
log.DebugS(ctx, "Sending event",
"event", lnutils.SpewLogClosure(event))

select {
case s.events <- event:
case <-ctx.Done():
return
case <-s.quit:
return
}
Expand All @@ -266,7 +268,8 @@ func (s *StateMachine[Event, Env]) Name() string {
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
msg lnwire.Message) bool {

return s.sendMessage(ctx, msg)
return s.sendMessage(s.decorateCtx(ctx), msg)

}

// sendMessage attempts to send a wire message to the state machine. If the
Expand All @@ -282,9 +285,7 @@ func (s *StateMachine[Event, Env]) sendMessage(ctx context.Context,
return false
}

log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(),
lnutils.SpewLogClosure(msg),
)
log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))

// Otherwise, try to map the message using the default message mapper.
// If we can't extract an event, then we'll return false to indicate
Expand Down Expand Up @@ -355,11 +356,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// any preconditions as well as post-send events.
case *SendMsgEvent[Event]:
sendAndCleanUp := func() error {
log.Debugf("FSM(%v): sending message to target(%x): "+
"%v", s.cfg.Env.Name(),
daemonEvent.TargetPeer.SerializeCompressed(),
lnutils.SpewLogClosure(daemonEvent.Msgs),
)
log.DebugS(ctx, "Sending message",
btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
"msgs", lnutils.SpewLogClosure(daemonEvent.Msgs))

err := s.cfg.Daemon.SendMessages(
daemonEvent.TargetPeer, daemonEvent.Msgs,
Expand All @@ -374,10 +373,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
launched := s.wg.Go(
ctx, func(ctx context.Context) {
log.Debugf("FSM(%v): sending "+
"post-send event: %v",
s.cfg.Env.Name(),
lnutils.SpewLogClosure(event))
log.DebugS(ctx, "Sending post-send event",
"event", lnutils.SpewLogClosure(event))

s.sendEvent(ctx, event)
},
Expand Down Expand Up @@ -406,8 +403,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
)
defer predicateTicker.Stop()

log.Infof("FSM(%v): waiting for send predicate to "+
"be true", s.cfg.Env.Name())
log.InfoS(ctx, "Waiting for send predicate to be true")

for {
select {
Expand All @@ -420,14 +416,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
)

if canSend {
log.Infof("FSM(%v): send "+
"active predicate",
s.cfg.Env.Name())
log.InfoS(ctx, "Send active "+
"predicate")

err := sendAndCleanUp()
if err != nil {
//nolint:ll
log.Errorf("FSM(%v): unable to send message: %v", err)
log.ErrorS(ctx, "Unable to send message", err)
}

return
Expand All @@ -448,8 +442,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// If this is a broadcast transaction event, then we'll broadcast with
// the label attached.
case *BroadcastTxn:
log.Debugf("FSM(%v): broadcasting txn, txid=%v",
s.cfg.Env.Name(), daemonEvent.Tx.TxHash())
log.DebugS(ctx, "Broadcasting txn",
"txid", daemonEvent.Tx.TxHash())

err := s.cfg.Daemon.BroadcastTransaction(
daemonEvent.Tx, daemonEvent.Label,
Expand All @@ -463,8 +457,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// The state machine has requested a new event to be sent once a
// transaction spending a specified outpoint has confirmed.
case *RegisterSpend[Event]:
log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(),
daemonEvent.OutPoint)
log.DebugS(ctx, "Registering spend",
"outpoint", daemonEvent.OutPoint)

spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
&daemonEvent.OutPoint, daemonEvent.PkScript,
Expand Down Expand Up @@ -508,8 +502,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// The state machine has requested a new event to be sent once a
// specified txid+pkScript pair has confirmed.
case *RegisterConf[Event]:
log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(),
daemonEvent.Txid)
log.DebugS(ctx, "Registering conf",
"txid", daemonEvent.Txid)

numConfs := daemonEvent.NumConfs.UnwrapOr(1)
confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
Expand Down Expand Up @@ -560,8 +554,8 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
currentState State[Event, Env], newEvent Event) (State[Event, Env],
error) {

log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(),
lnutils.SpewLogClosure(newEvent),
log.DebugS(ctx, "Applying new event",
"event", lnutils.SpewLogClosure(newEvent),
)
eventQueue := fn.NewQueue(newEvent)

Expand All @@ -573,9 +567,8 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//nolint:ll
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
err := fn.MapOptionZ(nextEvent, func(event Event) error {
log.Debugf("FSM(%v): processing event: %v",
s.cfg.Env.Name(),
lnutils.SpewLogClosure(event),
log.DebugS(ctx, "Processing event",
"event", lnutils.SpewLogClosure(event),
)

// Apply the state transition function of the current
Expand All @@ -595,9 +588,8 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//
//nolint:ll
err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
log.Debugf("FSM(%v): processing "+
"daemon %v daemon events",
s.cfg.Env.Name(), len(dEvents))
log.DebugS(ctx, "Processing daemon events",
"num_events", len(dEvents))

for _, dEvent := range dEvents {
err := s.executeDaemonEvent(
Expand All @@ -620,14 +612,9 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//nolint:ll
events.InternalEvent.WhenSome(func(es []Event) {
for _, inEvent := range es {
log.Debugf("FSM(%v): adding "+
"new internal event "+
"to queue: %v",
s.cfg.Env.Name(),
lnutils.SpewLogClosure(
inEvent,
),
)
log.DebugS(ctx, "Adding new internal "+
"event to queue event",
lnutils.SpewLogClosure(inEvent))

eventQueue.Enqueue(inEvent)
}
Expand All @@ -639,10 +626,9 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
return err
}

log.Infof("FSM(%v): state transition: from_state=%T, "+
"to_state=%T",
s.cfg.Env.Name(), currentState,
transition.NextState)
log.InfoS(ctx, "State transition",
btclog.Fmt("from_state", "%T", currentState),
btclog.Fmt("to_state", "%T", transition.NextState))

// With our events processed, we'll now update our
// internal state.
Expand All @@ -668,7 +654,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
// incoming events, and then drives the state machine forward until it reaches
// a terminal state.
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name())
log.DebugS(ctx, "Starting state machine")

currentState := s.cfg.InitialState

Expand All @@ -678,7 +664,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
return s.executeDaemonEvent(ctx, event)
})
if err != nil {
log.Errorf("unable to execute init event: %w", err)
log.ErrorS(ctx, "Unable to execute init event", err)
return
}

Expand All @@ -698,7 +684,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
if err != nil {
s.cfg.ErrorReporter.ReportError(err)

log.Errorf("unable to apply event: %v", err)
log.ErrorS(ctx, "Unable to apply event", err)

// An error occurred, so we'll tear down the
// entire state machine as we can't proceed.
Expand All @@ -721,3 +707,10 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
}
}
}

// decorateCtx adds the FSM name to the context as a logging attribute.
func (s *StateMachine[Event, Env]) decorateCtx(
ctx context.Context) context.Context {

return btclog.WithCtx(ctx, btclog.Fmt("fsm_name", s.cfg.Env.Name()))
}
3 changes: 1 addition & 2 deletions protofsm/state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,7 @@ func TestStateMachineMsgMapper(t *testing.T) {

// Next, we'll attempt to send the wire message into the state machine.
// We should transition to the final state.
require.True(t, stateMachine.SendMessage(ctx,
wireError))
require.True(t, stateMachine.SendMessage(ctx, wireError))

// We should transition to the final state.
expectedStates := []State[dummyEvents, *dummyEnv]{
Expand Down

0 comments on commit 8219f86

Please sign in to comment.