Skip to content

Commit

Permalink
protofsm: use prefixed logger instead
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Dec 10, 2024
1 parent a86da09 commit 6526c62
Showing 1 changed file with 29 additions and 48 deletions.
77 changes: 29 additions & 48 deletions protofsm/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type stateQuery[Event any, Env Environment] struct {
type StateMachine[Event any, Env Environment] struct {
cfg StateMachineCfg[Event, Env]

log btclog.Logger

// events is the channel that will be used to send new events to the
// FSM.
events chan Event
Expand Down Expand Up @@ -198,7 +200,10 @@ func NewStateMachine[Event any, Env Environment](
cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {

return StateMachine[Event, Env]{
cfg: cfg,
cfg: cfg,
log: log.WithPrefix(
fmt.Sprintf("FSM(%v)", cfg.Env.Name()),
),
events: make(chan Event, 1),
stateQuery: make(chan stateQuery[Event, Env]),
wg: *fn.NewGoroutineManager(),
Expand All @@ -212,7 +217,7 @@ func NewStateMachine[Event any, Env Environment](
func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
s.startOnce.Do(func() {
_ = s.wg.Go(ctx, func(ctx context.Context) {
s.driveMachine(s.decorateCtx(ctx))
s.driveMachine(ctx)
})
})
}
Expand All @@ -230,12 +235,7 @@ func (s *StateMachine[Event, Env]) Stop() {
//
// TODO(roasbeef): bool if processed?
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
s.sendEvent(s.decorateCtx(ctx), event)
}

// sendEvent sends a new event to the state machine.
func (s *StateMachine[Event, Env]) sendEvent(ctx context.Context, event Event) {
log.DebugS(ctx, "Sending event",
s.log.DebugS(ctx, "Sending event",
"event", lnutils.SpewLogClosure(event))

select {
Expand Down Expand Up @@ -268,24 +268,13 @@ func (s *StateMachine[Event, Env]) Name() string {
func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
msg lnwire.Message) bool {

return s.sendMessage(s.decorateCtx(ctx), msg)

}

// sendMessage attempts to send a wire message to the state machine. If the
// message can be mapped using the default message mapper, then true is
// returned indicating that the message was processed. Otherwise, false is
// returned.
func (s *StateMachine[Event, Env]) sendMessage(ctx context.Context,
msg lnwire.Message) bool {

// If we have no message mapper, then return false as we can't process
// this message.
if !s.cfg.MsgMapper.IsSome() {
return false
}

log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))
s.log.DebugS(ctx, "Sending msg", "msg", lnutils.SpewLogClosure(msg))

// Otherwise, try to map the message using the default message mapper.
// If we can't extract an event, then we'll return false to indicate
Expand All @@ -295,7 +284,7 @@ func (s *StateMachine[Event, Env]) sendMessage(ctx context.Context,
event := mapper.MapMsg(msg)

event.WhenSome(func(event Event) {
s.sendEvent(ctx, event)
s.SendEvent(ctx, event)

processed = true
})
Expand Down Expand Up @@ -354,7 +343,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// any preconditions as well as post-send events.
case *SendMsgEvent[Event]:
sendAndCleanUp := func() error {
log.DebugS(ctx, "Sending message",
s.log.DebugS(ctx, "Sending message",
btclog.Hex6("target", daemonEvent.TargetPeer.SerializeCompressed()),
"msgs", lnutils.SpewLogClosure(daemonEvent.Msgs))

Expand All @@ -371,10 +360,10 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
launched := s.wg.Go(
ctx, func(ctx context.Context) {
log.DebugS(ctx, "Sending post-send event",
s.log.DebugS(ctx, "Sending post-send event",
"event", lnutils.SpewLogClosure(event))

s.sendEvent(ctx, event)
s.SendEvent(ctx, event)
},
)

Expand All @@ -401,7 +390,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
)
defer predicateTicker.Stop()

log.InfoS(ctx, "Waiting for send predicate to be true")
s.log.InfoS(ctx, "Waiting for send predicate to be true")

for {
select {
Expand All @@ -414,12 +403,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
)

if canSend {
log.InfoS(ctx, "Send active "+
s.log.InfoS(ctx, "Send active "+
"predicate")

err := sendAndCleanUp()
if err != nil {
log.ErrorS(ctx, "Unable to send message", err)
s.log.ErrorS(ctx, "Unable to send message", err)
}

return
Expand All @@ -440,7 +429,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// If this is a broadcast transaction event, then we'll broadcast with
// the label attached.
case *BroadcastTxn:
log.DebugS(ctx, "Broadcasting txn",
s.log.DebugS(ctx, "Broadcasting txn",
"txid", daemonEvent.Tx.TxHash())

err := s.cfg.Daemon.BroadcastTransaction(
Expand All @@ -455,7 +444,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// The state machine has requested a new event to be sent once a
// transaction spending a specified outpoint has confirmed.
case *RegisterSpend[Event]:
log.DebugS(ctx, "Registering spend",
s.log.DebugS(ctx, "Registering spend",
"outpoint", daemonEvent.OutPoint)

spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
Expand All @@ -480,7 +469,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
postSpend := daemonEvent.PostSpendEvent
postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:ll
customEvent := f(spend)
s.sendEvent(ctx, customEvent)
s.SendEvent(ctx, customEvent)
})

return
Expand All @@ -500,7 +489,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// The state machine has requested a new event to be sent once a
// specified txid+pkScript pair has confirmed.
case *RegisterConf[Event]:
log.DebugS(ctx, "Registering conf",
s.log.DebugS(ctx, "Registering conf",
"txid", daemonEvent.Txid)

numConfs := daemonEvent.NumConfs.UnwrapOr(1)
Expand All @@ -524,7 +513,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
// dispatchAfterRecv w/ above
postConf := daemonEvent.PostConfEvent
postConf.WhenSome(func(e Event) {
s.sendEvent(ctx, e)
s.SendEvent(ctx, e)
})

return
Expand Down Expand Up @@ -552,7 +541,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
currentState State[Event, Env], newEvent Event) (State[Event, Env],
error) {

log.DebugS(ctx, "Applying new event",
s.log.DebugS(ctx, "Applying new event",
"event", lnutils.SpewLogClosure(newEvent),
)
eventQueue := fn.NewQueue(newEvent)
Expand All @@ -565,7 +554,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//nolint:ll
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
err := fn.MapOptionZ(nextEvent, func(event Event) error {
log.DebugS(ctx, "Processing event",
s.log.DebugS(ctx, "Processing event",
"event", lnutils.SpewLogClosure(event),
)

Expand All @@ -586,7 +575,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//
//nolint:ll
err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
log.DebugS(ctx, "Processing daemon events",
s.log.DebugS(ctx, "Processing daemon events",
"num_events", len(dEvents))

for _, dEvent := range dEvents {
Expand All @@ -610,7 +599,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
//nolint:ll
events.InternalEvent.WhenSome(func(es []Event) {
for _, inEvent := range es {
log.DebugS(ctx, "Adding new internal "+
s.log.DebugS(ctx, "Adding new internal "+
"event to queue event",
lnutils.SpewLogClosure(inEvent))

Expand All @@ -624,7 +613,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
return err
}

log.InfoS(ctx, "State transition",
s.log.InfoS(ctx, "State transition",
btclog.Fmt("from_state", "%T", currentState),
btclog.Fmt("to_state", "%T", transition.NextState))

Expand Down Expand Up @@ -652,7 +641,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
// incoming events, and then drives the state machine forward until it reaches
// a terminal state.
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
log.DebugS(ctx, "Starting state machine")
s.log.DebugS(ctx, "Starting state machine")

currentState := s.cfg.InitialState

Expand All @@ -662,7 +651,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
return s.executeDaemonEvent(ctx, event)
})
if err != nil {
log.ErrorS(ctx, "Unable to execute init event", err)
s.log.ErrorS(ctx, "Unable to execute init event", err)
return
}

Expand All @@ -682,7 +671,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
if err != nil {
s.cfg.ErrorReporter.ReportError(err)

log.ErrorS(ctx, "Unable to apply event", err)
s.log.ErrorS(ctx, "Unable to apply event", err)

// An error occurred, so we'll tear down the
// entire state machine as we can't proceed.
Expand All @@ -705,11 +694,3 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
}
}
}

// decorateCtx adds the FSM name to the context as a logging attribute.
func (s *StateMachine[Event, Env]) decorateCtx(
ctx context.Context) context.Context {

return btclog.WithCtx(ctx,
btclog.Fmt("fsm_name", s.cfg.Env.Name())) //nolint:govet
}

0 comments on commit 6526c62

Please sign in to comment.