diff --git a/pkg/telemetry/prometheus/prometheus.go b/pkg/telemetry/prometheus/prometheus.go index 9312432..ca3d325 100644 --- a/pkg/telemetry/prometheus/prometheus.go +++ b/pkg/telemetry/prometheus/prometheus.go @@ -1,4 +1,4 @@ -// Package prometheus provides Prometheus metrics for asyncmachine. +// Package prometheus provides Prometheus metrics for asyncam. // The metrics are collected from the machine's transitions and states. // // Exported metrics: @@ -13,18 +13,100 @@ import ( "sync" "time" - "github.com/pancsta/asyncmachine-go/pkg/machine" "github.com/prometheus/client_golang/prometheus" + + am "github.com/pancsta/asyncmachine-go/pkg/machine" ) -// Metrics is a set of Prometheus metrics for a machine. +type promTracer struct { + am.NoOpTracer + + m *Metrics + txStartTime time.Time + prevTime uint64 +} + +func (t *promTracer) StructChange(machine *am.Machine, old am.Struct) { + // TODO refresh state and relation metrics +} + +func (t *promTracer) TransitionInit(tx *am.Transition) { + t.txStartTime = time.Now() +} + +func (t *promTracer) TransitionEnd(tx *am.Transition) { + if t.m.closed || !tx.Accepted { + return + } + + statesIndex := tx.Machine.StateNames() + + // try to refresh, then lock + t.m.Refresh() + t.m.mx.Lock() + defer t.m.mx.Unlock() + + queueLen := tx.QueueLen + t.m.queueSize = uint64(queueLen) + t.m.queueSizeLen++ + t.m.stepsAmount += uint64(len(tx.Steps)) + t.m.stepsAmountLen++ + // TODO log slow txs (Opts and default to 1ms) + t.m.txTime += uint64(time.Since(t.txStartTime).Milliseconds()) + t.m.txTimeLen++ + + // executed handlers + handlersCount := 0 + for _, step := range tx.Steps { + if step.Type == am.StepHandler { + handlersCount++ + } + } + t.m.handlersAmount += uint64(handlersCount) + t.m.handlersAmountLen++ + + // tx states + added, removed, touched := getTxStates(tx, statesIndex) + t.m.statesAdded += uint64(len(added)) + t.m.statesAddedLen++ + t.m.statesRemoved += uint64(len(removed)) + t.m.statesRemovedLen++ + t.m.statesTouched += uint64(len(touched)) + + // time sum + currTime := tx.Machine.TimeSum(nil) + t.m.txTick += currTime - t.prevTime + t.m.txTickLen++ + t.prevTime = currTime + + // active / inactive states + active := 0 + inactive := 0 + for _, t := range tx.TimeBefore { + if am.IsActiveTick(t) { + active++ + } else { + inactive++ + } + } + t.m.statesActiveAmount += uint64(active) + t.m.statesActiveAmountLen++ + t.m.statesInactiveAmount += uint64(inactive) +} + +func (t *promTracer) ExceptionState(err error) { + t.m.ExceptionsCount.Inc() +} + +// Metrics is a set of Prometheus metrics for a am. type Metrics struct { mx sync.Mutex closed bool lastUpdate time.Time interval time.Duration + tracer am.Tracer - ////// mach definition + // //// mach definition // number of registered states StatesAmount prometheus.Gauge @@ -35,7 +117,7 @@ type Metrics struct { // number of state referenced by relations for all registered states RefStatesAmount prometheus.Gauge - ////// tx data + // //// tx data // current number of queued transitions (per transition) QueueSize prometheus.Gauge @@ -77,7 +159,7 @@ type Metrics struct { exceptionsCount uint64 exceptionsCountLen uint - ////// stats + // //// stats // steps per transition StepsAmount prometheus.Gauge @@ -95,14 +177,14 @@ type Metrics struct { txTimeLen uint } -func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics { - machID := machine.NormalizeID(mach.ID) +func newMetrics(mach *am.Machine, interval time.Duration) *Metrics { + machID := am.NormalizeID(mach.ID) return &Metrics{ interval: interval, lastUpdate: time.Now(), - ///// mach definition + // /// mach definition StatesAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_amount", @@ -124,7 +206,7 @@ func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics { Namespace: "mach", }), - ///// tx data + // /// tx data QueueSize: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "queue_size", @@ -282,16 +364,14 @@ func average(sum uint64, sampleLen uint) float64 { } // TransitionsToPrometheus bind transitions to Prometheus metrics. -// TODO debounce -// TODO bind via the tracer API (so it can be disabled/enabled) func TransitionsToPrometheus( - mach *machine.Machine, interval time.Duration, + mach *am.Machine, interval time.Duration, ) *Metrics { metrics := newMetrics(mach, interval) // state & relations // TODO bind to EventStatesChange - metrics.StatesAmount.Set(float64(len(mach.StateNames))) + metrics.StatesAmount.Set(float64(len(mach.StateNames()))) relCount := 0 stateRefCount := 0 for _, state := range mach.GetStruct() { @@ -315,131 +395,37 @@ func TransitionsToPrometheus( } metrics.RelAmount.Set(float64(relCount)) metrics.RefStatesAmount.Set(float64(stateRefCount)) - statesIndex := mach.StateNames[:] - - // TODO extract - go func() { - - // bind to transitions - var txStartTime time.Time - txInitCh := mach.OnEvent(machine.S{machine.EventTransitionInit}, nil) - txEndCh := mach.OnEvent(machine.S{machine.EventTransitionEnd}, nil) - errStateCh := mach.OnEvent(machine.S{machine.EventException}, nil) - prevTime := mach.TimeSum(nil) - - // consume the data from the channels - for mach.Ctx.Err() == nil { - select { - - case <-mach.Ctx.Done(): - break - - case <-errStateCh: - metrics.ExceptionsCount.Inc() - - case <-txInitCh: - txStartTime = time.Now() - - case event := <-txEndCh: - if metrics.closed { - return - } - - tx := event.Args["transition"].(*machine.Transition) - - // skip canceled txs - if !tx.Accepted { - continue - } - - // try to refresh, then lock - metrics.Refresh() - metrics.mx.Lock() - - queueLen := event.Args["queue_len"].(int) - metrics.queueSize = uint64(queueLen) - metrics.queueSizeLen++ - metrics.stepsAmount += uint64(len(tx.Steps)) - metrics.stepsAmountLen++ - // TODO log slow txs (Opts and default to 1ms) - metrics.txTime += uint64(time.Since(txStartTime).Milliseconds()) - metrics.txTimeLen++ - - // executed handlers - handlersCount := 0 - for _, step := range tx.Steps { - if step.Type == machine.StepHandler { - handlersCount++ - } - } - metrics.handlersAmount += uint64(handlersCount) - metrics.handlersAmountLen++ - - // tx states - added, removed, touched := getTxStates(tx, statesIndex) - metrics.statesAdded += uint64(len(added)) - metrics.statesAddedLen++ - metrics.statesRemoved += uint64(len(removed)) - metrics.statesRemovedLen++ - metrics.statesTouched += uint64(len(touched)) - - // time sum - currTime := mach.TimeSum(nil) - metrics.txTick += currTime - prevTime - metrics.txTickLen++ - prevTime = currTime - - // active / inactive states - active := 0 - inactive := 0 - for _, t := range tx.ClocksBefore { - if machine.IsActiveTick(t) { - active++ - } else { - inactive++ - } - } - metrics.statesActiveAmount += uint64(active) - metrics.statesActiveAmountLen++ - metrics.statesInactiveAmount += uint64(inactive) - - // unlock - metrics.mx.Unlock() - } - } - defer metrics.Close() - }() + mach.Tracers = append(mach.Tracers, &promTracer{m: metrics}) return metrics } // TODO move to helpers func getTxStates( - tx *machine.Transition, index machine.S, -) (added machine.S, removed machine.S, touched machine.S) { + tx *am.Transition, index am.S, +) (added am.S, removed am.S, touched am.S) { - before := tx.ClocksBefore - after := tx.ClocksAfter + before := tx.TimeBefore + after := tx.TimeAfter - is := func(clocks machine.Clocks, i string) bool { - // TODO use T - return machine.IsActiveTick(clocks[i]) + is := func(time am.Time, i int) bool { + return am.IsActiveTick(time[i]) } - for _, name := range index { - if is(before, name) && !is(after, name) { + for i, name := range index { + if is(before, i) && !is(after, i) { removed = append(removed, name) - } else if !is(before, name) && is(after, name) { + } else if !is(before, i) && is(after, i) { added = append(added, name) - } else if before[name] != after[name] { + } else if before[i] != after[i] { // treat multi states as added added = append(added, name) } } // touched - touched = machine.S{} + touched = am.S{} for _, step := range tx.Steps { if step.FromState != "" { touched = append(touched, step.FromState) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index c1eed6a..0b1b943 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -21,10 +21,12 @@ import ( ) // ///// ///// ///// + // ///// AM-DBG + // ///// ///// ///// -const DbgHost = "localhost:6831" +const DbgAddr = "localhost:6831" // DbgMsg is the interface for the messages to be sent to the am-dbg server. type DbgMsg interface { @@ -58,7 +60,8 @@ type DbgMsgTx struct { // Transition ID ID string // map of positions from the index to state clocks - Clocks am.T + // TODO refac to TimeAfter, re-gen all the assets + Clocks am.Time // result of the transition Accepted bool // mutation type @@ -75,16 +78,19 @@ type DbgMsgTx struct { IsAuto bool // queue length at the start of the transition Queue int - // don't send this over the wire + // Time is human time. Don't send this over the wire. + // TODO refac to HTime, re-gen all the assets Time *time.Time } func (d *DbgMsgTx) Clock(statesIndex am.S, state string) uint64 { idx := lo.IndexOf(statesIndex, state) return d.Clocks[idx] + } func (d *DbgMsgTx) Is(statesIndex am.S, states am.S) bool { + for _, state := range states { idx := lo.IndexOf(statesIndex, state) //nolint:typecheck if idx == -1 { @@ -92,6 +98,7 @@ func (d *DbgMsgTx) Is(statesIndex am.S, states am.S) bool { panic("unknown state: " + state) } + // TODO fix out of range panic, coming from am-dbg (not telemetry) if !am.IsActiveTick(d.Clocks[idx]) { return false } @@ -114,7 +121,8 @@ func (d *DbgMsgTx) TimeSum() uint64 { } type dbgClient struct { - client *rpc.Client + addr string + rpc *rpc.Client } func newDbgClient(url string) (*dbgClient, error) { @@ -124,13 +132,15 @@ func newDbgClient(url string) (*dbgClient, error) { return nil, err } - return &dbgClient{client: client}, nil + return &dbgClient{rpc: client}, nil } func (c *dbgClient) sendMsgTx(msg *DbgMsgTx) error { var reply string // TODO use Go() to not block - err := c.client.Call("RPCServer.DbgMsgTx", msg, &reply) + // DEBUG + // fmt.Printf("sendMsgTx %v\n", msg.CalledStates) + err := c.rpc.Call("RPCServer.DbgMsgTx", msg, &reply) if err != nil { return err } @@ -141,7 +151,7 @@ func (c *dbgClient) sendMsgTx(msg *DbgMsgTx) error { func (c *dbgClient) sendMsgStruct(msg *DbgMsgStruct) error { var reply string // TODO use Go() to not block - err := c.client.Call("RPCServer.DbgMsgStruct", msg, &reply) + err := c.rpc.Call("RPCServer.DbgMsgStruct", msg, &reply) if err != nil { return err } @@ -149,94 +159,72 @@ func (c *dbgClient) sendMsgStruct(msg *DbgMsgStruct) error { return nil } -// TransitionsToDBG sends transitions to the am-dbg server. -// TODO test TransitionsToDBG -// TODO support changes to mach.StateNames -// TODO rewrite to Tracer API? -func TransitionsToDBG(mach *am.Machine, url string) error { +type DbgTracer struct { + am.NoOpTracer + + c *dbgClient +} + +func (t *DbgTracer) StructChange(mach *am.Machine, _ am.Struct) { + // TODO support struct patches as DbgMsgTx.StructPatch + err := sendStructMsg(mach, t.c) + if err != nil { + log.Println("failed to send a msg to am-dbg: " + t.c.addr + err.Error()) + } +} + +func (t *DbgTracer) TransitionEnd(tx *am.Transition) { + mach := tx.Machine + + msg := &DbgMsgTx{ + MachineID: mach.ID, + ID: tx.ID, + Clocks: tx.TimeAfter, + Accepted: tx.Accepted, + Type: tx.Mutation.Type, + CalledStates: tx.CalledStates(), + Steps: lo.Map(tx.Steps, + func(step *am.Step, _ int) *am.Step { + return step + }), + // no locking necessary, as the tx is finalized (read-only) + LogEntries: removeLogPrefix(mach, tx.LogEntries), + PreLogEntries: removeLogPrefix(mach, tx.PreLogEntries), + IsAuto: tx.IsAuto(), + Queue: tx.QueueLen, + } + + // TODO retries + err := t.c.sendMsgTx(msg) + if err != nil { + log.Printf("failed to send a msg to am-dbg: %s %s", t.c.addr, err) + return + } +} + +// TransitionsToDbg sends transitions to the am-dbg server. +func TransitionsToDbg(mach *am.Machine, addr string) error { + // TODO test TransitionsToDbg + // TODO support changes to mach.StateNames + // TODO make sure the mach isnt already traced + // TODO prevent double debugging gob.Register(am.Relation(0)) - if url == "" { - url = DbgHost + if addr == "" { + addr = DbgAddr } - client, err := newDbgClient(url) + client, err := newDbgClient(addr) if err != nil { return fmt.Errorf("failed to connect to am-dbg: %w", err) } - err2 := sendStructMsg(mach, client) - if err2 != nil { - return err2 - } - go func() { - // bind to transitions steam and state changes - txEndCh := mach.OnEvent(am.S{am.EventTransitionEnd}, nil) - newStructCh := mach.OnEvent(am.S{am.EventStructChange}, nil) - - // send incoming transitions - for mach.Ctx.Err() == nil { - select { - - case <-mach.Ctx.Done(): - _ = client.client.Close() - return - - // states/relations have changed - case <-newStructCh: - // TODO support struct patches as DbgMsgTx.StructPatch - err := sendStructMsg(mach, client) - if err != nil { - log.Println("failed to send a msg to am-dbg: " + url + err.Error()) - } - - // new transition - case event := <-txEndCh: - - // params - tx, ok1 := event.Args["transition"].(*am.Transition) - preLogs, ok2 := event.Args["pre_logs"].([]*am.LogEntry) - queueLen, ok3 := event.Args["queue_len"].(int) - if !ok1 || !ok2 || !ok3 { - log.Println("invalid transition end event") - continue - } - - msg := &DbgMsgTx{ - MachineID: mach.ID, - ID: tx.ID, - Clocks: make([]uint64, len(tx.ClocksAfter)), - Accepted: tx.Accepted, - Type: tx.Mutation.Type, - CalledStates: tx.CalledStates(), - - Steps: lo.Map(tx.Steps, - func(step *am.Step, _ int) *am.Step { - return step - }), - // no locking necessary, as the tx is finalized (read-only) - LogEntries: tx.LogEntries, - PreLogEntries: preLogs, - IsAuto: tx.IsAuto(), - Queue: queueLen, - } - - if mach.LogID { - removeLogPrefix(msg) - } - - for i, state := range mach.StateNames { - msg.Clocks[i] = tx.ClocksAfter[state] - } - - // TODO retries - err := client.sendMsgTx(msg) - if err != nil { - log.Printf("failed to send a msg to am-dbg: %s %s", url, err) - return - } - } - } - }() + err = sendStructMsg(mach, client) + if err != nil { + return err + } + + // add the tracer + mach.Tracers = append(mach.Tracers, &DbgTracer{c: client}) return nil } @@ -245,7 +233,7 @@ func TransitionsToDBG(mach *am.Machine, url string) error { func sendStructMsg(mach *am.Machine, client *dbgClient) error { msg := &DbgMsgStruct{ ID: mach.ID, - StatesIndex: mach.StateNames, + StatesIndex: mach.StateNames(), States: mach.GetStruct(), } @@ -258,31 +246,35 @@ func sendStructMsg(mach *am.Machine, client *dbgClient) error { return nil } -func removeLogPrefix(msg *DbgMsgTx) { +func removeLogPrefix(mach *am.Machine, entries []*am.LogEntry) []*am.LogEntry { + clone := slices.Clone(entries) + if !mach.LogID { + return clone + } + maxIDlen := 5 addChars := 3 // "[] " - prefixLen := min(len(msg.MachineID)+addChars, maxIDlen+addChars) + prefixLen := min(len(mach.ID)+addChars, maxIDlen+addChars) - for i, le := range msg.LogEntries { - if len(msg.LogEntries[i].Text) < prefixLen { + ret := make([]*am.LogEntry, len(clone)) + for i, le := range clone { + if len(le.Text) < prefixLen { continue } - msg.LogEntries[i] = &am.LogEntry{ + + ret[i] = &am.LogEntry{ Level: le.Level, Text: le.Text[prefixLen:], } } - for i := range msg.PreLogEntries { - if len(msg.PreLogEntries[i].Text) < prefixLen { - continue - } - msg.PreLogEntries[i].Text = msg.PreLogEntries[i].Text[prefixLen:] - } + return ret } // ///// ///// ///// + // ///// OPEN TELEMETRY + // ///// ///// ///// // OtelMachTracer implements machine.Tracer for OpenTelemetry. @@ -524,18 +516,21 @@ func (ot *OtelMachTracer) TransitionInit(tx *am.Transition) { } } } + // build a regular trace ctx, span := ot.Tracer.Start(data.txGroup, name, trace.WithAttributes( attribute.String("tx_id", tx.ID), - attribute.Int64("time_before", timeSum(tx.ClocksBefore)), + attribute.Int64("time_before", timeSum(tx.TimeBefore)), attribute.String("mutation", mutLabel), )) + // decorate Exception trace if errAttr != nil { span.SetAttributes( attribute.String("error", errAttr.Error()), ) } + // trace logged args, if any argsMatcher := tx.Machine.GetLogArgs() if argsMatcher != nil { @@ -545,6 +540,7 @@ func (ot *OtelMachTracer) TransitionInit(tx *am.Transition) { ) } } + // expose data.txTrace = ctx } @@ -566,8 +562,9 @@ func (ot *OtelMachTracer) TransitionEnd(tx *am.Transition) { statesAdded := am.DiffStates(tx.TargetStates, tx.StatesBefore) statesRemoved := am.DiffStates(tx.StatesBefore, tx.TargetStates) // support multi states - for name, tick := range tx.ClocksAfter { - if tick > 1+tx.ClocksBefore[name] && !lo.Contains(statesAdded, name) { + before := tx.ClockBefore() + for name, tick := range tx.ClockAfter() { + if tick > 1+before[name] && !lo.Contains(statesAdded, name) { statesAdded = append(statesAdded, name) } } @@ -584,7 +581,7 @@ func (ot *OtelMachTracer) TransitionEnd(tx *am.Transition) { span := trace.SpanFromContext(data.txTrace) span.SetAttributes( attribute.String("states_diff", strings.Trim(statesDiff, " ")), - attribute.Int64("time_after", timeSum(tx.ClocksAfter)), + attribute.Int64("time_after", timeSum(tx.TimeAfter)), attribute.Bool("accepted", tx.Accepted), attribute.Int("steps_count", len(tx.Steps)), ) @@ -690,7 +687,9 @@ func (ot *OtelMachTracer) Inheritable() bool { func (ot *OtelMachTracer) QueueEnd(*am.Machine) {} // ///// ///// ///// + // ///// UTILS + // ///// ///// ///// // j joins state names @@ -703,7 +702,7 @@ func jw(states []string, sep string) string { return strings.Join(states, sep) } -func timeSum(clocks am.Clocks) int64 { +func timeSum(clocks am.Time) int64 { sum := int64(0) for _, clock := range clocks { sum += int64(clock)