Skip to content

Commit

Permalink
[chore][connector/routing] Add ability to route metrics and traces by…
Browse files Browse the repository at this point in the history
… request context
  • Loading branch information
djaglowski committed Nov 1, 2024
1 parent 9d5a3ab commit 363cfb2
Show file tree
Hide file tree
Showing 71 changed files with 5,862 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .chloggen/routing-connector-by-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ change_type: enhancement
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to route logs by request metadata.
note: Add ability to route by request metadata.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19738]
Expand Down
1 change: 0 additions & 1 deletion connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ The following settings are available:
### Limitations

- The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`.
- The `request` context is only supported for logs at this time.
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)

### Supported [OTTL] functions
Expand Down
5 changes: 5 additions & 0 deletions connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics
route := c.router.routeSlice[i]
matchedMetrics := pmetric.NewMetrics()
switch route.statementContext {
case "request":
if route.requestCondition.matchRequest(ctx) {
groupAllMetrics(groups, route.consumer, md)
md = pmetric.NewMetrics() // all metrics have been routed
}
case "", "resource":
pmetricutil.MoveResourcesIf(md, matchedMetrics,
func(rs pmetric.ResourceMetrics) bool {
Expand Down
18 changes: 17 additions & 1 deletion connector/routingconnector/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,21 @@ func TestMetricsConnectorCapabilities(t *testing.T) {

func TestMetricsConnectorDetailed(t *testing.T) {
testCases := []string{
filepath.Join("testdata", "metrics", "request_context", "match_any_value"),
filepath.Join("testdata", "metrics", "request_context", "match_grpc_value"),
filepath.Join("testdata", "metrics", "request_context", "match_http_value"),
filepath.Join("testdata", "metrics", "request_context", "match_http_value2"),
filepath.Join("testdata", "metrics", "request_context", "match_no_grpc_value"),
filepath.Join("testdata", "metrics", "request_context", "match_no_http_value"),
filepath.Join("testdata", "metrics", "request_context", "no_request_values"),
filepath.Join("testdata", "metrics", "resource_context", "all_match_first_only"),
filepath.Join("testdata", "metrics", "resource_context", "all_match_last_only"),
filepath.Join("testdata", "metrics", "resource_context", "all_match_once"),
filepath.Join("testdata", "metrics", "resource_context", "each_matches_one"),
filepath.Join("testdata", "metrics", "resource_context", "match_none_with_default"),
filepath.Join("testdata", "metrics", "resource_context", "match_none_without_default"),
filepath.Join("testdata", "metrics", "mixed_context", "match_resource_then_grpc_request"),
filepath.Join("testdata", "metrics", "mixed_context", "match_resource_then_http_request"),
}

for _, tt := range testCases {
Expand Down Expand Up @@ -539,10 +548,17 @@ func TestMetricsConnectorDetailed(t *testing.T) {
)
require.NoError(t, err)

ctx := context.Background()
if ctxFromFile, readErr := createContextFromFile(t, filepath.Join(tt, "request.yaml")); readErr == nil {
ctx = ctxFromFile
} else if !os.IsNotExist(readErr) {
t.Fatalf("Error reading request.yaml: %v", readErr)
}

input, readErr := golden.ReadMetrics(filepath.Join("testdata", "metrics", "input.yaml"))
require.NoError(t, readErr)

require.NoError(t, conn.ConsumeMetrics(context.Background(), input))
require.NoError(t, conn.ConsumeMetrics(ctx, input))

assertExpected := func(actual []pmetric.Metrics, filePath string) {
expected, err := golden.ReadMetrics(filePath)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
routing:
match_once: true
default_pipelines:
- metrics/default
table:
- context: resource
condition: attributes["resourceName"] == "resourceA"
pipelines:
- metrics/0
- context: request
condition: request["X-Tenant"] == "acme"
pipelines:
- metrics/1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
grpc:
X-Tenant: acme
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
resourceMetrics:
- resource:
attributes:
- key: resourceName
value:
stringValue: resourceA
- key: resourceNameAgain
value:
stringValue: resourceA
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scopeMetrics:
- attributes:
- key: scopeName
value:
stringValue: scopeA
- key: scopeNameAgain
value:
stringValue: scopeA
metrics:
- name: sumMonotonicCumulative
sum:
aggregationTemporality: 2 # AGGREGATION_TEMPORALITY_CUMULATIVE
isMonotonic: true
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "101"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "102"
- name: sumNonmonotonicCumulative
sum:
aggregationTemporality: 2 # AGGREGATION_TEMPORALITY_CUMULATIVE
isMonotonic: false
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "102"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "101"
- name: sumMonotonicDelta
sum:
aggregationTemporality: 1 # AGGREGATION_TEMPORALITY_DELTA
isMonotonic: true
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "1"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "2"
- name: sumNonmonotonicDelta
sum:
aggregationTemporality: 1 # AGGREGATION_TEMPORALITY_DELTA
isMonotonic: false
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "2"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "1"
- attributes:
- key: scopeName
value:
stringValue: scopeB
- key: scopeNameAgain
value:
stringValue: scopeB
metrics:
- name: sumMonotonicCumulative
sum:
aggregationTemporality: 2 # AGGREGATION_TEMPORALITY_CUMULATIVE
isMonotonic: true
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "101"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "102"
- name: sumNonmonotonicCumulative
sum:
aggregationTemporality: 2 # AGGREGATION_TEMPORALITY_CUMULATIVE
isMonotonic: false
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "102"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "101"
- name: sumMonotonicDelta
sum:
aggregationTemporality: 1 # AGGREGATION_TEMPORALITY_DELTA
isMonotonic: true
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "1"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "2"
- name: sumNonmonotonicDelta
sum:
aggregationTemporality: 1 # AGGREGATION_TEMPORALITY_DELTA
isMonotonic: false
dataPoints:
- attributes:
- key: dataPointName
value:
stringValue: dataPointA
- key: dataPointNameAgain
value:
stringValue: dataPointA
asInt: "2"
- attributes:
- key: dataPointName
value:
stringValue: dataPointB
- key: dataPointNameAgain
value:
stringValue: dataPointB
asInt: "1"
Loading

0 comments on commit 363cfb2

Please sign in to comment.