diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index 13efb19..cdf2958 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -246,9 +246,6 @@ func (m *Machine) GetRelationsOf(fromState string) []Relation { // listening on 2 When() channels within the same `select` to GC the 2nd one. func (m *Machine) When(states []string, ctx context.Context) chan struct{} { ch := make(chan struct{}) - if ctx == nil { - ctx = m.Ctx - } // if all active, close early if m.Is(states) { @@ -256,7 +253,6 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} { return ch } - // TODO deadlock with processWhenBindings() m.activeStatesLock.Lock() setMap := stateIsActive{} matched := 0 @@ -274,12 +270,14 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} { total: len(states), matched: matched, } - // TODO backport to #2 - go func() { - // dispose the binding on ctx.Done() and m.Ctx.Done() - select { - case <-ctx.Done(): - case <-m.Ctx.Done(): + // dispose with context + if ctx != nil { + go func() { + <-ctx.Done() + // GC only if needed + if m.disposed { + return + } m.activeStatesLock.Lock() for _, s := range states { if _, ok := m.indexWhen[s]; ok { @@ -287,8 +285,9 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} { } } m.activeStatesLock.Unlock() - } - }() + }() + } + // insert the binding for _, s := range states { if _, ok := m.indexWhen[s]; !ok { m.indexWhen[s] = []*whenBinding{binding} @@ -307,9 +306,6 @@ func (m *Machine) When(states []string, ctx context.Context) chan struct{} { // listening on 2 WhenNot() channels within the same `select` to GC the 2nd one. func (m *Machine) WhenNot(states []string, ctx context.Context) chan struct{} { ch := make(chan struct{}) - if ctx == nil { - ctx = m.Ctx - } // if all active, close early if m.Not(states) { @@ -334,12 +330,14 @@ func (m *Machine) WhenNot(states []string, ctx context.Context) chan struct{} { total: len(states), matched: matched, } - // TODO backport to #2 - go func() { - // dispose the binding on ctx.Done() and m.Ctx.Done() - select { - case <-ctx.Done(): - case <-m.Ctx.Done(): + // dispose with context + if ctx != nil { + go func() { + <-ctx.Done() + // GC only if needed + if m.disposed { + return + } m.activeStatesLock.Lock() for _, s := range states { if _, ok := m.indexWhen[s]; ok { @@ -347,8 +345,9 @@ func (m *Machine) WhenNot(states []string, ctx context.Context) chan struct{} { } } m.activeStatesLock.Unlock() - } - }() + }() + } + // insert the binding for _, s := range states { if _, ok := m.indexWhen[s]; !ok { m.indexWhen[s] = []*whenBinding{binding} @@ -565,7 +564,6 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event { m.indexEventChLock.Lock() defer m.indexEventChLock.Unlock() - // emitter := m.newEmitter(fmt.Sprintf("ON %s", j(events))) if ctx == nil { ctx = m.Ctx } @@ -576,6 +574,28 @@ func (m *Machine) On(events []string, ctx context.Context) chan *Event { m.indexEventCh[e] = append(m.indexEventCh[e], ch) } } + // dispose with context + if ctx != nil { + go func() { + <-ctx.Done() + // GC only if needed + if m.disposed { + return + } + m.indexEventChLock.Lock() + for _, e := range events { + if _, ok := m.indexEventCh[e]; ok { + if len(m.indexEventCh[e]) == 1 { + // delete the whole map, as theres many possible events + delete(m.indexEventCh, e) + } else { + m.indexEventCh[e] = lo.Without(m.indexEventCh[e], ch) + } + } + } + m.indexEventChLock.Unlock() + }() + } return ch } @@ -759,9 +779,7 @@ func (m *Machine) processStateCtxBindings() { m.activeStatesLock.Lock() var toCancel []context.CancelFunc for _, s := range deactivated { - for _, cancel := range m.indexStateCtx[s] { - toCancel = append(toCancel, cancel) - } + toCancel = append(toCancel, m.indexStateCtx[s]...) delete(m.indexStateCtx, s) } m.activeStatesLock.Unlock() @@ -897,10 +915,7 @@ func (m *Machine) emit(name string, args A, step *TransitionStep) Result { if step.IsSelf { self = ":self" } - stepID := name - if step != nil { - stepID = step.ID[:5] - } + stepID := step.ID[:5] m.log(LogOps, "[cancel%s:%s] (%s) by %s", self, stepID, targetStates, name) // queue-end lacks a transition @@ -968,7 +983,8 @@ func (m *Machine) processEmitters(e *Event) Result { case <-m.Ctx.Done(): break case <-time.After(m.HandlerTimeout): - m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, j(m.Transition.TargetStates)) + m.log(LogOps, "[cancel:%s] (%s) by timeout", stepID, + j(m.Transition.TargetStates)) break case ret = <-m.handlerDone: // ok diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index c7ff2b4..88a880c 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -296,24 +296,6 @@ func (eh *ExceptionHandler) ExceptionState(e *Event) { // utils -func isMapTrue(m map[string]bool) bool { - for _, value := range m { - if !value { - return false - } - } - return true -} - -func isMapFalse(setMap map[string]bool) bool { - for _, value := range setMap { - if value { - return false - } - } - return true -} - // j joins state names func j(states []string) string { return strings.Join(states, " ")