From 63439795810cbf4d1f73a049dd1f5c60b93ce9c2 Mon Sep 17 00:00:00 2001 From: pancsta <155631569+pancsta@users.noreply.github.com> Date: Thu, 18 Jan 2024 10:04:30 +0100 Subject: [PATCH] feat: bring back panic support --- examples/temporal-expense/expense_test.go | 3 +- .../temporal-fileprocessing/fileprocessing.go | 3 +- pkg/machine/machine.go | 161 ++++++++++-------- pkg/machine/misc.go | 2 +- 4 files changed, 91 insertions(+), 78 deletions(-) diff --git a/examples/temporal-expense/expense_test.go b/examples/temporal-expense/expense_test.go index 28b0562..0677bcb 100644 --- a/examples/temporal-expense/expense_test.go +++ b/examples/temporal-expense/expense_test.go @@ -233,11 +233,10 @@ func ExpenseFlow( }) // bind handlers and wait for Ready - binding, err := machine.BindHandlers(&MachineHandlers{}) + err := machine.BindHandlers(&MachineHandlers{}) if err != nil { return machine, err } - <-binding.Ready // reusable error channel errCh := machine.WhenErr(nil) diff --git a/examples/temporal-fileprocessing/fileprocessing.go b/examples/temporal-fileprocessing/fileprocessing.go index 65b6eaf..c42e929 100644 --- a/examples/temporal-fileprocessing/fileprocessing.go +++ b/examples/temporal-fileprocessing/fileprocessing.go @@ -225,11 +225,10 @@ func FileProcessingFlow(ctx context.Context, log Logger, filename string) (*am.M }) // bind handlers and wait for Ready - binding, err := machine.BindHandlers(&MachineHandlers{}) + err := machine.BindHandlers(&MachineHandlers{}) if err != nil { return machine, err } - <-binding.Ready // start it up! machine.Add(am.S{"DownloadingFile"}, am.A{"filename": filename}) diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index f6e862b..13efb19 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -20,6 +20,7 @@ import ( "runtime/debug" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -64,20 +65,22 @@ type Machine struct { // List of all the registered state names. StateNames S - emitters []*emitter - clock map[string]uint64 - cancel context.CancelFunc - logLevel LogLevel - logger Logger - // Queue mutex. - queueLock sync.Mutex + emitters []*emitter + clock map[string]uint64 + cancel context.CancelFunc + logLevel LogLevel + logger Logger + queueLock sync.RWMutex + queueProcessing atomic.Value activeStatesLock sync.Mutex - recoveryCh chan bool + panicCaught bool disposed bool indexWhen indexWhen indexStateCtx indexStateCtx indexEventCh indexEventCh indexEventChLock sync.Mutex + handlerDone chan bool + handlerPanic chan any } // New creates a new Machine instance, bound to context and modified with @@ -92,11 +95,13 @@ func New(ctx context.Context, states States, opts *Opts) *Machine { PrintExceptions: true, PanicToException: true, LogID: true, - recoveryCh: make(chan bool, 1), indexWhen: indexWhen{}, indexStateCtx: indexStateCtx{}, indexEventCh: indexEventCh{}, + handlerDone: make(chan bool), + handlerPanic: make(chan any), } + m.queueProcessing.Store(false) if opts != nil { if opts.ID != "" { m.ID = opts.ID @@ -494,12 +499,14 @@ func (m *Machine) queueMutation(mutationType MutationType, states S, args A) { m.log(LogOps, "[queue:%s] %s", mutationType, j(statesParsed)) } + m.queueLock.Lock() m.Queue = append(m.Queue, Mutation{ Type: mutationType, CalledStates: statesParsed, Args: args, Auto: false, }) + m.queueLock.Unlock() } // GetStateCtx returns a context, bound to the current clock tick of the @@ -552,7 +559,7 @@ func (m *Machine) BindHandlers(handlers any) error { // passed events happen. It's quick substitute for a predefined transition // handler, although it does not guarantee a deterministic order of execution. // ctx: optional context to dispose the emitter earlier. -// TODO dont call inside of handlers +// It's not supported to nest On() calls, as it would cause a deadlock. func (m *Machine) On(events []string, ctx context.Context) chan *Event { ch := make(chan *Event) m.indexEventChLock.Lock() @@ -573,40 +580,17 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event { } // recoverToErr recovers to the Exception state by catching panics. -// TODO refresh `m.indexWhen[]states` stateIsActive map -func (m *Machine) recoverToErr(emitter *emitter, handlerMethods reflect.Value, - binding *HandlerBinding, -) { +func (m *Machine) recoverToErr(emitter *emitter, r any) { if m.Ctx.Err() != nil { return } - r := recover() - if r == nil { - return - } + m.panicCaught = true t := m.Transition // dont double handle an exception (no nesting) if lo.Contains(t.Mutation.CalledStates, "Exception") { return } m.log(LogOps, "[recover] handling panic...") - // TODO - // defer func() { - // // re-bind the handlers - // if emitter.EventChLocked { - // emitter.EventChMutex.Unlock() - // emitter.EventChLocked = false - // } - // go m.handleEmitterLoop(emitter, handlerMethods, binding) - // m.log(LogEverything, "[recover] %s during (%s)", emitter.id, - // j(t.Mutation.CalledStates)) - // <-binding.Ready - // m.log(LogEverything, "[recover] new binding ready") - // // continue the queue - // if m.Ctx.Err() == nil { - // m.recoveryCh <- true - // } - // }() err, ok := r.(error) if !ok { err = errors.New(fmt.Sprint(r)) @@ -640,6 +624,8 @@ func (m *Machine) recoverToErr(emitter *emitter, handlerMethods reflect.Value, m.setActiveStates(t.CalledStates(), activeStates, t.IsAuto()) t.IsCompleted = true } + m.log(LogOps, "[cancel:%s] (%s) by recover", t.latestStep.ID[:5], + j(t.TargetStates)) if t.Mutation == nil { // TODO can this even happen? panic(fmt.Sprintf("no mutation panic in %s: %s", emitter.id, err)) @@ -721,13 +707,15 @@ func (m *Machine) setActiveStates(calledStates S, targetStates S, // processQueue processes the queue of mutations. It's the main loop of the // machine. func (m *Machine) processQueue() Result { + m.queueLock.RLock() lenQueue := len(m.Queue) + m.queueLock.RUnlock() // empty queue if lenQueue == 0 { return Canceled } - // acquire the mutex or log and return - if !m.queueLock.TryLock() { + // acquire the atomic lock + if !m.queueProcessing.CompareAndSwap(false, true) { label := "items" if lenQueue == 1 { label = "item" @@ -737,23 +725,29 @@ func (m *Machine) processQueue() Result { } var ret []Result - defer func() { - m.emit("queue-end", nil, nil) - m.Transition = nil - m.queueLock.Unlock() - }() // execute the queue - for len(m.Queue) > 0 { + for lenQueue > 0 { // shift the queue + m.queueLock.Lock() item := &m.Queue[0] m.Queue = m.Queue[1:] + m.queueLock.Unlock() m.Transition = newTransition(m, item) // execute the transition ret = append(ret, m.Transition.emitEvents()) m.processWhenBindings() m.processStateCtxBindings() + m.queueLock.RLock() + lenQueue = len(m.Queue) + m.queueLock.RUnlock() } + + m.emit("queue-end", nil, nil) + m.Transition = nil + // release the atomic lock + m.queueProcessing.Swap(false) + if len(ret) == 0 { return Canceled } @@ -784,7 +778,6 @@ func (m *Machine) processWhenBindings() { all = append(all, activated...) all = append(all, deactivated...) var toClose []chan struct{} - // deadlock with When m.activeStatesLock.Lock() for _, s := range all { for k, binding := range m.indexWhen[s] { @@ -875,7 +868,6 @@ func (m *Machine) SetLogger(fn Logger) { // emit is a synchronous (blocking) emit with cancellation via a return channel. // Can block indefinitely if the handler doesn't return or the emitter isn't // accepting events. -// TODO sync ver func (m *Machine) emit(name string, args A, step *TransitionStep) Result { e := &Event{ Name: name, @@ -890,8 +882,11 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result { targetStates = j(t.TargetStates) } // call the handlers - // TODO timeout, recover res := m.processEmitters(e) + if m.panicCaught { + res = Canceled + m.panicCaught = false + } // check if this is an internal event if step == nil { return Executed @@ -918,18 +913,11 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result { } func (m *Machine) processEmitters(e *Event) Result { - // TODO catch panics - // if m.PanicToException { - // // catch panics and restart the emitter - // defer m.log(LogEverything, "[end] handleEmitterLoop %s", emitter.id) - // defer m.recoverToErr(emitter, handlerMethods, binding) - // } else { - // // dispose and exit - // defer m.disposeEmitter(emitter) - // defer closeSafe(binding.Ready) - // defer m.log(LogEverything, "[end] handleEmitterLoop %s", emitter.id) - // } - for _, emitter := range m.emitters { + var emitter *emitter + for _, emitter = range m.emitters { + if m.Ctx.Err() != nil { + break + } method := e.Name stepID := method if e.step != nil { @@ -939,11 +927,6 @@ func (m *Machine) processEmitters(e *Event) Result { if e.step == nil { break } - // if no handler, skip - if !emitter.methods.MethodByName(method).IsValid() { - continue - } - m.log(LogOps, "[handler] %s", method) if e.step != nil { emitterID := emitter.id if len(emitterID) > 15 { @@ -954,21 +937,53 @@ func (m *Machine) processEmitters(e *Event) Result { stepID, method) m.Transition.addSteps(e.step) } + // if no handler, skip + if !emitter.methods.MethodByName(method).IsValid() { + continue + } + m.log(LogOps, "[handler] %s", method) // call the handler - callRet := emitter.methods.MethodByName(e.Name).Call( - []reflect.Value{reflect.ValueOf(e)}) var ret bool + + go func() { + if m.PanicToException { + // catch panics and fwd + defer func() { + r := recover() + if r != nil { + m.handlerPanic <- r + } + }() + } + callRet := emitter.methods.MethodByName(e.Name).Call( + []reflect.Value{reflect.ValueOf(e)}) + if len(callRet) > 0 { + m.handlerDone <- callRet[0].Interface().(bool) + } + // handlers returns true by default + m.handlerDone <- true + }() + // wait on the result / timeout / context + select { + case <-m.Ctx.Done(): + break + case <-time.After(m.HandlerTimeout): + m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, j(m.Transition.TargetStates)) + break + case ret = <-m.handlerDone: + // ok + case r := <-m.handlerPanic: + // recover partial state + m.recoverToErr(emitter, r) + ret = false + } switch { - // returns from these handlers are ignored case strings.HasSuffix(e.Name, "State"): case strings.HasSuffix(e.Name, "End"): - ret = true + // returns from State and End handlers are ignored default: - if len(callRet) > 0 { - ret = callRet[0].Interface().(bool) - if !ret { - return Canceled - } + if !ret { + return Canceled } } } @@ -978,8 +993,8 @@ func (m *Machine) processEmitters(e *Event) Result { return Executed } +// processEventChs sends the event to all On() dynamic handlers. func (m *Machine) processEventChs(e *Event) Result { - // TODO deadlock m.indexEventChLock.Lock() defer m.indexEventChLock.Unlock() for _, ch := range m.indexEventCh[e.Name] { diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index c7640d4..c7ff2b4 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -273,7 +273,7 @@ type ExceptionArgsPanic struct { StackTrace []byte } -// ExceptionState is a (final) handler for the Exception state. +// ExceptionState is a final entry handler for the Exception state. // Args: // - err error: The error that caused the Exception state. // - panic *ExceptionArgsPanic: Optional details about the panic.