Skip to content

Commit

Permalink
series: docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Nov 5, 2023
1 parent bfba219 commit efe7918
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
21 changes: 13 additions & 8 deletions series/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@ import (
)

// MetricValueRenderer takes an event and writes the output to a
// buffer. This makes it possible to use the metrics system with
// arbitrary output formats and targets.
// buffer. This provides the ability to add support arbitrary output
// formats and targets via dependency injection.
type MetricValueRenderer func(writer *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time)

// Collector maintains the local state of collected metrics: metric
// series are registered lazily when they are first sent, and the
// collector tracks the value and is responsible for orchestrating.
type Collector struct {
CollectorConf

// synchronized map tracking metrics and periodic collection operations.
local adt.Map[string, *dt.List[*tracked]]
loops adt.Map[time.Duration, fun.Handler[*tracked]]
pool adt.Pool[*bytes.Buffer]

// broker is for cases where there are more than one output
// system. the broker is backed by the publish deque, but we
// use the deque directly when there's only one output.
broker *pubsub.Broker[MetricPublisher]
publish *pubsub.Deque[MetricPublisher]

CollectorConf

// lifecycle an error collection.
ctx context.Context
cancel context.CancelFunc
wg fun.WaitGroup
Expand Down Expand Up @@ -125,16 +130,16 @@ func (c *Collector) Close() error {
return c.errs.Resolve()
}

func (c *Collector) Push(events ...*Event) { c.Publish(events) }
func (c *Collector) Publish(events []*Event) { dt.Sliceify(events).Observe(c.PushEvent) }

func (c *Collector) Stream(
iter *fun.Iterator[*Event],
opts ...fun.OptionProvider[*fun.WorkerGroupConf],
) fun.Worker {
return iter.ProcessParallel(fun.Handle(c.PushEvent).Processor(), opts...)
}

func (c *Collector) Push(events ...*Event) { c.Publish(events) }
func (c *Collector) Publish(events []*Event) { dt.NewSlice(events).Observe(c.PushEvent) }

func (c *Collector) PushEvent(e *Event) {
if e.m == nil {
return
Expand Down Expand Up @@ -194,7 +199,7 @@ func (c *Collector) Iterator() *fun.Iterator[MetricSnapshot] {
}).Operation(ec.Handler()).Go().Once(),
).IteratorWithHook(erc.IteratorHook[*tracked](ec)),
// transformation function to convert the iterator of
// tracked
// trackedMetrics to metrics snapshots.
fun.Converter(func(tr *tracked) MetricSnapshot {
last := tr.lastMod.Load()
return MetricSnapshot{Name: tr.meta.ID, Labels: tr.meta.labelstr(), Value: last.Key, Timestamp: last.Value}
Expand Down
8 changes: 4 additions & 4 deletions series/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func hasMetrics[T extractableMessageTypes](in T) (isMetric bool) {
case fun.Future[Event], fun.Future[*Event], fun.Future[[]Event], fun.Future[[]*Event]:
return true
case map[string]any: // also mesage.Fields
dt.Mapify(ev).Values().Process(fun.MakeProcessor(func(in any) (err error) {
dt.NewMap(ev).Values().Process(fun.MakeProcessor(func(in any) (err error) {
isMetric, err = isEventTyped(in)
return
})).Ignore().Wait()
Expand All @@ -196,17 +196,17 @@ func hasMetrics[T extractableMessageTypes](in T) (isMetric bool) {
return
})).Ignore().Wait()
case []dt.Pair[string, any]:
dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in dt.Pair[string, any]) (err error) {
dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in dt.Pair[string, any]) (err error) {
isMetric, err = isEventTyped(in.Value)
return
})).Ignore().Wait()
case []*dt.Pair[string, any]:
dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in *dt.Pair[string, any]) (err error) {
dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in *dt.Pair[string, any]) (err error) {
isMetric, err = isEventTyped(in.Value)
return
})).Ignore().Wait()
case []any:
dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in any) (err error) {
dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in any) (err error) {
isMetric, err = isEventTyped(in)
return
})).Ignore().Wait()
Expand Down
13 changes: 11 additions & 2 deletions series/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ import (
"github.com/tychoish/fun/risky"
)

// MetricType determines the kind of metric, in particular how the
// state of the metric is tracked over the lifetime of the
// application.
type MetricType string

const (
MetricTypeDeltas MetricType = "deltas"
// MetricTypeDeltas represents an integer/numeric value that
// is rendered as deltas: the difference since the last time
// the metric was reported.
MetricTypeDeltas MetricType = "deltas"
// MetricTypeCounter represents an incrementing metric that
// increases over the lifetime of the process' lifespan. When
// reported, the total value of the counter is displayed.
MetricTypeCounter MetricType = "counter"
MetricTypeGuage MetricType = "gauge"
MetricTypeHistogram MetricType = "histogram"
Expand Down Expand Up @@ -75,7 +84,7 @@ func Histogram(id string, opts ...HistogramOptionProvider) *Metric {
func (m *Metric) Label(k, v string) *Metric { m.labels.Add(dt.MakePair(k, v)); return m }
func (m *Metric) MetricType(t MetricType) *Metric { m.Type = t; return m }
func (m *Metric) Annotate(pairs ...dt.Pair[string, string]) *Metric {
m.labels.Populate(dt.Sliceify(pairs).Iterator())
m.labels.Populate(dt.NewSlice(pairs).Iterator())
return m
}

Expand Down

0 comments on commit efe7918

Please sign in to comment.