diff --git a/series/collector.go b/series/collector.go index b983ce5..d2a65d7 100644 --- a/series/collector.go +++ b/series/collector.go @@ -18,23 +18,28 @@ import ( ) // MetricValueRenderer takes an event and writes the output to a -// buffer. This makes it possible to use the metrics system with -// arbitrary output formats and targets. +// buffer. This provides the ability to add support arbitrary output +// formats and targets via dependency injection. type MetricValueRenderer func(writer *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time) // Collector maintains the local state of collected metrics: metric // series are registered lazily when they are first sent, and the // collector tracks the value and is responsible for orchestrating. type Collector struct { + CollectorConf + + // synchronized map tracking metrics and periodic collection operations. local adt.Map[string, *dt.List[*tracked]] loops adt.Map[time.Duration, fun.Handler[*tracked]] pool adt.Pool[*bytes.Buffer] + // broker is for cases where there are more than one output + // system. the broker is backed by the publish deque, but we + // use the deque directly when there's only one output. broker *pubsub.Broker[MetricPublisher] publish *pubsub.Deque[MetricPublisher] - CollectorConf - + // lifecycle an error collection. ctx context.Context cancel context.CancelFunc wg fun.WaitGroup @@ -125,9 +130,6 @@ func (c *Collector) Close() error { return c.errs.Resolve() } -func (c *Collector) Push(events ...*Event) { c.Publish(events) } -func (c *Collector) Publish(events []*Event) { dt.Sliceify(events).Observe(c.PushEvent) } - func (c *Collector) Stream( iter *fun.Iterator[*Event], opts ...fun.OptionProvider[*fun.WorkerGroupConf], @@ -135,6 +137,9 @@ func (c *Collector) Stream( return iter.ProcessParallel(fun.Handle(c.PushEvent).Processor(), opts...) } +func (c *Collector) Push(events ...*Event) { c.Publish(events) } +func (c *Collector) Publish(events []*Event) { dt.NewSlice(events).Observe(c.PushEvent) } + func (c *Collector) PushEvent(e *Event) { if e.m == nil { return @@ -194,7 +199,7 @@ func (c *Collector) Iterator() *fun.Iterator[MetricSnapshot] { }).Operation(ec.Handler()).Go().Once(), ).IteratorWithHook(erc.IteratorHook[*tracked](ec)), // transformation function to convert the iterator of - // tracked + // trackedMetrics to metrics snapshots. fun.Converter(func(tr *tracked) MetricSnapshot { last := tr.lastMod.Load() return MetricSnapshot{Name: tr.meta.ID, Labels: tr.meta.labelstr(), Value: last.Key, Timestamp: last.Value} diff --git a/series/converter.go b/series/converter.go index d7f7444..145c6e8 100644 --- a/series/converter.go +++ b/series/converter.go @@ -186,7 +186,7 @@ func hasMetrics[T extractableMessageTypes](in T) (isMetric bool) { case fun.Future[Event], fun.Future[*Event], fun.Future[[]Event], fun.Future[[]*Event]: return true case map[string]any: // also mesage.Fields - dt.Mapify(ev).Values().Process(fun.MakeProcessor(func(in any) (err error) { + dt.NewMap(ev).Values().Process(fun.MakeProcessor(func(in any) (err error) { isMetric, err = isEventTyped(in) return })).Ignore().Wait() @@ -196,17 +196,17 @@ func hasMetrics[T extractableMessageTypes](in T) (isMetric bool) { return })).Ignore().Wait() case []dt.Pair[string, any]: - dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in dt.Pair[string, any]) (err error) { + dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in dt.Pair[string, any]) (err error) { isMetric, err = isEventTyped(in.Value) return })).Ignore().Wait() case []*dt.Pair[string, any]: - dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in *dt.Pair[string, any]) (err error) { + dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in *dt.Pair[string, any]) (err error) { isMetric, err = isEventTyped(in.Value) return })).Ignore().Wait() case []any: - dt.Sliceify(ev).Iterator().Process(fun.MakeProcessor(func(in any) (err error) { + dt.NewSlice(ev).Iterator().Process(fun.MakeProcessor(func(in any) (err error) { isMetric, err = isEventTyped(in) return })).Ignore().Wait() diff --git a/series/metric.go b/series/metric.go index b3f2a0e..1b195ef 100644 --- a/series/metric.go +++ b/series/metric.go @@ -12,10 +12,19 @@ import ( "github.com/tychoish/fun/risky" ) +// MetricType determines the kind of metric, in particular how the +// state of the metric is tracked over the lifetime of the +// application. type MetricType string const ( - MetricTypeDeltas MetricType = "deltas" + // MetricTypeDeltas represents an integer/numeric value that + // is rendered as deltas: the difference since the last time + // the metric was reported. + MetricTypeDeltas MetricType = "deltas" + // MetricTypeCounter represents an incrementing metric that + // increases over the lifetime of the process' lifespan. When + // reported, the total value of the counter is displayed. MetricTypeCounter MetricType = "counter" MetricTypeGuage MetricType = "gauge" MetricTypeHistogram MetricType = "histogram" @@ -75,7 +84,7 @@ func Histogram(id string, opts ...HistogramOptionProvider) *Metric { func (m *Metric) Label(k, v string) *Metric { m.labels.Add(dt.MakePair(k, v)); return m } func (m *Metric) MetricType(t MetricType) *Metric { m.Type = t; return m } func (m *Metric) Annotate(pairs ...dt.Pair[string, string]) *Metric { - m.labels.Populate(dt.Sliceify(pairs).Iterator()) + m.labels.Populate(dt.NewSlice(pairs).Iterator()) return m }