Skip to content

Commit

Permalink
[chore] Schema Processor Revamp [Part 2] - ChangeList and Revision (#…
Browse files Browse the repository at this point in the history
…35267)

**Description:** <Describe what has changed.>
This is a slice of changes from
#35248

This PR details how operators are used to build the execution pipeline
for a given schemafile.



Changed files from the [previous
PR](#35214)
are:

processor/schemaprocessor/internal/changelist/changelist.go
processor/schemaprocessor/internal/translation/revision_v1.go
processor/schemaprocessor/internal/translation/revision_v1_test.go
processor/schemaprocessor/go.mod

I'm asking a maintainer if they would be willing to push a copy of the
previous PR's branch to the core repo so I can switch the base of this
PR to the previous PR - thus only the stacked changes would be shown.

Edit: this is apparently not easily supported - so asking reviewers to
just focus on the changed files listed above. Sorry about that!

**Testing:** <Describe what testing was performed and which tests were
added.>
Unit tests

---------

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
ankitpatel96 and mx-psi authored Dec 3, 2024
1 parent 6119d51 commit 9ae6a48
Show file tree
Hide file tree
Showing 26 changed files with 1,299 additions and 487 deletions.
28 changes: 28 additions & 0 deletions processor/schemaprocessor/DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Design

The Schema Processor is split into several different components.

Here's a general structure diagram:

```mermaid
graph LR;
A[Previous Collector Component] --> B[Transformer]
B -- Schema URL --> C[Translation Manager]
C -- Translation --> B
B --> H[Translator]
H --> E[Revision]
E --> I[ChangeList]
subgraph Interpreter
direction RL
I --> F[Transformer]
F --> G[Migrator]
end
```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data.

Each Revision represents all the changes within a specific version. It consists of several ChangeLists (at internal/changelist/changelist.go in a future PR) - one for each type of change block (at the time of writing - `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to a program in an interpreter - in this case the programming language is the schema file! They iterate through whatever changes they are constructed with, and call a [Transformer](internal/transformer) for each type of change. The Transformer accepts a typed value - a log, a metric, etc. It then, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. The Migrators generally operate on lower levels than the Transformers - they operate on `Attributes`, or an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`).
4 changes: 3 additions & 1 deletion processor/schemaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
Expand Down Expand Up @@ -59,3 +59,5 @@ processors:
```
For more complete examples, please refer to [config.yml](./testdata/config.yml).
There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.
1 change: 1 addition & 0 deletions processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/schem
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99
go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99
Expand Down
6 changes: 5 additions & 1 deletion processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package Alias is a subset of the interfaces defined by pdata and family
// Package alias is a subset of the interfaces defined by pdata and family
// package to allow for higher code reuse without using generics.
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"

Expand Down Expand Up @@ -30,6 +30,10 @@ type NamedSignal interface {
SetName(name string)
}

type Attributed interface {
Attributes() pcommon.Map
}

var (
_ Resource = (*plog.ResourceLogs)(nil)
_ Resource = (*pmetric.ResourceMetrics)(nil)
Expand Down
84 changes: 84 additions & 0 deletions processor/schemaprocessor/internal/changelist/changelist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer"
)

// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called
type ChangeList struct {
Migrators []migrate.Migrator
}

func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
for i := 0; i < len(c.Migrators); i++ {
var migrator migrate.Migrator
// todo(ankit) in go1.23 switch to reversed iterators for this
if ss == migrate.StateSelectorApply {
migrator = c.Migrators[i]
} else {
migrator = c.Migrators[len(c.Migrators)-1-i]
}
// switch between transformer types - what do the transformers act on?
switch thisMigrator := migrator.(type) {
// this one acts on both spans and span events!
case transformer.Transformer[ptrace.Span]:
if span, ok := signal.(ptrace.Span); ok {
if err := thisMigrator.Do(ss, span); err != nil {
return err
}
} else {
return fmt.Errorf("span Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[pmetric.Metric]:
if metric, ok := signal.(pmetric.Metric); ok {
if err := thisMigrator.Do(ss, metric); err != nil {
return err
}
} else {
return fmt.Errorf("metric Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[plog.LogRecord]:
if log, ok := signal.(plog.LogRecord); ok {
if err := thisMigrator.Do(ss, log); err != nil {
return err
}
} else {
return fmt.Errorf("log Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.Transformer[pcommon.Resource]:
if resource, ok := signal.(pcommon.Resource); ok {
if err := thisMigrator.Do(ss, resource); err != nil {
return err
}
} else {
return fmt.Errorf("resource Transformer %T can't act on %T", thisMigrator, signal)
}
case transformer.AllAttributes:
if err := thisMigrator.Do(ss, signal); err != nil {
return err
}
default:
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
}
}
return nil
}

func (c ChangeList) Apply(signal any) error {
return c.Do(migrate.StateSelectorApply, signal)
}

func (c ChangeList) Rollback(signal any) error {
return c.Do(migrate.StateSelectorRollback, signal)
}
58 changes: 10 additions & 48 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,22 @@ import (
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// AttributeChangeSet represents a rename_attributes type operation.
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
// The keys are the old attribute name used in the previous version, the values are the
// new attribute name starting from this version (comment from ast.AttributeMap)
updates ast.AttributeMap
// the inverse of the updates map
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
// for ambiguous rollbacks (if updates contains entries with multiple keys that have the same value), rollback contains the last key iterated over in mappings
attr := AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
Expand All @@ -39,15 +37,9 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
return attr
}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
}
func (a AttributeChangeSet) IsMigrator() {}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
Expand Down Expand Up @@ -81,33 +73,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
104 changes: 5 additions & 99 deletions processor/schemaprocessor/internal/migrate/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
"hello": "world",
})

expect := &AttributeChangeSet{
expect := AttributeChangeSet{
updates: map[string]string{
"hello": "world",
},
Expand All @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAttributeChangeSetApply(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := tc.acs.Apply(tc.attrs)
err := tc.acs.Do(StateSelectorApply, tc.attrs)
if tc.errVal == "" {
assert.NoError(t, err, "Must not return an error")
} else {
Expand All @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := tc.acs.Rollback(tc.attrs)
err := tc.acs.Do(StateSelectorRollback, tc.attrs)
if tc.errVal == "" {
assert.NoError(t, err, "Must not return an error")
} else {
Expand All @@ -189,97 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
})
}
}

func TestNewAttributeChangeSetSliceApply(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Apply(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}

func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Rollback(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}
Loading

0 comments on commit 9ae6a48

Please sign in to comment.