Skip to content

Commit

Permalink
Merge pull request #1 from wandera/migration
Browse files Browse the repository at this point in the history
  • Loading branch information
Tantalor93 authored Apr 3, 2023
2 parents 49cd38b + d19e5c3 commit 4dd8f8d
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 342 deletions.
39 changes: 39 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
run:
timeout: 5m
modules-download-mode: readonly
linters:
disable-all: true
enable:
- gosimple
- govet
- ineffassign
- staticcheck
- typecheck
- unused
- gofmt
- revive
- gci
- gofumpt
- whitespace
- godot
- unparam
- gocritic
- gosec

issues:
include:
- EXC0012 # disable excluding of issues about comments from revive
exclude-rules:
- path: _test\.go
linters:
- gosec

linters-settings:
godot:
# list of regexps for excluding particular comment lines from check
exclude:
- '^ @.*' # swaggo comments like // @title
- '^ (\d+)(\.|\)).*' # enumeration comments like // 1. or // 1)
gosec:
global:
audit: true
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
export GOPROXY := go-proxy.oss.wandera.net
export GONOSUMDB := github.com/wandera/*

all: check test

MAKEFLAGS += --no-print-directory

prepare:
@echo "Downloading tools"
@cat tools.go | grep _ | cut -f2 -d " " | xargs -tI % sh -c "go install %"

check: prepare
@echo "Running check"
ifeq (, $(shell which golangci-lint))
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.52.2
endif
golangci-lint run
go mod tidy

test: prepare
@echo "Running tests"
mkdir -p report
go test -race -v ./... -coverprofile=report/coverage.txt | tee report/report.txt
go-junit-report -set-exit-code < report/report.txt > report/report.xml
gocov convert report/coverage.txt | gocov-xml > report/coverage.xml
go mod tidy

clean:
@echo "Running clean"
rm -rf "report/"

.PHONY: all check test prepare
93 changes: 6 additions & 87 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,88 +1,7 @@
# saramaprom
[![GoDoc](https://godoc.org/github.com/iimos/saramaprom?status.png)](http://godoc.org/github.com/iimos/saramaprom)
[![Go Report](https://goreportcard.com/badge/github.com/iimos/saramaprom)](https://goreportcard.com/report/github.com/iimos/saramaprom)

This is a prometheus metrics reporter for the [sarama](https://github.com/Shopify/sarama) library.
It is based on https://github.com/deathowl/go-metrics-prometheus library.

## Why
Because `go-metrics-prometheus` is a general solution it reports metrics with no labels so it's hard to use. Thus a sarama specific solution was made, it reports metrics with labels for brokers, topics and consumer/producer instance.

## Installation
```console
go get github.com/iimos/saramaprom
```

## Usage

```go
import (
"context"
"github.com/Shopify/sarama"
"github.com/iimos/saramaprom"
)

ctx := context.Background()
cfg := sarama.NewConfig()
err := saramaprom.ExportMetrics(ctx, cfg.MetricRegistry, saramaprom.Options{})
```

Posible options:
```go
type Options struct {
// PrometheusRegistry is prometheus registry. Default prometheus.DefaultRegisterer.
PrometheusRegistry prometheus.Registerer

// Namespace and Subsystem form the metric name prefix.
// Default Subsystem is "sarama".
Namespace string
Subsystem string

// Label specifies value of "label" label. Default "".
Label string

// FlushInterval specifies interval between updating metrics. Default 1s.
FlushInterval time.Duration

// OnError is error handler. Default handler panics when error occurred.
OnError func(err error)

// Debug turns on debug logging.
Debug bool
}
```

Metric names by default:
```
Gauges:
sarama_batch_size
sarama_compression_ratio
sarama_incoming_byte_rate
sarama_outgoing_byte_rate
sarama_record_send_rate
sarama_records_per_request
sarama_request_latency_in_ms
sarama_request_rate
sarama_request_size
sarama_requests_in_flight
sarama_response_rate
sarama_response_size
Histograms:
sarama_batch_size_histogram
sarama_compression_ratio_histogram
sarama_records_per_request_histogram
sarama_request_latency_in_ms_histogram
sarama_request_size_histogram
sarama_response_size_histogram
```

Every metric have three labels:
* broker – kafka broker id
* topic – kafka topic name
* label – custom label to distinguish different consumers/producers


## Requirements

Go 1.13 or above.
is a library for exporting sarama metrics (provided through [go-metrics](https://github.com/rcrowley/go-metrics)) to Prometheus. It is
a fork of [saramaprom](https://github.com/iimos/saramaprom/tree/ab69b9d3b9e65611e5377c2fd40882124e491f50) with few fixes
and tweaks:
* go-metrics histograms are registered as Prometheus summaries (to better present client side quantiles)
* removed histogram and timer words from metric names
* removed configuration of optional labels from saramaprom (we never configure it and it was creating additional unnecessary dimension to metrics due to bad implementation)
81 changes: 30 additions & 51 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
)

type exporter struct {
opt Options
registry MetricsRegistry
promRegistry prometheus.Registerer
gauges map[string]prometheus.Gauge
customMetrics map[string]*customCollector
histogramBuckets []float64
timerBuckets []float64
mutex *sync.Mutex
opt Options
registry MetricsRegistry
promRegistry prometheus.Registerer
gauges map[string]prometheus.Gauge
customMetrics map[string]*customCollector
histogramQuantiles []float64
timerQuantiles []float64
mutex *sync.Mutex
}

func (c *exporter) sanitizeName(key string) string {
Expand All @@ -41,9 +41,6 @@ func (c *exporter) createKey(name string) string {
func (c *exporter) gaugeFromNameAndValue(name string, val float64) error {
shortName, labels, skip := c.metricNameAndLabels(name)
if skip {
if c.opt.Debug {
fmt.Printf("[saramaprom] skip metric %q because there is no broker or topic labels\n", name)
}
return nil
}

Expand All @@ -57,7 +54,6 @@ func (c *exporter) gaugeFromNameAndValue(name string, val float64) error {
Namespace: c.sanitizeName(c.opt.Namespace),
Subsystem: c.sanitizeName(c.opt.Subsystem),
Name: c.sanitizeName(shortName),
Help: shortName,
}, labelNames)

if err := c.promRegistry.Register(g); err != nil {
Expand Down Expand Up @@ -100,7 +96,6 @@ func (c *exporter) metricNameAndLabels(metricName string) (newName string, label
labels = map[string]string{
"broker": broker,
"topic": topic,
"label": c.opt.Label,
}
return newName, labels, false
}
Expand All @@ -119,7 +114,7 @@ func parseMetricName(name string) (newName, broker, topic string) {
return name, "", ""
}

func (c *exporter) histogramFromNameAndMetric(name string, goMetric interface{}, buckets []float64) error {
func (c *exporter) summaryFromNameAndMetric(name string, goMetric interface{}, quantiles []float64) error {
key := c.createKey(name)
collector, exists := c.customMetrics[key]
if !exists {
Expand All @@ -131,28 +126,25 @@ func (c *exporter) histogramFromNameAndMetric(name string, goMetric interface{},
var ps []float64
var count uint64
var sum float64
var typeName string

switch metric := goMetric.(type) {
case metrics.Histogram:
snapshot := metric.Snapshot()
ps = snapshot.Percentiles(buckets)
ps = snapshot.Percentiles(quantiles)
count = uint64(snapshot.Count())
sum = float64(snapshot.Sum())
typeName = "histogram"
case metrics.Timer:
snapshot := metric.Snapshot()
ps = snapshot.Percentiles(buckets)
ps = snapshot.Percentiles(quantiles)
count = uint64(snapshot.Count())
sum = float64(snapshot.Sum())
typeName = "timer"
default:
return fmt.Errorf("unexpected metric type %T", goMetric)
}

bucketVals := make(map[float64]uint64)
for ii, bucket := range buckets {
bucketVals[bucket] = uint64(ps[ii])
quantilesVals := make(map[float64]float64)
for i, quantile := range quantiles {
quantilesVals[quantile] = ps[i]
}

name, labels, skip := c.metricNameAndLabels(name)
Expand All @@ -164,27 +156,19 @@ func (c *exporter) histogramFromNameAndMetric(name string, goMetric interface{},
prometheus.BuildFQName(
c.sanitizeName(c.opt.Namespace),
c.sanitizeName(c.opt.Subsystem),
c.sanitizeName(name)+"_"+typeName,
c.sanitizeName(name),
),
c.sanitizeName(name),
nil,
labels,
)

hist, err := prometheus.NewConstHistogram(desc, count, sum, bucketVals)
if err != nil {
return err
}
c.mutex.Lock()
collector.metric = hist
c.mutex.Unlock()
collector.setMetric(prometheus.MustNewConstSummary(desc, count, sum, quantilesVals))

return nil
}

func (c *exporter) update() error {
if c.opt.Debug {
fmt.Print("[saramaprom] update()\n")
}
var err error
c.registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
Expand All @@ -193,31 +177,20 @@ func (c *exporter) update() error {
case metrics.Gauge:
err = c.gaugeFromNameAndValue(name, float64(metric.Value()))
case metrics.GaugeFloat64:
err = c.gaugeFromNameAndValue(name, float64(metric.Value()))
case metrics.Histogram: // sarama
samples := metric.Snapshot().Sample().Values()
if len(samples) > 0 {
lastSample := samples[len(samples)-1]
err = c.gaugeFromNameAndValue(name, float64(lastSample))
}
if err == nil {
err = c.histogramFromNameAndMetric(name, metric, c.histogramBuckets)
}
case metrics.Meter: // sarama
err = c.gaugeFromNameAndValue(name, metric.Value())
case metrics.Histogram:
err = c.summaryFromNameAndMetric(name, metric, c.histogramQuantiles)
case metrics.Meter:
lastSample := metric.Snapshot().Rate1()
err = c.gaugeFromNameAndValue(name, float64(lastSample))
err = c.gaugeFromNameAndValue(name, lastSample)
case metrics.Timer:
lastSample := metric.Snapshot().Rate1()
err = c.gaugeFromNameAndValue(name, float64(lastSample))
if err == nil {
err = c.histogramFromNameAndMetric(name, metric, c.timerBuckets)
}
err = c.summaryFromNameAndMetric(name, metric, c.timerQuantiles)
}
})
return err
}

// for collecting prometheus.constHistogram objects
// customCollector is a collector of prometheus.constSummary objects.
type customCollector struct {
prometheus.Collector

Expand All @@ -231,6 +204,12 @@ func newCustomCollector(mu *sync.Mutex) *customCollector {
}
}

func (c *customCollector) setMetric(metric prometheus.Metric) {
c.mutex.Lock()
c.metric = metric
c.mutex.Unlock()
}

func (c *customCollector) Collect(ch chan<- prometheus.Metric) {
c.mutex.Lock()
if c.metric != nil {
Expand Down
30 changes: 25 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
module github.com/iimos/saramaprom
module github.com/wandera/saramaprom

go 1.13
go 1.19

require (
github.com/prometheus/client_golang v1.7.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/stretchr/testify v1.4.0
github.com/AlekSi/gocov-xml v1.1.0
github.com/axw/gocov v1.1.0
github.com/jstemmer/go-junit-report v1.0.0
github.com/prometheus/client_golang v1.14.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/stretchr/testify v1.8.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.0.0-20190617190820-da514acc4774 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 4dd8f8d

Please sign in to comment.