Skip to content

Commit

Permalink
feat: bring back panic support
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jan 18, 2024
1 parent bdad2d4 commit 6343979
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 78 deletions.
3 changes: 1 addition & 2 deletions examples/temporal-expense/expense_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,10 @@ func ExpenseFlow(
})

// bind handlers and wait for Ready
binding, err := machine.BindHandlers(&MachineHandlers{})
err := machine.BindHandlers(&MachineHandlers{})
if err != nil {
return machine, err
}
<-binding.Ready

// reusable error channel
errCh := machine.WhenErr(nil)
Expand Down
3 changes: 1 addition & 2 deletions examples/temporal-fileprocessing/fileprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,10 @@ func FileProcessingFlow(ctx context.Context, log Logger, filename string) (*am.M
})

// bind handlers and wait for Ready
binding, err := machine.BindHandlers(&MachineHandlers{})
err := machine.BindHandlers(&MachineHandlers{})
if err != nil {
return machine, err
}
<-binding.Ready

// start it up!
machine.Add(am.S{"DownloadingFile"}, am.A{"filename": filename})
Expand Down
161 changes: 88 additions & 73 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -64,20 +65,22 @@ type Machine struct {
// List of all the registered state names.
StateNames S

emitters []*emitter
clock map[string]uint64
cancel context.CancelFunc
logLevel LogLevel
logger Logger
// Queue mutex.
queueLock sync.Mutex
emitters []*emitter
clock map[string]uint64
cancel context.CancelFunc
logLevel LogLevel
logger Logger
queueLock sync.RWMutex
queueProcessing atomic.Value
activeStatesLock sync.Mutex
recoveryCh chan bool
panicCaught bool
disposed bool
indexWhen indexWhen
indexStateCtx indexStateCtx
indexEventCh indexEventCh
indexEventChLock sync.Mutex
handlerDone chan bool
handlerPanic chan any
}

