Skip to content

Commit

Permalink
refac(telemetry): switch telemetry to Tracer API
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Sep 11, 2024
1 parent 5c8702c commit c271e42
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 225 deletions.
226 changes: 106 additions & 120 deletions pkg/telemetry/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package prometheus provides Prometheus metrics for asyncmachine.
// Package prometheus provides Prometheus metrics for asyncmachine.
// The metrics are collected from the machine's transitions and states.
//
// Exported metrics:
Expand All @@ -13,18 +13,100 @@ import (
"sync"
"time"

"github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/prometheus/client_golang/prometheus"

am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// promTracer is a tracer which feeds transition data into a Metrics
// instance. It embeds am.NoOpTracer and defines only the callbacks it needs.
type promTracer struct {
am.NoOpTracer

// m is the metrics set updated by this tracer
m *Metrics
// txStartTime is set in TransitionInit and read in TransitionEnd to measure
// the duration of a transition
txStartTime time.Time
// prevTime is the machine's time sum as of the previously recorded
// transition; TransitionEnd uses it to compute the per-transition delta
prevTime uint64
}

// StructChange is currently a no-op; refreshing the state and relation
// metrics is still to be done.
func (t *promTracer) StructChange(machine *am.Machine, old am.Struct) {
// TODO refresh state and relation metrics
}

// TransitionInit stores the current time in txStartTime, which TransitionEnd
// uses to compute the duration of the transition.
func (t *promTracer) TransitionInit(tx *am.Transition) {
t.txStartTime = time.Now()
}

// TransitionEnd folds the data of a finished transition into the metric
// accumulators of the bound Metrics. Nothing is recorded when the metrics
// are closed or when the transition was not accepted.
func (t *promTracer) TransitionEnd(tx *am.Transition) {
	metrics := t.m
	if metrics.closed {
		return
	}
	if !tx.Accepted {
		return
	}

	names := tx.Machine.StateNames()

	// Refresh is called before the lock is taken; all updates below happen
	// while holding the mutex
	metrics.Refresh()
	metrics.mx.Lock()
	defer metrics.mx.Unlock()

	// queue length, steps and duration
	metrics.queueSize = uint64(tx.QueueLen)
	metrics.queueSizeLen++
	metrics.stepsAmount += uint64(len(tx.Steps))
	metrics.stepsAmountLen++
	// TODO log slow txs (Opts and default to 1ms)
	elapsed := time.Since(t.txStartTime)
	metrics.txTime += uint64(elapsed.Milliseconds())
	metrics.txTimeLen++

	// executed handlers (steps of the handler type)
	var handlers uint64
	for i := range tx.Steps {
		if tx.Steps[i].Type == am.StepHandler {
			handlers++
		}
	}
	metrics.handlersAmount += handlers
	metrics.handlersAmountLen++

	// states added, removed and touched by the tx
	added, removed, touched := getTxStates(tx, names)
	metrics.statesAdded += uint64(len(added))
	metrics.statesAddedLen++
	metrics.statesRemoved += uint64(len(removed))
	metrics.statesRemovedLen++
	metrics.statesTouched += uint64(len(touched))

	// time sum delta since the previously recorded tx
	sum := tx.Machine.TimeSum(nil)
	metrics.txTick += sum - t.prevTime
	metrics.txTickLen++
	t.prevTime = sum

	// active / inactive states, counted from the ticks before the tx
	var active, inactive uint64
	for _, tick := range tx.TimeBefore {
		if am.IsActiveTick(tick) {
			active++
			continue
		}
		inactive++
	}
	metrics.statesActiveAmount += active
	metrics.statesActiveAmountLen++
	metrics.statesInactiveAmount += inactive
}

// ExceptionState increments the ExceptionsCount metric. The passed error is
// not inspected.
func (t *promTracer) ExceptionState(err error) {
t.m.ExceptionsCount.Inc()
}

// Metrics is a set of Prometheus metrics for a machine.
type Metrics struct {
mx sync.Mutex
closed bool
lastUpdate time.Time
interval time.Duration
tracer am.Tracer

////// mach definition
// //// mach definition

// number of registered states
StatesAmount prometheus.Gauge
Expand All @@ -35,7 +117,7 @@ type Metrics struct {
// number of state referenced by relations for all registered states
RefStatesAmount prometheus.Gauge

////// tx data
// //// tx data

// current number of queued transitions (per transition)
QueueSize prometheus.Gauge
Expand Down Expand Up @@ -77,7 +159,7 @@ type Metrics struct {
exceptionsCount uint64
exceptionsCountLen uint

////// stats
// //// stats

// steps per transition
StepsAmount prometheus.Gauge
Expand All @@ -95,14 +177,14 @@ type Metrics struct {
txTimeLen uint
}

func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics {
machID := machine.NormalizeID(mach.ID)
func newMetrics(mach *am.Machine, interval time.Duration) *Metrics {
machID := am.NormalizeID(mach.ID)

return &Metrics{
interval: interval,
lastUpdate: time.Now(),

///// mach definition
// /// mach definition

StatesAmount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "states_amount",
Expand All @@ -124,7 +206,7 @@ func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics {
Namespace: "mach",
}),

///// tx data
// /// tx data

QueueSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "queue_size",
Expand Down Expand Up @@ -282,16 +364,14 @@ func average(sum uint64, sampleLen uint) float64 {
}

// TransitionsToPrometheus bind transitions to Prometheus metrics.
// TODO debounce
// TODO bind via the tracer API (so it can be disabled/enabled)
func TransitionsToPrometheus(
mach *machine.Machine, interval time.Duration,
mach *am.Machine, interval time.Duration,
) *Metrics {
metrics := newMetrics(mach, interval)

// state & relations
// TODO bind to EventStatesChange
metrics.StatesAmount.Set(float64(len(mach.StateNames)))
metrics.StatesAmount.Set(float64(len(mach.StateNames())))
relCount := 0
stateRefCount := 0
for _, state := range mach.GetStruct() {
Expand All @@ -315,131 +395,37 @@ func TransitionsToPrometheus(
}
metrics.RelAmount.Set(float64(relCount))
metrics.RefStatesAmount.Set(float64(stateRefCount))
statesIndex := mach.StateNames[:]

// TODO extract
go func() {

// bind to transitions
var txStartTime time.Time
txInitCh := mach.OnEvent(machine.S{machine.EventTransitionInit}, nil)
txEndCh := mach.OnEvent(machine.S{machine.EventTransitionEnd}, nil)
errStateCh := mach.OnEvent(machine.S{machine.EventException}, nil)
prevTime := mach.TimeSum(nil)

// consume the data from the channels
for mach.Ctx.Err() == nil {
select {

case <-mach.Ctx.Done():
break

case <-errStateCh:
metrics.ExceptionsCount.Inc()

case <-txInitCh:
txStartTime = time.Now()

case event := <-txEndCh:
if metrics.closed {
return
}

tx := event.Args["transition"].(*machine.Transition)

// skip canceled txs
if !tx.Accepted {
continue
}

// try to refresh, then lock
metrics.Refresh()
metrics.mx.Lock()

queueLen := event.Args["queue_len"].(int)
metrics.queueSize = uint64(queueLen)
metrics.queueSizeLen++
metrics.stepsAmount += uint64(len(tx.Steps))
metrics.stepsAmountLen++
// TODO log slow txs (Opts and default to 1ms)
metrics.txTime += uint64(time.Since(txStartTime).Milliseconds())
metrics.txTimeLen++

// executed handlers
handlersCount := 0
for _, step := range tx.Steps {
if step.Type == machine.StepHandler {
handlersCount++
}
}
metrics.handlersAmount += uint64(handlersCount)
metrics.handlersAmountLen++

// tx states
added, removed, touched := getTxStates(tx, statesIndex)
metrics.statesAdded += uint64(len(added))
metrics.statesAddedLen++
metrics.statesRemoved += uint64(len(removed))
metrics.statesRemovedLen++
metrics.statesTouched += uint64(len(touched))

// time sum
currTime := mach.TimeSum(nil)
metrics.txTick += currTime - prevTime
metrics.txTickLen++
prevTime = currTime

// active / inactive states
active := 0
inactive := 0
for _, t := range tx.ClocksBefore {
if machine.IsActiveTick(t) {
active++
} else {
inactive++
}
}
metrics.statesActiveAmount += uint64(active)
metrics.statesActiveAmountLen++
metrics.statesInactiveAmount += uint64(inactive)

// unlock
metrics.mx.Unlock()
}
}

defer metrics.Close()
}()
mach.Tracers = append(mach.Tracers, &promTracer{m: metrics})

return metrics
}

// TODO move to helpers
func getTxStates(
tx *machine.Transition, index machine.S,
) (added machine.S, removed machine.S, touched machine.S) {
tx *am.Transition, index am.S,
) (added am.S, removed am.S, touched am.S) {

before := tx.ClocksBefore
after := tx.ClocksAfter
before := tx.TimeBefore
after := tx.TimeAfter

is := func(clocks machine.Clocks, i string) bool {
// TODO use T
return machine.IsActiveTick(clocks[i])
is := func(time am.Time, i int) bool {
return am.IsActiveTick(time[i])
}

for _, name := range index {
if is(before, name) && !is(after, name) {
for i, name := range index {
if is(before, i) && !is(after, i) {
removed = append(removed, name)
} else if !is(before, name) && is(after, name) {
} else if !is(before, i) && is(after, i) {
added = append(added, name)
} else if before[name] != after[name] {
} else if before[i] != after[i] {
// treat multi states as added
added = append(added, name)
}
}

// touched
touched = machine.S{}
touched = am.S{}
for _, step := range tx.Steps {
if step.FromState != "" {
touched = append(touched, step.FromState)
Expand Down
Loading

0 comments on commit c271e42

Please sign in to comment.