From 778510a35ca7189ea7456ff47ed37361bea4af16 Mon Sep 17 00:00:00 2001 From: Matt Kruse Date: Tue, 1 Oct 2024 05:04:02 -0500 Subject: [PATCH] Feat: Add data transformations. Signed-off-by: Matt Kruse --- README.md | 14 +++++ config/config.go | 18 +++--- examples/config.yml | 31 +++++++++ examples/prometheus.yml | 15 +++++ examples/transform-data.json | 45 +++++++++++++ exporter/collector.go | 26 ++++++-- exporter/util.go | 19 ++++++ go.mod | 2 + go.sum | 4 ++ transformers/jq_transformer.go | 55 ++++++++++++++++ transformers/transformers.go | 21 +++++++ transformers/transformers_test.go | 101 ++++++++++++++++++++++++++++++ 12 files changed, 337 insertions(+), 14 deletions(-) create mode 100644 examples/transform-data.json create mode 100644 transformers/jq_transformer.go create mode 100644 transformers/transformers.go create mode 100644 transformers/transformers_test.go diff --git a/README.md b/README.md index f514a85d..db2549bc 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,20 @@ animal_population{name="deer",predator="false"} 456 animal_population{name="lion",predator="true"} 123 animal_population{name="pigeon",predator="false"} 789 +## Test transform module +$ curl "http://localhost:7979/probe?module=transform&target=http://localhost:8000/examples/transform-data.json" + +# HELP origin_health Health of each origin in the pool +# TYPE origin_health untyped +origin_health{address="10.0.0.1",endpoint_name="origin3",pool_id="2",pool_name="pool2"} 1 +origin_health{address="10.0.0.1",endpoint_name="origin4",pool_id="3",pool_name="pool3"} 0 +origin_health{address="127.0.0.1",endpoint_name="origin1",pool_id="1",pool_name="pool1"} 1 +origin_health{address="192.168.1.1",endpoint_name="origin2",pool_id="1",pool_name="pool1"} 0 +# HELP pool_health Health of the pools +# TYPE pool_health untyped +pool_health{pool_id="1",pool_name="pool1"} 1 +pool_health{pool_id="2",pool_name="pool2"} 1 +pool_health{pool_id="3",pool_name="pool3"} 0 ## TEST through prometheus: diff --git a/config/config.go b/config/config.go index 6cd0accb..829afc6e 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ package config import ( + "github.com/prometheus-community/json_exporter/transformers" "os" pconfig "github.com/prometheus/common/config" @@ -22,14 +23,15 @@ import ( // Metric contains values that define a metric type Metric struct { - Name string - Path string - Labels map[string]string - Type ScrapeType - ValueType ValueType - EpochTimestamp string - Help string - Values map[string]string + Name string + Path string + Labels map[string]string + Type ScrapeType + ValueType ValueType + EpochTimestamp string + Help string + Values map[string]string + Transformations []transformers.TransformationConfig } type ScrapeType string diff --git a/examples/config.yml b/examples/config.yml index 9d0745c0..f6bba80b 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -43,6 +43,37 @@ modules: values: population: '{ .population }' + transform: + metrics: + - name: origin + transformations: + - type: jq + query: |- + .result[] | .name as $poolName | .id as $poolId | .origins[] | ( .name as $name | .healthy as $endpointHealth | {endpointName: $name, endpointHealthy: .healthy, poolName: $poolName, address:.address, poolId:$poolId, address:.address} ) + help: Health of each origin in the pool + path: '{ [*] }' + type: object + labels: + pool_id: '{.poolId}' + pool_name: '{.poolName}' + address: '{.address}' + endpoint_name: '{.endpointName}' + values: + health: '{.endpointHealthy}' # Extract only the `healthy` field + + + - name: pool + type: object + help: Health of the pools + path: '{.result[*]}' + labels: + pool_name: '{.name}' + pool_id: '{.id}' + values: + health: '{.healthy}' + + + ## HTTP connection configurations can be set in 'modules..http_client_config' field. For full http client config parameters, ref: https://pkg.go.dev/github.com/prometheus/common/config?tab=doc#HTTPClientConfig # # http_client_config: diff --git a/examples/prometheus.yml b/examples/prometheus.yml index 25f73582..c3f576d2 100644 --- a/examples/prometheus.yml +++ b/examples/prometheus.yml @@ -32,3 +32,18 @@ scrape_configs: ## Location of the json exporter's real : replacement: host.docker.internal:7979 # equivalent to "localhost:7979" +## Gather metrics from transform-data JSON source using the 'transform' module +- job_name: json_transform + metrics_path: /probe + params: + module: [transform] # Use the 'transform' module + static_configs: + - targets: + - http://localhost:8000/examples/transform-data.json + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + - target_label: __address__ + replacement: host.docker.internal:7979 # json_exporter instance \ No newline at end of file diff --git a/examples/transform-data.json b/examples/transform-data.json new file mode 100644 index 00000000..65dbf2f2 --- /dev/null +++ b/examples/transform-data.json @@ -0,0 +1,45 @@ +{ + "result": [ + { + "name": "pool1", + "id": "1", + "healthy": true, + "origins": [ + { + "name": "origin1", + "address": "127.0.0.1", + "healthy": true + }, + { + "name": "origin2", + "address": "192.168.1.1", + "healthy": false + } + ] + }, + { + "name": "pool2", + "id": "2", + "healthy": true, + "origins": [ + { + "name": "origin3", + "address": "10.0.0.1", + "healthy": true + } + ] + }, + { + "name": "pool3", + "id": "3", + "healthy": false, + "origins": [ + { + "name": "origin4", + "address": "10.0.0.1", + "healthy": false + } + ] + } + ] +} diff --git a/exporter/collector.go b/exporter/collector.go index 4effc10f..342453a6 100644 --- a/exporter/collector.go +++ b/exporter/collector.go @@ -16,6 +16,7 @@ package exporter import ( "bytes" "encoding/json" + "github.com/prometheus-community/json_exporter/transformers" "time" "github.com/go-kit/log" @@ -39,6 +40,7 @@ type JSONMetric struct { LabelsJSONPaths []string ValueType prometheus.ValueType EpochTimestampJSONPath string + Transformers []transformers.Transformer } func (mc JSONMetricCollector) Describe(ch chan<- *prometheus.Desc) { @@ -49,11 +51,22 @@ func (mc JSONMetricCollector) Describe(ch chan<- *prometheus.Desc) { func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) { for _, m := range mc.JSONMetrics { + + jsonData := mc.Data + for _, transformer := range m.Transformers { + transformedData, err := transformer.Transform(jsonData) + if err != nil { + level.Error(mc.Logger).Log("msg", "Transformation failed", "err", err) + continue + } + jsonData = transformedData + } + switch m.Type { case config.ValueScrape: - value, err := extractValue(mc.Logger, mc.Data, m.KeyJSONPath, false) + value, err := extractValue(mc.Logger, jsonData, m.KeyJSONPath, false) if err != nil { - level.Error(mc.Logger).Log("msg", "Failed to extract value for metric", "path", m.KeyJSONPath, "err", err, "metric", m.Desc) + level.Error(mc.Logger).Log("msg", "Failed to extract value for metric", "path", m.KeyJSONPath, "err", err, "metric", m.Desc, "data", jsonData) continue } @@ -62,16 +75,17 @@ func (mc JSONMetricCollector) Collect(ch chan<- prometheus.Metric) { m.Desc, m.ValueType, floatValue, - extractLabels(mc.Logger, mc.Data, m.LabelsJSONPaths)..., + extractLabels(mc.Logger, jsonData, m.LabelsJSONPaths)..., ) - ch <- timestampMetric(mc.Logger, m, mc.Data, metric) + ch <- timestampMetric(mc.Logger, m, jsonData, metric) } else { - level.Error(mc.Logger).Log("msg", "Failed to convert extracted value to float64", "path", m.KeyJSONPath, "value", value, "err", err, "metric", m.Desc) + level.Error(mc.Logger).Log("msg", "Failed to convert extracted value to float64", "path", m.KeyJSONPath, "value", value, "err", err, "metric", m.Desc) // was getting error here! continue } case config.ObjectScrape: - values, err := extractValue(mc.Logger, mc.Data, m.KeyJSONPath, true) + values, err := extractValue(mc.Logger, jsonData, m.KeyJSONPath, true) + if err != nil { level.Error(mc.Logger).Log("msg", "Failed to extract json objects for metric", "err", err, "metric", m.Desc) continue diff --git a/exporter/util.go b/exporter/util.go index 8374ddce..6481e464 100644 --- a/exporter/util.go +++ b/exporter/util.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus-community/json_exporter/transformers" "io" "math" "net/http" @@ -80,6 +81,22 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) { valueType prometheus.ValueType ) for _, metric := range c.Metrics { + metricTransfomers := []transformers.Transformer{} + for _, tConfig := range metric.Transformations { + transformer, err := transformers.NewTransformer(tConfig) // Use the package reference here + if err != nil { + return nil, err + } + metricTransfomers = append(metricTransfomers, transformer) + } + switch metric.ValueType { + case config.ValueTypeGauge: + valueType = prometheus.GaugeValue + case config.ValueTypeCounter: + valueType = prometheus.CounterValue + default: + valueType = prometheus.UntypedValue + } switch metric.ValueType { case config.ValueTypeGauge: valueType = prometheus.GaugeValue @@ -107,6 +124,7 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) { LabelsJSONPaths: variableLabelsValues, ValueType: valueType, EpochTimestampJSONPath: metric.EpochTimestamp, + Transformers: metricTransfomers, // Add transformers here } metrics = append(metrics, jsonMetric) case config.ObjectScrape: @@ -130,6 +148,7 @@ func CreateMetricsList(c config.Module) ([]JSONMetric, error) { LabelsJSONPaths: variableLabelsValues, ValueType: valueType, EpochTimestampJSONPath: metric.EpochTimestamp, + Transformers: metricTransfomers, // Add transformers here } metrics = append(metrics, jsonMetric) } diff --git a/go.mod b/go.mod index 880ed840..b6959c9d 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,8 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/huandu/xstrings v1.3.3 // indirect github.com/imdario/mergo v0.3.11 // indirect + github.com/itchyny/gojq v0.12.16 // indirect + github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/reflectwalk v1.0.0 // indirect diff --git a/go.sum b/go.sum index 1d1d6374..ac86ab26 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,10 @@ github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4 github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= +github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= +github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= +github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/transformers/jq_transformer.go b/transformers/jq_transformer.go new file mode 100644 index 00000000..30215c78 --- /dev/null +++ b/transformers/jq_transformer.go @@ -0,0 +1,55 @@ +package transformers + +import ( + "encoding/json" + "github.com/itchyny/gojq" +) + +// JQTransformer struct for jq transformation +type JQTransformer struct { + Query string +} + +// NewJQTransformer creates a new JQTransformer with a given query +func NewJQTransformer(query string) JQTransformer { + return JQTransformer{Query: query} +} + +// Transform applies the jq filter transformation to the input data +func (jq JQTransformer) Transform(data []byte) ([]byte, error) { + return applyJQFilter(data, jq.Query) +} + +// applyJQFilter uses gojq to apply a jq transformation to the input data +func applyJQFilter(jsonData []byte, jqQuery string) ([]byte, error) { + var input interface{} + if err := json.Unmarshal(jsonData, &input); err != nil { + return nil, err + } + + query, err := gojq.Parse(jqQuery) + if err != nil { + return nil, err + } + + iter := query.Run(input) + var results []interface{} + for { + v, ok := iter.Next() + if !ok { + break + } + if err, ok := v.(error); ok { + return nil, err + } + results = append(results, v) + } + + // Convert the transformed result back to JSON []byte + transformedJSON, err := json.Marshal(results) + if err != nil { + return nil, err + } + + return transformedJSON, nil +} diff --git a/transformers/transformers.go b/transformers/transformers.go new file mode 100644 index 00000000..5b560a84 --- /dev/null +++ b/transformers/transformers.go @@ -0,0 +1,21 @@ +package transformers + +import "fmt" + +type Transformer interface { + Transform(data []byte) ([]byte, error) +} + +type TransformationConfig struct { + Type string + Query string +} + +func NewTransformer(config TransformationConfig) (Transformer, error) { + switch config.Type { + case "jq": + return NewJQTransformer(config.Query), nil + default: + return nil, fmt.Errorf("unsupported transformer type: %s", config.Type) + } +} diff --git a/transformers/transformers_test.go b/transformers/transformers_test.go new file mode 100644 index 00000000..439ecc1b --- /dev/null +++ b/transformers/transformers_test.go @@ -0,0 +1,101 @@ +package transformers + +import "testing" + +func TestNewTransformerFactory(t *testing.T) { + // Define test cases for multiple jq transformations + tests := []struct { + Config TransformationConfig + Input string + ExpectedOutput string + ShouldSucceed bool + }{ + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | select(.name == "pool1")`, + }, + Input: `{ + "result": [ + {"name":"pool1","origins":[{"name":"origin1","healthy":true},{"name":"origin2","healthy":false}]}, + {"name":"pool2","origins":[{"name":"origin3","healthy":true}]} + ] + }`, + ExpectedOutput: `[{"name":"pool1","origins":[{"healthy":true,"name":"origin1"},{"healthy":false,"name":"origin2"}]}]`, + ShouldSucceed: true, + }, + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | select(has("origins")) | .origins[] | select(.healthy == true)`, + }, + Input: `{ + "result": [ + {"name":"pool1","origins":[{"name":"origin1","healthy":true},{"name":"origin2","healthy":false}]}, + {"name":"pool2","origins":[{"name":"origin3","healthy":true}]} + ] + }`, + ExpectedOutput: `[{"healthy":true,"name":"origin1"},{"healthy":true,"name":"origin3"}]`, + ShouldSucceed: true, + }, + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | .name as $poolName | .id as $poolId | .origins[] | {endpoint_name: .name, endpoint_health: .healthy, pool_name: $poolName, address: .address, pool_id: $poolId}`, + }, + Input: `{"result":[{"name":"pool1","id":"1","origins":[{"name":"origin1","healthy":true, "address":"127.0.0.1"}]}]}`, + ExpectedOutput: `[{"address":"127.0.0.1","endpoint_health":true,"endpoint_name":"origin1","pool_id":"1","pool_name":"pool1"}]`, + ShouldSucceed: true, + }, + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | select(.name == "pool2")`, + }, + Input: `{"result":[{"name":"pool1","id":"1","origins":[{"name":"origin1","healthy":true}]},{"name":"pool2","id":"2","origins":[{"name":"origin2","healthy":true}]}]}`, + ExpectedOutput: `[{"id":"2","name":"pool2","origins":[{"healthy":true,"name":"origin2"}]}]`, + ShouldSucceed: true, + }, + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | select(.name == "pool2")`, + }, + Input: `{"result":[{"name":"pool1","id":"1","origins":[{"name":"origin1","healthy":true}]}]}`, + ExpectedOutput: `null`, + ShouldSucceed: true, + }, + { + Config: TransformationConfig{ + Type: "jq", + Query: `.result[] | .origins[]`, + }, + Input: `{"result":[{"name":"pool1"}]}`, + ExpectedOutput: ``, + ShouldSucceed: false, + }, + } + + // Loop through each test case + for i, test := range tests { + // Create the transformer using NewTransformer + transformer, err := NewTransformer(test.Config) + if err != nil && test.ShouldSucceed { + t.Fatalf("Failed to create transformer %d: %s", i, err) + } + + // Apply the transformation + output, err := transformer.Transform([]byte(test.Input)) + if err != nil && test.ShouldSucceed { + t.Fatalf("Transformation %d failed: %s", i, err) + } + + // Compare the actual output with the expected output + if string(output) != test.ExpectedOutput { + t.Fatalf("Transformation %d failed. Expected: %s, Got: %s", i, test.ExpectedOutput, string(output)) + } + + // Log the successful transformation + t.Logf("Transformation %d succeeded. Output: %s", i, string(output)) + } +}