// New creates a new Machine instance, bound to context and modified with
Expand All @@ -92,11 +95,13 @@ func New(ctx context.Context, states States, opts *Opts) *Machine {
PrintExceptions: true,
PanicToException: true,
LogID: true,
recoveryCh: make(chan bool, 1),
indexWhen: indexWhen{},
indexStateCtx: indexStateCtx{},
indexEventCh: indexEventCh{},
handlerDone: make(chan bool),
handlerPanic: make(chan any),
}
m.queueProcessing.Store(false)
if opts != nil {
if opts.ID != "" {
m.ID = opts.ID
Expand Down Expand Up @@ -494,12 +499,14 @@ func (m *Machine) queueMutation(mutationType MutationType, states S, args A) {
m.log(LogOps, "[queue:%s] %s", mutationType, j(statesParsed))
}

m.queueLock.Lock()
m.Queue = append(m.Queue, Mutation{
Type: mutationType,
CalledStates: statesParsed,
Args: args,
Auto: false,
})
m.queueLock.Unlock()
}

// GetStateCtx returns a context, bound to the current clock tick of the
Expand Down Expand Up @@ -552,7 +559,7 @@ func (m *Machine) BindHandlers(handlers any) error {
// passed events happen. It's quick substitute for a predefined transition
// handler, although it does not guarantee a deterministic order of execution.
// ctx: optional context to dispose the emitter earlier.
// TODO dont call inside of handlers
// It's not supported to nest On() calls, as it would cause a deadlock.
func (m *Machine) On(events []string, ctx context.Context) chan *Event {
ch := make(chan *Event)
m.indexEventChLock.Lock()
Expand All @@ -573,40 +580,17 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event {
}

// recoverToErr recovers to the Exception state by catching panics.
// TODO refresh `m.indexWhen[]states` stateIsActive map
func (m *Machine) recoverToErr(emitter *emitter, handlerMethods reflect.Value,
binding *HandlerBinding,
) {
func (m *Machine) recoverToErr(emitter *emitter, r any) {
if m.Ctx.Err() != nil {
return
}
r := recover()
if r == nil {
return
}
m.panicCaught = true
t := m.Transition
// dont double handle an exception (no nesting)
if lo.Contains(t.Mutation.CalledStates, "Exception") {
return
}
m.log(LogOps, "[recover] handling panic...")
// TODO
// defer func() {
// // re-bind the handlers
// if emitter.EventChLocked {
// emitter.EventChMutex.Unlock()
// emitter.EventChLocked = false
// }
// go m.handleEmitterLoop(emitter, handlerMethods, binding)
// m.log(LogEverything, "[recover] %s during (%s)", emitter.id,
// j(t.Mutation.CalledStates))
// <-binding.Ready
// m.log(LogEverything, "[recover] new binding ready")
// // continue the queue
// if m.Ctx.Err() == nil {
// m.recoveryCh <- true
// }
// }()
err, ok := r.(error)
if !ok {
err = errors.New(fmt.Sprint(r))
Expand Down Expand Up @@ -640,6 +624,8 @@ func (m *Machine) recoverToErr(emitter *emitter, handlerMethods reflect.Value,
m.setActiveStates(t.CalledStates(), activeStates, t.IsAuto())
t.IsCompleted = true
}
m.log(LogOps, "[cancel:%s] (%s) by recover", t.latestStep.ID[:5],
j(t.TargetStates))
if t.Mutation == nil {
// TODO can this even happen?
panic(fmt.Sprintf("no mutation panic in %s: %s", emitter.id, err))
Expand Down Expand Up @@ -721,13 +707,15 @@ func (m *Machine) setActiveStates(calledStates S, targetStates S,
// processQueue processes the queue of mutations. It's the main loop of the
// machine.
func (m *Machine) processQueue() Result {
m.queueLock.RLock()
lenQueue := len(m.Queue)
m.queueLock.RUnlock()
// empty queue
if lenQueue == 0 {
return Canceled
}
// acquire the mutex or log and return
if !m.queueLock.TryLock() {
// acquire the atomic lock
if !m.queueProcessing.CompareAndSwap(false, true) {
label := "items"
if lenQueue == 1 {
label = "item"
Expand All @@ -737,23 +725,29 @@ func (m *Machine) processQueue() Result {
}

var ret []Result
defer func() {
m.emit("queue-end", nil, nil)
m.Transition = nil
m.queueLock.Unlock()
}()

// execute the queue
for len(m.Queue) > 0 {
for lenQueue > 0 {
// shift the queue
m.queueLock.Lock()
item := &m.Queue[0]
m.Queue = m.Queue[1:]
m.queueLock.Unlock()
m.Transition = newTransition(m, item)
// execute the transition
ret = append(ret, m.Transition.emitEvents())
m.processWhenBindings()
m.processStateCtxBindings()
m.queueLock.RLock()
lenQueue = len(m.Queue)
m.queueLock.RUnlock()
}

m.emit("queue-end", nil, nil)
m.Transition = nil
// release the atomic lock
m.queueProcessing.Swap(false)

if len(ret) == 0 {
return Canceled
}
Expand Down Expand Up @@ -784,7 +778,6 @@ func (m *Machine) processWhenBindings() {
all = append(all, activated...)
all = append(all, deactivated...)
var toClose []chan struct{}
// deadlock with When
m.activeStatesLock.Lock()
for _, s := range all {
for k, binding := range m.indexWhen[s] {
Expand Down Expand Up @@ -875,7 +868,6 @@ func (m *Machine) SetLogger(fn Logger) {
// emit is a synchronous (blocking) emit with cancellation via a return channel.
// Can block indefinitely if the handler doesn't return or the emitter isn't
// accepting events.
// TODO sync ver
func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
e := &Event{
Name: name,
Expand All @@ -890,8 +882,11 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
targetStates = j(t.TargetStates)
}
// call the handlers
// TODO timeout, recover
res := m.processEmitters(e)
if m.panicCaught {
res = Canceled
m.panicCaught = false
}
// check if this is an internal event
if step == nil {
return Executed
Expand All @@ -918,18 +913,11 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result {
}

func (m *Machine) processEmitters(e *Event) Result {
// TODO catch panics
// if m.PanicToException {
// // catch panics and restart the emitter
// defer m.log(LogEverything, "[end] handleEmitterLoop %s", emitter.id)
// defer m.recoverToErr(emitter, handlerMethods, binding)
// } else {
// // dispose and exit
// defer m.disposeEmitter(emitter)
// defer closeSafe(binding.Ready)
// defer m.log(LogEverything, "[end] handleEmitterLoop %s", emitter.id)
// }
for _, emitter := range m.emitters {
var emitter *emitter
for _, emitter = range m.emitters {
if m.Ctx.Err() != nil {
break
}
method := e.Name
stepID := method
if e.step != nil {
Expand All @@ -939,11 +927,6 @@ func (m *Machine) processEmitters(e *Event) Result {
if e.step == nil {
break
}
// if no handler, skip
if !emitter.methods.MethodByName(method).IsValid() {
continue
}
m.log(LogOps, "[handler] %s", method)
if e.step != nil {
emitterID := emitter.id
if len(emitterID) > 15 {
Expand All @@ -954,21 +937,53 @@ func (m *Machine) processEmitters(e *Event) Result {
stepID, method)
m.Transition.addSteps(e.step)
}
// if no handler, skip
if !emitter.methods.MethodByName(method).IsValid() {
continue
}
m.log(LogOps, "[handler] %s", method)
// call the handler
callRet := emitter.methods.MethodByName(e.Name).Call(
[]reflect.Value{reflect.ValueOf(e)})
var ret bool

go func() {
if m.PanicToException {
// catch panics and fwd
defer func() {
r := recover()
if r != nil {
m.handlerPanic <- r
}
}()
}
callRet := emitter.methods.MethodByName(e.Name).Call(
[]reflect.Value{reflect.ValueOf(e)})
if len(callRet) > 0 {
m.handlerDone <- callRet[0].Interface().(bool)
}
// handlers returns true by default
m.handlerDone <- true
}()
// wait on the result / timeout / context
select {
case <-m.Ctx.Done():
break
case <-time.After(m.HandlerTimeout):
m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, j(m.Transition.TargetStates))
break
case ret = <-m.handlerDone:
// ok
case r := <-m.handlerPanic:
// recover partial state
m.recoverToErr(emitter, r)
ret = false
}
switch {
// returns from these handlers are ignored
case strings.HasSuffix(e.Name, "State"):
case strings.HasSuffix(e.Name, "End"):
ret = true
// returns from State and End handlers are ignored
default:
if len(callRet) > 0 {
ret = callRet[0].Interface().(bool)
if !ret {
return Canceled
}
if !ret {
return Canceled
}
}
}
Expand All @@ -978,8 +993,8 @@ func (m *Machine) processEmitters(e *Event) Result {
return Executed
}

// processEventChs sends the event to all On() dynamic handlers.
func (m *Machine) processEventChs(e *Event) Result {
// TODO deadlock
m.indexEventChLock.Lock()
defer m.indexEventChLock.Unlock()
for _, ch := range m.indexEventCh[e.Name] {
Expand Down
2 changes: 1 addition & 1 deletion pkg/machine/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ type ExceptionArgsPanic struct {
StackTrace []byte
}

// ExceptionState is a (final) handler for the Exception state.
// ExceptionState is a final entry handler for the Exception state.
// Args:
// - err error: The error that caused the Exception state.
// - panic *ExceptionArgsPanic: Optional details about the panic.
Expand Down

0 comments on commit 6343979

Please sign in to comment.