diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index aa11767eff..c83659fe2f 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -131,6 +131,8 @@ type stateQuery[Event any, Env Environment] struct { type StateMachine[Event any, Env Environment] struct { cfg StateMachineCfg[Event, Env] + log btclog.Logger + // events is the channel that will be used to send new events to the // FSM. events chan Event @@ -198,7 +200,10 @@ func NewStateMachine[Event any, Env Environment]( cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] { return StateMachine[Event, Env]{ - cfg: cfg, + cfg: cfg, + log: log.WithPrefix( + fmt.Sprintf("FSM(%v)", cfg.Env.Name()), + ), events: make(chan Event, 1), stateQuery: make(chan stateQuery[Event, Env]), wg: *fn.NewGoroutineManager(), @@ -212,7 +217,7 @@ func NewStateMachine[Event any, Env Environment]( func (s *StateMachine[Event, Env]) Start(ctx context.Context) { s.startOnce.Do(func() { _ = s.wg.Go(ctx, func(ctx context.Context) { - s.driveMachine(s.decorateCtx(ctx)) + s.driveMachine(ctx) }) }) } @@ -230,12 +235,7 @@ func (s *StateMachine[Event, Env]) Stop() { // // TODO(roasbeef): bool if processed? func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { - s.sendEvent(s.decorateCtx(ctx), event) -} - -// sendEvent sends a new event to the state machine. -func (s *StateMachine[Event, Env]) sendEvent(ctx context.Context, event Event) { - log.DebugS(ctx, "Sending event", + s.log.DebugS(ctx, "Sending event", "event", lnutils.SpewLogClosure(event)) select { @@ -268,24 +268,13 @@ func (s *StateMachine[Event, Env]) Name() string { func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context, msg lnwire.Message) bool { - return s.sendMessage(s.decorateCtx(ctx), msg) - -} - -// sendMessage attempts to send a wire message to the state machine. If the -// message can be mapped using the default message mapper, then true is -// returned indicating that the message was processed. Otherwise, false is -// returned. -func (s *StateMachine[Event, Env]) sendMessage(ctx context.Context, - msg lnwire.Message) bool { - // If we have no message mapper, then return false as we can't process // this message. if !s.cfg.MsgMapper.IsSome() { return false } - log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg)) + s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg)) // Otherwise, try to map the message using the default message mapper. // If we can't extract an event, then we'll return false to indicate @@ -295,7 +284,7 @@ func (s *StateMachine[Event, Env]) sendMessage(ctx context.Context, event := mapper.MapMsg(msg) event.WhenSome(func(event Event) { - s.sendEvent(ctx, event) + s.SendEvent(ctx, event) processed = true }) @@ -354,7 +343,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { - log.DebugS(ctx, "Sending message", + s.log.DebugS(ctx, "Sending message", btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()), "msgs", lnutils.SpewLogClosure(daemonEvent.Msgs)) @@ -371,10 +360,10 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll launched := s.wg.Go( ctx, func(ctx context.Context) { - log.DebugS(ctx, "Sending post-send event", + s.log.DebugS(ctx, "Sending post-send event", "event", lnutils.SpewLogClosure(event)) - s.sendEvent(ctx, event) + s.SendEvent(ctx, event) }, ) @@ -401,7 +390,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, ) defer predicateTicker.Stop() - log.InfoS(ctx, "Waiting for send predicate to be true") + s.log.InfoS(ctx, "Waiting for send predicate to be true") for { select { @@ -414,12 +403,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, ) if canSend { - log.InfoS(ctx, "Send active "+ + s.log.InfoS(ctx, "Send active "+ "predicate") err := sendAndCleanUp() if err != nil { - log.ErrorS(ctx, "Unable to send message", err) + s.log.ErrorS(ctx, "Unable to send message", err) } return @@ -440,7 +429,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // If this is a broadcast transaction event, then we'll broadcast with // the label attached. case *BroadcastTxn: - log.DebugS(ctx, "Broadcasting txn", + s.log.DebugS(ctx, "Broadcasting txn", "txid", daemonEvent.Tx.TxHash()) err := s.cfg.Daemon.BroadcastTransaction( @@ -455,7 +444,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // The state machine has requested a new event to be sent once a // transaction spending a specified outpoint has confirmed. case *RegisterSpend[Event]: - log.DebugS(ctx, "Registering spend", + s.log.DebugS(ctx, "Registering spend", "outpoint", daemonEvent.OutPoint) spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn( @@ -480,7 +469,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, postSpend := daemonEvent.PostSpendEvent postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll customEvent := f(spend) - s.sendEvent(ctx, customEvent) + s.SendEvent(ctx, customEvent) }) return @@ -500,7 +489,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // The state machine has requested a new event to be sent once a // specified txid+pkScript pair has confirmed. case *RegisterConf[Event]: - log.DebugS(ctx, "Registering conf", + s.log.DebugS(ctx, "Registering conf", "txid", daemonEvent.Txid) numConfs := daemonEvent.NumConfs.UnwrapOr(1) @@ -524,7 +513,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // dispatchAfterRecv w/ above postConf := daemonEvent.PostConfEvent postConf.WhenSome(func(e Event) { - s.sendEvent(ctx, e) + s.SendEvent(ctx, e) }) return @@ -552,7 +541,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, currentState State[Event, Env], newEvent Event) (State[Event, Env], error) { - log.DebugS(ctx, "Applying new event", + s.log.DebugS(ctx, "Applying new event", "event", lnutils.SpewLogClosure(newEvent), ) eventQueue := fn.NewQueue(newEvent) @@ -565,7 +554,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, //nolint:ll for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { err := fn.MapOptionZ(nextEvent, func(event Event) error { - log.DebugS(ctx, "Processing event", + s.log.DebugS(ctx, "Processing event", "event", lnutils.SpewLogClosure(event), ) @@ -586,7 +575,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, // //nolint:ll err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error { - log.DebugS(ctx, "Processing daemon events", + s.log.DebugS(ctx, "Processing daemon events", "num_events", len(dEvents)) for _, dEvent := range dEvents { @@ -610,7 +599,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, //nolint:ll events.InternalEvent.WhenSome(func(es []Event) { for _, inEvent := range es { - log.DebugS(ctx, "Adding new internal "+ + s.log.DebugS(ctx, "Adding new internal "+ "event to queue event", lnutils.SpewLogClosure(inEvent)) @@ -624,7 +613,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, return err } - log.InfoS(ctx, "State transition", + s.log.InfoS(ctx, "State transition", btclog.Fmt("from_state", "%T", currentState), btclog.Fmt("to_state", "%T", transition.NextState)) @@ -652,7 +641,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, // incoming events, and then drives the state machine forward until it reaches // a terminal state. func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { - log.DebugS(ctx, "Starting state machine") + s.log.DebugS(ctx, "Starting state machine") currentState := s.cfg.InitialState @@ -662,7 +651,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { return s.executeDaemonEvent(ctx, event) }) if err != nil { - log.ErrorS(ctx, "Unable to execute init event", err) + s.log.ErrorS(ctx, "Unable to execute init event", err) return } @@ -682,7 +671,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { if err != nil { s.cfg.ErrorReporter.ReportError(err) - log.ErrorS(ctx, "Unable to apply event", err) + s.log.ErrorS(ctx, "Unable to apply event", err) // An error occurred, so we'll tear down the // entire state machine as we can't proceed. @@ -705,11 +694,3 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { } } } - -// decorateCtx adds the FSM name to the context as a logging attribute. -func (s *StateMachine[Event, Env]) decorateCtx( - ctx context.Context) context.Context { - - return btclog.WithCtx(ctx, - btclog.Fmt("fsm_name", s.cfg.Env.Name())) //nolint:govet -}