[chore] Schema Processor Revamp [Part 2] - ChangeList and Revision #35267

Merged
32 commits
8dcf413
operator and migrator support
ankitpatel96 Sep 16, 2024
1043a0e
update metadata.yaml
ankitpatel96 Oct 1, 2024
abf096e
add support for log and all type operators in operators
ankitpatel96 Oct 9, 2024
f959921
excise Apply and Rollback in favor of Do
ankitpatel96 Oct 9, 2024
c5a6aa9
simple migrator fix
ankitpatel96 Oct 9, 2024
d1cc19f
new tests for attribute operators
ankitpatel96 Oct 10, 2024
c0740cf
table tests and metrics
ankitpatel96 Oct 14, 2024
0c0667c
move to correct file
ankitpatel96 Oct 14, 2024
e664d4a
format
ankitpatel96 Oct 14, 2024
fa6ca80
more operator tests
ankitpatel96 Oct 15, 2024
9138c4a
spanevent operator tests
ankitpatel96 Oct 15, 2024
698f168
lots of comments
ankitpatel96 Oct 15, 2024
97c4fb8
format
ankitpatel96 Oct 15, 2024
5b1b3b2
rename migrator
ankitpatel96 Oct 15, 2024
9247356
switch to generic operator interface
ankitpatel96 Oct 15, 2024
5c07ae7
rename operator to transformer
ankitpatel96 Oct 16, 2024
c3c2355
rename file
ankitpatel96 Oct 16, 2024
e1b21ab
add comment about ambiguous inverts
ankitpatel96 Oct 16, 2024
775fa43
license files :(
ankitpatel96 Nov 7, 2024
a2ac46a
change md links since there's a linter for it
ankitpatel96 Nov 7, 2024
2d1fb35
Include changelist and revision parts
ankitpatel96 Sep 17, 2024
01ff19b
go.mod
ankitpatel96 Sep 18, 2024
2c88392
add support for log and all type operators in changelist and revision
ankitpatel96 Oct 9, 2024
123174a
switch to struct not method
ankitpatel96 Oct 15, 2024
caa0626
generic operator
ankitpatel96 Oct 15, 2024
e733ff2
rename operator to transformer
ankitpatel96 Oct 16, 2024
3e8731a
tweak comment
ankitpatel96 Nov 27, 2024
e0c4af9
tweak comment again
ankitpatel96 Nov 27, 2024
0f2a55c
go mod tidy
ankitpatel96 Dec 2, 2024
5b49e9f
format with new formatter
ankitpatel96 Dec 2, 2024
63a58c6
newline...
ankitpatel96 Dec 2, 2024
28c15b3
Merge branch 'main' into ankit-schema-processor-changelist-revision
mx-psi Dec 3, 2024
28 changes: 28 additions & 0 deletions processor/schemaprocessor/DESIGN.md
@@ -0,0 +1,28 @@
# Design

The Schema Processor is split into several different components.

Here's a general structure diagram:

```mermaid
graph LR;
A[Previous Collector Component] --> B[Transformer]
B -- Schema URL --> C[Translation Manager]
C -- Translation --> B
B --> H[Translator]
H --> E[Revision]
E --> I[ChangeList]
subgraph Interpreter
direction RL
I --> F[Transformer]
F --> G[Migrator]
end

```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator determines the schema version of the incoming data and which Revisions are needed to bring that data to the target schema version. It then iterates through those Revisions and applies them to the incoming data.

Each Revision represents all the changes within a specific version. It consists of several ChangeLists (at internal/changelist/changelist.go in a future PR), one for each type of change block (at the time of writing: `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to a program in an interpreter, where the programming language is the schema file. A ChangeList iterates through the changes it was constructed with and calls a [Transformer](internal/transformer) for each type of change. The Transformer accepts a typed value (a log, a metric, etc.) and, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. They generally operate at a lower level than the Transformers: on `Attributes`, or on an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`).
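
The Translator/Revision flow described above can be illustrated with a small standalone sketch. This is not the processor's code: the `revision` type, the integer versions, and the `translate` and `rename` helpers are hypothetical stand-ins, and the sketch assumes each revision carries only attribute renames. It shows the idea of applying revisions in order when upgrading and undoing them in reverse order when downgrading.

```go
package main

import "fmt"

// revision is a hypothetical stand-in for a Revision: a schema version plus
// the attribute renames introduced in that version (old name -> new name).
type revision struct {
	version int
	renames map[string]string
}

// rename moves values from old keys to new keys, or the other way round
// when inverse is true.
func rename(attrs, renames map[string]string, inverse bool) {
	for oldName, newName := range renames {
		src, dst := oldName, newName
		if inverse {
			src, dst = newName, oldName
		}
		if v, ok := attrs[src]; ok {
			delete(attrs, src)
			attrs[dst] = v
		}
	}
}

// translate brings attrs from schema version `from` to version `to`.
// revs must be sorted by ascending version.
func translate(revs []revision, from, to int, attrs map[string]string) {
	if from < to {
		// Upgrade: apply every revision in (from, to], oldest first.
		for _, r := range revs {
			if r.version > from && r.version <= to {
				rename(attrs, r.renames, false)
			}
		}
		return
	}
	// Downgrade: roll back every revision in (to, from], newest first.
	for i := len(revs) - 1; i >= 0; i-- {
		if r := revs[i]; r.version <= from && r.version > to {
			rename(attrs, r.renames, true)
		}
	}
}

func main() {
	revs := []revision{
		{version: 1, renames: map[string]string{"service_version": "service.version"}},
		{version: 2, renames: map[string]string{"service.version": "application.service.version"}},
	}
	attrs := map[string]string{"service_version": "v0.0.1"}

	translate(revs, 0, 2, attrs)
	fmt.Println(attrs) // map[application.service.version:v0.0.1]

	translate(revs, 2, 0, attrs)
	fmt.Println(attrs) // map[service_version:v0.0.1]
}
```

The reverse iteration on downgrade matters: rolling back version 1 before version 2 would leave `application.service.version` untouched, because the intermediate name `service.version` would not exist yet.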
4 changes: 3 additions & 1 deletion processor/schemaprocessor/README.md
@@ -6,7 +6,7 @@
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
@@ -59,3 +59,5 @@ processors:
```

For more complete examples, please refer to [config.yml](./testdata/config.yml).

There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.
1 change: 1 addition & 0 deletions processor/schemaprocessor/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/schem
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99
go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99
6 changes: 5 additions & 1 deletion processor/schemaprocessor/internal/alias/alias.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package Alias is a subset of the interfaces defined by pdata and family
// Package alias is a subset of the interfaces defined by pdata and family
// package to allow for higher code reuse without using generics.
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"

@@ -30,6 +30,10 @@ type NamedSignal interface {
SetName(name string)
}

type Attributed interface {
Attributes() pcommon.Map
}

var (
_ Resource = (*plog.ResourceLogs)(nil)
_ Resource = (*pmetric.ResourceMetrics)(nil)
84 changes: 84 additions & 0 deletions processor/schemaprocessor/internal/changelist/changelist.go
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer"
)

// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called
type ChangeList struct {
Migrators []migrate.Migrator
}

func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
for i := 0; i < len(c.Migrators); i++ {
var migrator migrate.Migrator
// todo(ankit) in go1.23 switch to reversed iterators for this
if ss == migrate.StateSelectorApply {
migrator = c.Migrators[i]
} else {
migrator = c.Migrators[len(c.Migrators)-1-i]
}
// switch between transformer types - what do the transformers act on?
switch thisMigrator := migrator.(type) {
// this one acts on both spans and span events!
case transformer.Transformer[ptrace.Span]:
if span, ok := signal.(ptrace.Span); ok {
if err := thisMigrator.Do(ss, span); err != nil {
return err
}
} else {
return fmt.Errorf("span Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[pmetric.Metric]:
if metric, ok := signal.(pmetric.Metric); ok {
if err := thisMigrator.Do(ss, metric); err != nil {
return err
}
} else {
return fmt.Errorf("metric Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[plog.LogRecord]:
if log, ok := signal.(plog.LogRecord); ok {
if err := thisMigrator.Do(ss, log); err != nil {
return err
}
} else {
return fmt.Errorf("log Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[pcommon.Resource]:
if resource, ok := signal.(pcommon.Resource); ok {
if err := thisMigrator.Do(ss, resource); err != nil {
return err
}
} else {
return fmt.Errorf("resource Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.AllAttributes:
if err := thisMigrator.Do(ss, signal); err != nil {
return err
}
default:
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
}
}
return nil
}

func (c ChangeList) Apply(signal any) error {
return c.Do(migrate.StateSelectorApply, signal)
}

func (c ChangeList) Rollback(signal any) error {
return c.Do(migrate.StateSelectorRollback, signal)
}
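
For readers skimming the diff, the dispatch pattern in `ChangeList.Do` can be reproduced in a standalone sketch. It assumes simplified stand-in types (`span`, `logRecord`, `renameSpan`, and lowercase `migrator`/`transformer` interfaces are all hypothetical) and uses pointers where the real code passes pdata values. It shows two things: migrators run front to back on apply and back to front on rollback, and a type switch on an instantiated generic interface routes each migrator to the signal type it accepts.

```go
package main

import "fmt"

type stateSelector int

const (
	apply stateSelector = iota
	rollback
)

// migrator is a marker interface, loosely mirroring migrate.Migrator.
type migrator interface{ IsMigrator() }

// transformer is a generic interface over the signal type it acts on.
type transformer[T any] interface {
	migrator
	Do(ss stateSelector, data T) error
}

// Hypothetical signal types standing in for ptrace.Span and plog.LogRecord.
type span struct{ name string }
type logRecord struct{ body string }

// renameSpan renames a span from one name to another, or back on rollback.
type renameSpan struct{ from, to string }

func (renameSpan) IsMigrator() {}

func (r renameSpan) Do(ss stateSelector, s *span) error {
	from, to := r.from, r.to
	if ss == rollback {
		from, to = to, from
	}
	if s.name == from {
		s.name = to
	}
	return nil
}

// do walks the migrators in order for apply and in reverse for rollback.
func do(ms []migrator, ss stateSelector, signal any) error {
	for i := range ms {
		m := ms[i]
		if ss == rollback {
			m = ms[len(ms)-1-i]
		}
		switch t := m.(type) {
		case transformer[*span]:
			s, ok := signal.(*span)
			if !ok {
				return fmt.Errorf("span transformer %T can't act on %T", t, signal)
			}
			if err := t.Do(ss, s); err != nil {
				return err
			}
		default:
			return fmt.Errorf("unsupported migrator type %T", t)
		}
	}
	return nil
}

func main() {
	ms := []migrator{renameSpan{"a", "b"}, renameSpan{"b", "c"}}
	s := &span{name: "a"}

	err := do(ms, apply, s)
	fmt.Println(s.name, err) // c <nil>

	err = do(ms, rollback, s)
	fmt.Println(s.name, err) // a <nil>

	// A span transformer refuses a signal of the wrong type.
	err = do(ms, apply, &logRecord{body: "hello"})
	fmt.Println(err != nil) // true
}
```

If rollback walked the list front to back instead, the first migrator would find no span named `b` and the span would end up as `b` rather than `a`.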
58 changes: 10 additions & 48 deletions processor/schemaprocessor/internal/migrate/attributes.go
@@ -11,24 +11,22 @@ import (
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// AttributeChangeSet represents a rename_attributes type operation.
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
// The keys are the old attribute name used in the previous version, the values are the
// new attribute name starting from this version (comment from ast.AttributeMap)
updates ast.AttributeMap
// the inverse of the updates map
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
// for ambiguous rollbacks (if updates contains entries with multiple keys that have the same value), rollback contains the last key iterated over in mappings
attr := AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
@@ -39,15 +37,9 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
return attr
}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
}
func (a AttributeChangeSet) IsMigrator() {}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
@@ -81,33 +73,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
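
The comment in `NewAttributeChangeSet` about ambiguous rollbacks is easier to see with a concrete case. The sketch below is a hypothetical helper, not part of this PR: `invert` builds the rollback map from an updates map and also reports which new names have more than one old name. It iterates keys in sorted order so the result is reproducible, which is an assumption of this sketch. A plain `range` over a Go map has unspecified iteration order, so "the last key iterated over" can differ between runs.

```go
package main

import (
	"fmt"
	"sort"
)

// invert returns the inverse of updates (new name -> old name) and the list
// of new names that more than one old name maps to. For those ambiguous
// names, the old name that sorts last wins.
func invert(updates map[string]string) (map[string]string, []string) {
	keys := make([]string, 0, len(updates))
	for k := range updates {
		keys = append(keys, k)
	}
	sort.Strings(keys) // fixed order, so the winner is deterministic

	rollback := make(map[string]string, len(updates))
	reported := make(map[string]bool)
	var ambiguous []string
	for _, oldName := range keys {
		newName := updates[oldName]
		if _, dup := rollback[newName]; dup && !reported[newName] {
			reported[newName] = true
			ambiguous = append(ambiguous, newName)
		}
		rollback[newName] = oldName
	}
	return rollback, ambiguous
}

func main() {
	rollback, ambiguous := invert(map[string]string{
		"a": "x",
		"b": "x", // collides with "a" on rollback
		"c": "y",
	})
	fmt.Println(rollback)  // map[x:b y:c]
	fmt.Println(ambiguous) // [x]
}
```

Whether a schema file with such collisions should be rejected, warned about, or accepted as-is is a design decision this sketch does not take a position on.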
104 changes: 5 additions & 99 deletions processor/schemaprocessor/internal/migrate/attributes_test.go
@@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
"hello": "world",
})

expect := &AttributeChangeSet{
expect := AttributeChangeSet{
updates: map[string]string{
"hello": "world",
},
@@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
@@ -104,7 +104,7 @@ func TestAttributeChangeSetApply(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := tc.acs.Apply(tc.attrs)
err := tc.acs.Do(StateSelectorApply, tc.attrs)
if tc.errVal == "" {
assert.NoError(t, err, "Must not return an error")
} else {
@@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
@@ -179,7 +179,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := tc.acs.Rollback(tc.attrs)
err := tc.acs.Do(StateSelectorRollback, tc.attrs)
if tc.errVal == "" {
assert.NoError(t, err, "Must not return an error")
} else {
Expand All @@ -189,97 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
})
}
}

func TestNewAttributeChangeSetSliceApply(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Apply(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}

func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Rollback(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}