Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refac(telemetry): switch telemetry to Tracer API #125

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 106 additions & 120 deletions pkg/telemetry/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package prometheus provides Prometheus metrics for asyncmachine.
// Package prometheus provides Prometheus metrics for asyncam.
// The metrics are collected from the machine's transitions and states.
//
// Exported metrics:
Expand All @@ -13,18 +13,100 @@ import (
"sync"
"time"

"github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/prometheus/client_golang/prometheus"

am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// Metrics is a set of Prometheus metrics for a machine.
type promTracer struct {
am.NoOpTracer

m *Metrics
txStartTime time.Time
prevTime uint64
}

func (t *promTracer) StructChange(machine *am.Machine, old am.Struct) {
// TODO refresh state and relation metrics
}

func (t *promTracer) TransitionInit(tx *am.Transition) {
t.txStartTime = time.Now()
}

func (t *promTracer) TransitionEnd(tx *am.Transition) {
if t.m.closed || !tx.Accepted {
return
}

statesIndex := tx.Machine.StateNames()

// try to refresh, then lock
t.m.Refresh()
t.m.mx.Lock()
defer t.m.mx.Unlock()

queueLen := tx.QueueLen
t.m.queueSize = uint64(queueLen)
t.m.queueSizeLen++
t.m.stepsAmount += uint64(len(tx.Steps))
t.m.stepsAmountLen++
// TODO log slow txs (Opts and default to 1ms)
t.m.txTime += uint64(time.Since(t.txStartTime).Milliseconds())
t.m.txTimeLen++

// executed handlers
handlersCount := 0
for _, step := range tx.Steps {
if step.Type == am.StepHandler {
handlersCount++
}
}
t.m.handlersAmount += uint64(handlersCount)
t.m.handlersAmountLen++

// tx states
added, removed, touched := getTxStates(tx, statesIndex)
t.m.statesAdded += uint64(len(added))
t.m.statesAddedLen++
t.m.statesRemoved += uint64(len(removed))
t.m.statesRemovedLen++
t.m.statesTouched += uint64(len(touched))

// time sum
currTime := tx.Machine.TimeSum(nil)
t.m.txTick += currTime - t.prevTime
t.m.txTickLen++
t.prevTime = currTime

// active / inactive states
active := 0
inactive := 0
for _, t := range tx.TimeBefore {
if am.IsActiveTick(t) {
active++
} else {
inactive++
}
}
t.m.statesActiveAmount += uint64(active)
t.m.statesActiveAmountLen++
t.m.statesInactiveAmount += uint64(inactive)
}

func (t *promTracer) ExceptionState(err error) {
t.m.ExceptionsCount.Inc()
}

// Metrics is a set of Prometheus metrics for a am.
type Metrics struct {
mx sync.Mutex
closed bool
lastUpdate time.Time
interval time.Duration
tracer am.Tracer

////// mach definition
// //// mach definition

// number of registered states
StatesAmount prometheus.Gauge
Expand All @@ -35,7 +117,7 @@ type Metrics struct {
// number of state referenced by relations for all registered states
RefStatesAmount prometheus.Gauge

////// tx data
// //// tx data

// current number of queued transitions (per transition)
QueueSize prometheus.Gauge
Expand Down Expand Up @@ -77,7 +159,7 @@ type Metrics struct {
exceptionsCount uint64
exceptionsCountLen uint

////// stats
// //// stats

// steps per transition
StepsAmount prometheus.Gauge
Expand All @@ -95,14 +177,14 @@ type Metrics struct {
txTimeLen uint
}

func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics {
machID := machine.NormalizeID(mach.ID)
func newMetrics(mach *am.Machine, interval time.Duration) *Metrics {
machID := am.NormalizeID(mach.ID)

return &Metrics{
interval: interval,
lastUpdate: time.Now(),

///// mach definition
// /// mach definition

StatesAmount: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "states_amount",
Expand All @@ -124,7 +206,7 @@ func newMetrics(mach *machine.Machine, interval time.Duration) *Metrics {
Namespace: "mach",
}),

///// tx data
// /// tx data

QueueSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "queue_size",
Expand Down Expand Up @@ -282,16 +364,14 @@ func average(sum uint64, sampleLen uint) float64 {
}

// TransitionsToPrometheus bind transitions to Prometheus metrics.
// TODO debounce
// TODO bind via the tracer API (so it can be disabled/enabled)
func TransitionsToPrometheus(
mach *machine.Machine, interval time.Duration,
mach *am.Machine, interval time.Duration,
) *Metrics {
metrics := newMetrics(mach, interval)

// state & relations
// TODO bind to EventStatesChange
metrics.StatesAmount.Set(float64(len(mach.StateNames)))
metrics.StatesAmount.Set(float64(len(mach.StateNames())))
relCount := 0
stateRefCount := 0
for _, state := range mach.GetStruct() {
Expand All @@ -315,131 +395,37 @@ func TransitionsToPrometheus(
}
metrics.RelAmount.Set(float64(relCount))
metrics.RefStatesAmount.Set(float64(stateRefCount))
statesIndex := mach.StateNames[:]

// TODO extract
go func() {

// bind to transitions
var txStartTime time.Time
txInitCh := mach.OnEvent(machine.S{machine.EventTransitionInit}, nil)
txEndCh := mach.OnEvent(machine.S{machine.EventTransitionEnd}, nil)
errStateCh := mach.OnEvent(machine.S{machine.EventException}, nil)
prevTime := mach.TimeSum(nil)

// consume the data from the channels
for mach.Ctx.Err() == nil {
select {

case <-mach.Ctx.Done():
break

case <-errStateCh:
metrics.ExceptionsCount.Inc()

case <-txInitCh:
txStartTime = time.Now()

case event := <-txEndCh:
if metrics.closed {
return
}

tx := event.Args["transition"].(*machine.Transition)

// skip canceled txs
if !tx.Accepted {
continue
}

// try to refresh, then lock
metrics.Refresh()
metrics.mx.Lock()

queueLen := event.Args["queue_len"].(int)
metrics.queueSize = uint64(queueLen)
metrics.queueSizeLen++
metrics.stepsAmount += uint64(len(tx.Steps))
metrics.stepsAmountLen++
// TODO log slow txs (Opts and default to 1ms)
metrics.txTime += uint64(time.Since(txStartTime).Milliseconds())
metrics.txTimeLen++

// executed handlers
handlersCount := 0
for _, step := range tx.Steps {
if step.Type == machine.StepHandler {
handlersCount++
}
}
metrics.handlersAmount += uint64(handlersCount)
metrics.handlersAmountLen++

// tx states
added, removed, touched := getTxStates(tx, statesIndex)
metrics.statesAdded += uint64(len(added))
metrics.statesAddedLen++
metrics.statesRemoved += uint64(len(removed))
metrics.statesRemovedLen++
metrics.statesTouched += uint64(len(touched))

// time sum
currTime := mach.TimeSum(nil)
metrics.txTick += currTime - prevTime
metrics.txTickLen++
prevTime = currTime

// active / inactive states
active := 0
inactive := 0
for _, t := range tx.ClocksBefore {
if machine.IsActiveTick(t) {
active++
} else {
inactive++
}
}
metrics.statesActiveAmount += uint64(active)
metrics.statesActiveAmountLen++
metrics.statesInactiveAmount += uint64(inactive)

// unlock
metrics.mx.Unlock()
}
}

defer metrics.Close()
}()
mach.Tracers = append(mach.Tracers, &promTracer{m: metrics})

return metrics
}

// TODO move to helpers
func getTxStates(
tx *machine.Transition, index machine.S,
) (added machine.S, removed machine.S, touched machine.S) {
tx *am.Transition, index am.S,
) (added am.S, removed am.S, touched am.S) {

before := tx.ClocksBefore
after := tx.ClocksAfter
before := tx.TimeBefore
after := tx.TimeAfter

is := func(clocks machine.Clocks, i string) bool {
// TODO use T
return machine.IsActiveTick(clocks[i])
is := func(time am.Time, i int) bool {
return am.IsActiveTick(time[i])
}

for _, name := range index {
if is(before, name) && !is(after, name) {
for i, name := range index {
if is(before, i) && !is(after, i) {
removed = append(removed, name)
} else if !is(before, name) && is(after, name) {
} else if !is(before, i) && is(after, i) {
added = append(added, name)
} else if before[name] != after[name] {
} else if before[i] != after[i] {
// treat multi states as added
added = append(added, name)
}
}

// touched
touched = machine.S{}
touched = am.S{}
for _, step := range tx.Steps {
if step.FromState != "" {
touched = append(touched, step.FromState)
Expand Down
Loading