diff --git a/internal/dbgelib/loop.go b/internal/dbgelib/loop.go new file mode 100644 index 0000000..ef377cb --- /dev/null +++ b/internal/dbgelib/loop.go @@ -0,0 +1,7 @@ +// +build dbgelib dbgelib.loop + +package dbgelib + +import "github.com/platinasystems/dbg" + +const Loop = dbg.Func diff --git a/internal/dbgelib/noloop.go b/internal/dbgelib/noloop.go new file mode 100644 index 0000000..cdfa3f5 --- /dev/null +++ b/internal/dbgelib/noloop.go @@ -0,0 +1,7 @@ +// +build !dbgelib,!dbgelib.loop + +package dbgelib + +import "github.com/platinasystems/dbg" + +const Loop = dbg.NoOp diff --git a/internal/dbgelib/nosyslog.go b/internal/dbgelib/nosyslog.go new file mode 100644 index 0000000..5e7ff17 --- /dev/null +++ b/internal/dbgelib/nosyslog.go @@ -0,0 +1,7 @@ +// +build !dbgelib,!dbgelib.syslog + +package dbgelib + +import "github.com/platinasystems/dbg" + +const Syslog = dbg.NoOp diff --git a/internal/dbgelib/syslog.go b/internal/dbgelib/syslog.go new file mode 100644 index 0000000..b06d5f5 --- /dev/null +++ b/internal/dbgelib/syslog.go @@ -0,0 +1,7 @@ +// +build dbgelib dbgelib.syslog + +package dbgelib + +import "github.com/platinasystems/dbg" + +const Syslog = dbg.Func diff --git a/loop/cli.go b/loop/cli.go index e746c9b..ed5ef96 100644 --- a/loop/cli.go +++ b/loop/cli.go @@ -8,6 +8,7 @@ import ( "github.com/platinasystems/elib" "github.com/platinasystems/elib/cli" "github.com/platinasystems/elib/elog" + "github.com/platinasystems/elib/internal/dbgelib" "github.com/platinasystems/elib/iomux" "fmt" @@ -350,6 +351,26 @@ func (l *Loop) comment(c cli.Commander, w cli.Writer, in *cli.Input) (err error) return } +const maxEventHistory = 1000 + +var eventHistory []string + +func logEvent(s string) { + if len(eventHistory) > maxEventHistory { + eventHistory = eventHistory[1:] + } + dbgelib.Syslog.Log(s) + s = fmt.Sprintf("%v ", time.Now().Format(time.UnixDate)) + s + eventHistory = append(eventHistory, s) +} + +func (l *Loop) showLastEvents(c cli.Commander, w cli.Writer, in *cli.Input) (err error) { + for _, line := range eventHistory { + fmt.Fprintln(w, line) + } + return nil +} + func (l *Loop) cliInit() { l.RegisterEventPoller(iomux.Default) c := &l.Cli @@ -369,6 +390,13 @@ func (l *Loop) cliInit() { ShortHelp: "show events in event log", Action: l.showEventLog, }) + if dbgelib.Loop > 0 { + c.AddCommand(&cli.Command{ + Name: "show last-events", + ShortHelp: fmt.Sprintf("show last %v events", maxEventHistory), + Action: l.showLastEvents, + }) + } c.AddCommand(&cli.Command{ Name: "clear event-log", ShortHelp: "clear events in event log", diff --git a/loop/event.go b/loop/event.go index 47c357d..8c42d7b 100644 --- a/loop/event.go +++ b/loop/event.go @@ -10,8 +10,10 @@ import ( "github.com/platinasystems/elib/cpu" "github.com/platinasystems/elib/elog" "github.com/platinasystems/elib/event" + "github.com/platinasystems/elib/internal/dbgelib" "fmt" + "os" "runtime/debug" "sort" "sync" @@ -82,11 +84,12 @@ func (l *eventMain) getLoopEvent(a event.Actor, dst Noder, p elog.PointerToFirst func (l *eventMain) putLoopEvent(x *nodeEvent) { l.nodeEventPool.Put(x) } type nodeEvent struct { - l *Loop - d *Node - actor event.Actor - time cpu.Time - caller elog.Caller + l *Loop + d *Node + actor event.Actor + time cpu.Time + caller elog.Caller + prev_actor string } func (e *nodeEvent) EventTime() cpu.Time { return e.time } @@ -99,7 +102,6 @@ func (l *Loop) signalEvent(le *nodeEvent) { } } -// SignalEvent adds event whose action will be called on the next loop iteration. func (n *Node) SignalEventp(a event.Actor, dst Noder, p elog.PointerToFirstArg) { e := n.l.getLoopEvent(a, dst, p) n.l.signalEvent(e) @@ -162,16 +164,23 @@ func (e *nodeEvent) do() { e.l.putLoopEvent(e) } -func (e *nodeEvent) String() string { return e.actor.String() } +func (e *nodeEvent) String() string { + if e.actor == nil { + return "nil(was " + e.prev_actor + ")" + } + return e.actor.String() +} func (d *Node) eventDone() { n := &d.e n.s.setDone(d) + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("eventHandler %v: signalLoop(true), done executing actor %v", d.name, n.currentEvent.e)) + } n.currentEvent.e = nil n.activeCount-- n.log(d, event_elog_node_signal_done) n.ft.signalLoop(true) - //fmt.Printf("signalLoop(true)================\n") //debug print } func (l *Loop) eventHandler(r Noder) { @@ -184,7 +193,9 @@ func (l *Loop) eventHandler(r Noder) { return } err = fmt.Errorf("%v: %v", d.name, err) - fmt.Printf("eventHandler: panic %v\n", err) //debug print + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("eventHandler %v: panic %v", d.name, err)) + } elog.Panic(err) l.Panic(err, debug.Stack()) d.eventDone() @@ -198,44 +209,37 @@ func (l *Loop) eventHandler(r Noder) { //wait on average every 7 seconds even if no activity. //Use waitLoop_with_timeout for better debuggability and set t to 30 seconds in case something hung and wouldn't exit //goes will exit (i.e. crash) with error message if timed out - //n.ft.waitLoop() - if true { - actor_name := "nil" + { t := 30 * time.Second - if n.currentEvent.e != nil { - if n.currentEvent.e.actor != nil { - actor_name = n.currentEvent.e.actor.String() - } + if n.currentEvent.e == nil { + n.ft.waitLoop_with_timeout(t, d.name+"(eventHandler)", "empty nodeEvent", n.rxEvents) + } else { + n.ft.waitLoop_with_timeout(t, d.name+"(eventHandler)", fmt.Sprintf("%v", n.currentEvent.e), n.rxEvents) } - n.ft.waitLoop_with_timeout(t, d.name+"(eventHandler)", actor_name, len(n.rxEvents)) } n.log(d, event_elog_node_wake) - e := <-n.rxEvents + var e *nodeEvent + doneGetEvent := false + for !doneGetEvent { + select { + case e = <-n.rxEvents: + doneGetEvent = true + case <-time.After(1 * time.Second): + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("eventHandler get event timed out len(rxEvent) = %v", len(n.rxEvents))) + } + } + } if poller_panics && e.d != d { - panic(fmt.Errorf("expected node %s got %s: %p %s", d.name, e.d.name, e, e.actor.String())) + dbgelib.Loop.Logf("eventHandler panic expected node %s got %s: %p %v", d.name, e.d.name, e, e) + panic(fmt.Errorf("expected node %s got %s: %p %v", d.name, e.d.name, e, e)) } n.currentEvent.e = e - if false { //debug print - fmt.Printf("eventHandler do Action\n") - actor_name := "nil" - if n.currentEvent.e != nil { - if n.currentEvent.e.actor != nil { - actor_name = n.currentEvent.e.actor.String() - } - } - fmt.Printf("eventHandler: node: %v, actor %v, ch length = %d\n", d.name, actor_name, len(n.rxEvents)) + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("eventHandler %v: execute actor %v", d.name, e)) } e.do() d.eventDone() - if false { //debug print - actor_name := "nil" - if n.currentEvent.e != nil { - if n.currentEvent.e.actor != nil { - actor_name = n.currentEvent.e.actor.String() - } - } - fmt.Printf(" done: node: %v, actor %v, ch length = %d\n", d.name, actor_name, len(n.rxEvents)) - } } } @@ -248,10 +252,7 @@ func (e *Event) String() string { if e.e == nil { return "nil nodeEvent" } - if e.e.actor == nil { - return "nil actor" - } - return e.e.actor.String() + return fmt.Sprintf("%v", e.e) } func (e *Event) Actor() event.Actor { @@ -263,6 +264,11 @@ func (e *Event) Actor() event.Actor { return nil } +func (x *Event) Name() (actor_name string) { + actor_name = fmt.Sprintf("%v", x) + return +} + type EventActor interface { getLoopEvent() *Event } @@ -270,6 +276,7 @@ type EventActor interface { func (e *Event) getLoopEvent() *Event { return e } func (n *Node) CurrentEvent() (e *Event) { x := &n.e.currentEvent + // return Event only if it has a none nil nodeEvent if x.e != nil { e = x } @@ -290,6 +297,9 @@ func (x *Event) Suspend() { n.log(d, event_elog_suspend) n.eventStats.current.suspends++ t0 := cpu.TimeNow() + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: signalLoop(false), suspend node, last event: %v", d.name, x.e)) + } n.ft.signalLoop(false) n.ft.waitLoop() // Don't charge node for time suspended. @@ -298,32 +308,14 @@ func (x *Event) Suspend() { n.log(d, event_elog_resumed) } -func (x *Event) Name() (actor_name string) { - actor_name = "nil" - if x.e != nil { - if x.e.actor != nil { - actor_name = x.e.actor.String() - } - } - return -} - // An eventNode has fromToNode struct, ft, which has a toNode channel (chan struct{}) and a fromNode channel (chan bool). // signalLoop(v bool) send v to the fromNode channel; waitNode() returns the element from fromNode. Use signalLoop(true) to signal nodeEvent is done. // signalNode() sends empty struct to toNode; waitLoop() waits in infinite loop for a signal from toNode. Use signalNode() to stop waitLoop. // doEvents() sends signalNode() to all active nodes // func (l *Loop) Run() is the infinite loop that does doEvents() continuously -// func (l *Loop) doPollers() has has a call to signalNode() func (x *Event) SuspendWTimeout(t time.Duration) { d := x.e.d //d is the *Node for event x, e here is the nodeEvent n := &d.e //e here is the eventNode for d - if false { //debug print - actor_name := "nil" - if x.e.actor != nil { - actor_name = x.e.actor.String() - } - fmt.Printf("SuspendWTimeout() point 1 node %s; actor %s; rxEvent ch length=%d \n", d.name, actor_name, len(n.rxEvents)) - } if !n.isActive() { panic("event.go SuspendWTimeout() suspending inactive node") } @@ -334,20 +326,16 @@ func (x *Event) SuspendWTimeout(t time.Duration) { n.log(d, event_elog_suspend) n.eventStats.current.suspends++ t0 := cpu.TimeNow() + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: signalLoop(false), suspend node, last event: %v", d.name, x.e)) + } n.ft.signalLoop(false) { - actor_name := "nil" - actor_name_x := "nil" - if n.currentEvent.e != nil { - if n.currentEvent.e.actor != nil { - actor_name = n.currentEvent.e.actor.String() - } - } - if x.e.actor != nil { - actor_name_x = x.e.actor.String() //should be the same as the currentEvent actor? + if n.currentEvent.e == nil { + n.ft.waitLoop_with_timeout(t, d.name+"(suspend)", "empty nodeEvent", n.rxEvents) + } else { + n.ft.waitLoop_with_timeout(t, d.name+"(suspend)", n.currentEvent.e.String(), n.rxEvents) } - //goes will exit (i.e. crash) with error message if timed out - n.ft.waitLoop_with_timeout(t, d.name+"(suspend)", actor_name+" or "+actor_name_x, len(n.rxEvents)) } // Don't charge node for time suspended. @@ -357,7 +345,10 @@ func (x *Event) SuspendWTimeout(t time.Duration) { } func (e *nodeEvent) isResume() bool { return e.actor == nil } -func (e *nodeEvent) setResume() { e.actor = nil } +func (e *nodeEvent) setResume() { + e.prev_actor = e.String() + e.actor = nil +} func (x *Event) Resume() (ok bool) { e := x.e @@ -371,7 +362,19 @@ func (x *Event) Resume() (ok bool) { } n.log(d, event_elog_queue_resume) e.setResume() - d.l.events <- e + for { + select { + case d.l.events <- e: + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: resume node, last event: %v", e.d.name, e.prev_actor)) + } + return + case <-time.After(1 * time.Second): + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("resume: put event %v timed out len(d.l.events) = %v", e.prev_actor, len(d.l.events))) + } + } + } return } @@ -405,6 +408,7 @@ func (d *Node) maybeStartEventHandler() { n.activeIndex = ^uint(0) n.ft.init() elog.F("loop starting event handler %v", d.elogNodeName) + dbgelib.Loop.Logf("***start eventHandler for %v", d.name) go l.eventHandler(d.noder) }) } @@ -415,7 +419,7 @@ func (l *Loop) eventPoller(p EventPoller) { if elog.Enabled() { if err := recover(); err != nil { elog.Panic(err) - err = fmt.Errorf("event-poller: %v", err) + err = fmt.Errorf("event-poller panic: %v", err) elog.Panic(err) l.Panic(err, debug.Stack()) } @@ -449,7 +453,20 @@ func (e *nodeEvent) EventAction() { if n.activeCount == 1 { d.l.eventMain.addActive(d) } - n.rxEvents <- e + for { + select { + case n.rxEvents <- e: + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: put event %v into rxEvents channel", e.d.name, e)) + } + return + case <-time.After(1 * time.Second): + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: put event %v timed out len(rxEvent) = %v", e.d.name, e, len(n.rxEvents))) + } + } + } + } func (m *eventMain) doNodeEvent(e *nodeEvent) (quit *quitEvent) { @@ -460,9 +477,6 @@ func (m *eventMain) doNodeEvent(e *nodeEvent) (quit *quitEvent) { if e.isResume() { m.addActive(e.d) e.resume() - if false { //debug print - fmt.Printf("doNodeEvent resume active node %v\n", e.d.name) - } } else { e.EventAction() } @@ -470,27 +484,21 @@ func (m *eventMain) doNodeEvent(e *nodeEvent) (quit *quitEvent) { } func (l *Loop) doEventNoWait() (quit *quitEvent) { - //fmt.Printf("doEventNoWait\n") //debug print select { default: // nothing to do case e := <-l.events: quit = l.doNodeEvent(e) } - //fmt.Printf("doEventNoWait, done quit=%v\n", quit) //debug print return } func (l *Loop) doEventWait() (quit *quitEvent, timeout bool) { - //fmt.Printf("doEventWait\n") //debug print m := &l.eventMain m.event_timer_elog(event_timer_elog_waiting, m.timerDuration) select { case e := <-l.events: quit = l.doNodeEvent(e) - //fmt.Printf("doEventWait, normal quit=%v\n", quit) //debug print case <-m.timer.C: - //fmt.Printf("doEventWait, timer expired\n") //debug print - // Log difference between time now and timer cpu time. m.event_timer_elog(event_timer_elog_timeout, l.duration(m.timerCpuTime)) m.timer.Reset(maxDuration) @@ -516,8 +524,6 @@ func (l *Loop) doEvents() (quitLoop bool) { // Try waiting if we have no active nodes. if len(m.activeNodes) == 0 { - //fmt.Printf("doEvents no active nodes, try waiting\n") //debug print - // Try to change active poller state to event wait. // This can and does return false if an active poller comes along racing with our call. if _, didWait = l.activePollerState.setEventWait(); didWait { @@ -551,7 +557,6 @@ func (l *Loop) doEvents() (quitLoop bool) { quit = l.doEventNoWait() } - //fmt.Printf("doEvents handle expired timed events\n") //debug print // Handle expired timed events. tp := &l.timedEventPool if waitTimeout { @@ -560,6 +565,8 @@ func (l *Loop) doEvents() (quitLoop bool) { tp.AdvanceAdd(nextTime, &ev) l.timedEventPoolLock.Unlock() if poller_panics && waitTimeout && len(ev) == 0 { + dbgelib.Loop.Log("doEvent wait timeout panic") + os.Exit(3) panic("wait timeout but not events expired") } if len(ev) > 0 { @@ -575,13 +582,14 @@ func (l *Loop) doEvents() (quitLoop bool) { } } - //fmt.Printf("doEvents look for active nodes\n") //debug print // Signal all active nodes to start. for _, d := range m.activeNodes { n := &d.e n.log(d, event_elog_start) n.ft.signalNode() - //fmt.Printf("signalNode active nodes %v to start=============\n", d.name) //debug print + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("doEvents %v: signalNode, current actor %v", d.name, n.currentEvent.e)) + } } // Wait for all event active nodes to finish. @@ -591,17 +599,14 @@ func (l *Loop) doEvents() (quitLoop bool) { n.log(d, event_elog_wait) //Use a timed wait instead of indefinite wait. Assuming no events takes more than t seconds to do //goes will exit (i.e. crash) with error message if timed out - //nodeEventDone := n.ft.waitNode() var nodeEventDone bool - if true { - actor_name := "nil" + { t := 30 * time.Second if n.currentEvent.e != nil { - if n.currentEvent.e.actor != nil { - actor_name = n.currentEvent.e.actor.String() - } + nodeEventDone = n.ft.waitNode_with_timeout(t, d.name+"(doEvents)", fmt.Sprintf("%v", n.currentEvent.e), n.rxEvents) + } else { + nodeEventDone = n.ft.waitNode_with_timeout(t, d.name+"(doEvents)", "empty nodeEvent", n.rxEvents) } - nodeEventDone = n.ft.waitNode_with_timeout(t, d.name+"(doEvents)", actor_name, len(n.rxEvents)) } // Inactivate nodes which have no more queued events or are suspended. if !nodeEventDone || n.activeCount == 0 { @@ -617,6 +622,9 @@ func (l *Loop) doEvents() (quitLoop bool) { m.inactiveNodes = m.inactiveNodes[:0] } + if l.isPanic() { + dbgelib.Loop.Logf("doEvents panic %v", l.panicErr) + } quitLoop = (quit != nil && quit.Type == quitEventExit) || l.isPanic() return } @@ -802,9 +810,14 @@ var ( ErrInterrupt = &quitEvent{Type: quitEventInterrupt} ) -func (e *quitEvent) String() string { return quitEventTypeStrings[e.Type] } -func (e *quitEvent) Error() string { return e.String() } -func (e *quitEvent) EventAction() {} +func (e *quitEvent) String() string { + if int(e.Type) < len(quitEventTypeStrings) { + return quitEventTypeStrings[e.Type] + } + return fmt.Sprintf("unknown quitEventType %v", e.Type) +} +func (e *quitEvent) Error() string { return e.String() } +func (e *quitEvent) EventAction() {} func (l *Loop) Quit() { e := l.getLoopEvent(ErrQuit, nil, elog.PointerToFirstArg(&l)) l.signalEvent(e) @@ -846,7 +859,7 @@ func (l *Loop) showRuntimeEvents(w cli.Writer) (err error) { dt := time.Since(l.timeLastRuntimeClear).Seconds() eventsPerSec := float64(s.vectors) / dt clocksPerEvent := float64(s.clocks) / float64(s.vectors) - fmt.Fprintf(w, "Events: %d, Events/sec: %.2e, Clocks/event: %.2f\n", + fmt.Fprintf(w, "Events: %d, Events/sec: %.2e, Clocks/event: %.2f", s.vectors, eventsPerSec, clocksPerEvent) } diff --git a/loop/loop.go b/loop/loop.go index dbd3543..bf40da6 100644 --- a/loop/loop.go +++ b/loop/loop.go @@ -8,6 +8,7 @@ import ( "github.com/platinasystems/elib/cpu" "github.com/platinasystems/elib/dep" "github.com/platinasystems/elib/elog" + "github.com/platinasystems/elib/internal/dbgelib" "fmt" "io" @@ -105,6 +106,7 @@ func (l *Loop) Seconds(t cpu.Time) float64 { return float64(t) * l.secsPerCycle func (l *Loop) startDataPoller(r inLooper) { n := r.GetNode() n.ft.init() + dbgelib.Loop.Logf("***start dataPoller for %v", n.name) go l.dataPoll(r) } func (l *Loop) startPollers() { @@ -230,6 +232,7 @@ func (l *Loop) Run() { } for { if quit := l.doEvents(); quit { + dbgelib.Loop.Log("quit loop !!!!!!!!!!!!") break } l.doPollers() diff --git a/loop/poller.go b/loop/poller.go index f52ca57..066cb8d 100644 --- a/loop/poller.go +++ b/loop/poller.go @@ -8,6 +8,7 @@ import ( "github.com/platinasystems/elib" "github.com/platinasystems/elib/cpu" "github.com/platinasystems/elib/elog" + "github.com/platinasystems/elib/internal/dbgelib" "fmt" "os" @@ -18,6 +19,9 @@ import ( "time" ) +var numWaitLoop uint32 +var numWaitNode uint32 + type fromToNode struct { toNode chan struct{} fromNode chan bool @@ -28,44 +32,74 @@ func (x *fromToNode) init() { x.fromNode = make(chan bool, 1) } -func (x *fromToNode) signalNode() { x.toNode <- struct{}{} } +func (x *fromToNode) signalNode() { + x.toNode <- struct{}{} +} func (x *fromToNode) waitNode() bool { return <-x.fromNode } -func (x *fromToNode) waitNode_with_timeout(d time.Duration, node_name string, actor_name string, ch_length int) bool { - //fmt.Printf("poller.go waitNode_with_timeout() start, node: %s, actor: %s, channel length = %d\n", node_name, actor_name, ch_length) //debug print - select { - case done := <-x.fromNode: - //fmt.Printf("poller.go waitNode_with_timeout() done, node: %s, actor: %s, channel length = %d\n", node_name, actor_name, ch_length) - return done - case <-time.After(d): - if true { - fmt.Printf("poller.go waitNode_with_timeout() timeout, node: %s, actor: %s, channel length = %d, os.Exit(3)\n", node_name, actor_name, ch_length) - } - if true { - os.Exit(3) + +// d = 0 means wait infinite time +func (x *fromToNode) waitNode_with_timeout(d time.Duration, node_name string, actor_name string, rxEvents chan *nodeEvent) bool { + numWaitNode++ + t := 1 * time.Second + s := t + if d == 0 { + t = 1 * time.Minute + } + for { + select { + case done := <-x.fromNode: + numWaitNode-- + return done + case <-time.After(t): + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("%v %v waited %v out of %v, num waitNode = %v", node_name, actor_name, s, d, numWaitNode)) + } + s += t + if s > d && d != 0 { + fmt.Printf("waitNode timeout %v exceeded, node: %s, actor: %s, channel length = %d\n", d, node_name, actor_name, len(rxEvents)) + fmt.Printf("eventHistory of %v:\n", maxEventHistory) + // dump last maxEventHistory logs + for _, line := range eventHistory { + fmt.Println(line) + } + os.Exit(3) + } } } return false } func (x *fromToNode) signalLoop(v bool) { x.fromNode <- v } -func (x *fromToNode) waitLoop() { <-x.toNode } -func (x *fromToNode) waitLoop_with_timeout(d time.Duration, node_name string, actor_name string, ch_length int) { - //fmt.Printf("poller.go waitLoop_with_timeout() start, node: %s, actor: %s, channel length = %d\n", node_name, actor_name, ch_length) - start := time.Now() - select { - case <-x.toNode: - if false { // debug print - t := time.Since(start) - fmt.Printf("poller.go waitLoop_with_timeout() done, node: %s, actor: %s, channel length = %d, in %s\n", node_name, actor_name, ch_length, t.String()) - } - return - case <-time.After(d): - //panic(" poller.go waitLoop_with_timeout() timeout") //doesn't seem enough to exit vnet, still hangs - if true { - fmt.Printf("poller.go waitLoop_with_timeout() timeout, node: %s, actor: %s, channel length = %d, os.Exit(3)\n", node_name, actor_name, ch_length) - } - if true { - os.Exit(3) + +func (x *fromToNode) waitLoop() { <-x.toNode } + +// d = 0 means wait infinite time +func (x *fromToNode) waitLoop_with_timeout(d time.Duration, node_name string, actor_name string, rxEvents chan *nodeEvent) { + numWaitLoop++ + t := 1 * time.Second + s := t + if d == 0 { + t = 1 * time.Minute + } + for { + select { + case <-x.toNode: + numWaitLoop-- + return + case <-time.After(t): + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("waitLoop %v %v waited %v out of %v, num waitLoop = %v", node_name, actor_name, s, d, numWaitLoop)) + } + s += t + if s > d && d != 0 { + fmt.Printf("waitLoop timeout %v exceeded, node: %s, actor: %s, channel length = %d\n", d, node_name, actor_name, len(rxEvents)) + fmt.Printf("eventHistory of %v:\n", maxEventHistory) + // dump last maxEventHistory logs + for _, line := range eventHistory { + fmt.Println(line) + } + os.Exit(3) + } } } } @@ -299,6 +333,9 @@ func (l *Loop) AddSuspendActivity(in *In, i int, lim *SuspendLimits) { if poll_active { a.toLoop <- struct{}{} } else { + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("signalLoop(false) from %v AddSuspendActivity", n.name)) + } n.ft.signalLoop(false) } // Wait for continue (resume) signal from main loop. @@ -306,16 +343,11 @@ func (l *Loop) AddSuspendActivity(in *In, i int, lim *SuspendLimits) { if poll_active { <-a.fromLoop } else { - n.ft.waitLoop() - if false { //debug print - actor_name := "nil" - t := 10 * time.Second - if n.CurrentEvent().e != nil { - if n.CurrentEvent().e.actor != nil { - actor_name = n.CurrentEvent().e.actor.String() - } - } - n.ft.waitLoop_with_timeout(t, n.name+"(AddSuspendActivity)", actor_name, len(n.e.rxEvents)) + t := 0 * time.Second // means wait infinite time + if n.CurrentEvent() == nil { + n.ft.waitLoop_with_timeout(t, n.name+"(AddSuspendActivity)", "empty nodeEvent", n.e.rxEvents) + } else { + n.ft.waitLoop_with_timeout(t, n.name+"(AddSuspendActivity)", fmt.Sprintf("%v", n.CurrentEvent().e), n.e.rxEvents) } } // Don't charge node for time suspended. @@ -591,6 +623,7 @@ func (a *activePoller) dataPoll(l *Loop) { defer func() { if elog.Enabled() { if err := recover(); err != nil { + fmt.Printf("activePoller dataPoll exit due to panic poller%d: %v\n", a.index, err) elog.Panic(fmt.Errorf("poller%d: %v", a.index, err)) panic(err) } @@ -617,26 +650,25 @@ func (l *Loop) dataPoll(p inLooper) { if elog.Enabled() { if err := recover(); err != nil { err = fmt.Errorf("%s: %v", n.name, err) + fmt.Printf("loop dataPoll exited due to panic %v\n", err) elog.Panic(err) l.Panic(err, debug.Stack()) + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("signalLoop(true) from dataPoll panic")) + } n.ft.signalLoop(true) } } }() for { n.poller_elog(poller_elog_node_wait) - n.ft.waitLoop() - if false { //debug print - actor_name := "nil" - t := 10 * time.Second - if n.CurrentEvent() != nil { - if n.CurrentEvent().e != nil { - if n.CurrentEvent().e.actor != nil { - actor_name = n.CurrentEvent().e.actor.String() - } - } + { + t := 0 * time.Second // means wait infinite time + if n.CurrentEvent() == nil { + n.ft.waitLoop_with_timeout(t, n.name+"(dataPoll)", "empty nodeEvent", n.e.rxEvents) + } else { + n.ft.waitLoop_with_timeout(t, n.name+"(dataPoll)", fmt.Sprintf("%v", n.CurrentEvent().e), n.e.rxEvents) } - n.ft.waitLoop_with_timeout(t, n.name+"(dataPoll)", actor_name, len(n.e.rxEvents)) } n.poller_elog(poller_elog_node_wake) ap := n.getActivePoller() @@ -649,6 +681,9 @@ func (l *Loop) dataPoll(p inLooper) { ap.pollerStats.update(nVec, t0) l.pollerStats.update(nVec) n.poller_elog(poller_elog_node_signal) + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("signalLoop(true) from %v dataPoll", n.name)) + } n.ft.signalLoop(true) } } @@ -681,8 +716,22 @@ func (l *Loop) doPollers() { // Start poller who will be blocked waiting on fromLoop. if poll_active { - a.fromLoop <- n.noder.(inLooper) + done := false + for !done { + select { + case a.fromLoop <- n.noder.(inLooper): + done = true + case <-time.After(1 * time.Second): + // shouldn't hit this + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("DEBUG doPoller %v start timed out", n.name)) + } + } + } } else { + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("signalNode from doPoller poller %v, i=%v", n.name, i)) + } n.ft.signalNode() } } @@ -698,18 +747,24 @@ func (l *Loop) doPollers() { if n.s.is_polling { n.poller_elog(poller_elog_wait) if poll_active { - <-a.toLoop - } else { - done = n.ft.waitNode() - if false { - actor_name := "nil" - t := 3 * time.Second - if n.e.currentEvent.e != nil { - if n.e.currentEvent.e.actor != nil { - actor_name = n.e.currentEvent.e.actor.String() + done := false + for !done { + select { + case <-a.toLoop: + done = true + case <-time.After(1 * time.Second): + // shouldn't hit this + if dbgelib.Loop > 0 { + logEvent(fmt.Sprintf("DEBUG poll %v finish timed out", n.name)) } } - done = n.ft.waitNode_with_timeout(t, n.name+"(doPollers)", actor_name, len(n.e.rxEvents)) + } + } else { + t := 0 * time.Second // wait for infinite time + if n.CurrentEvent() == nil { + done = n.ft.waitNode_with_timeout(t, n.name+"(doPollers)", "empty nodeEvent", n.e.rxEvents) + } else { + done = n.ft.waitNode_with_timeout(t, n.name+"(doPollers)", fmt.Sprintf("%v", n.CurrentEvent().e), n.e.rxEvents) } } n.poller_elog(poller_elog_wait_done)