Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Processor Revamp implementation #35213

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
b134a0f
[Schema Processor] Updating internal package from schema to translation
MovieStoreGuy Jul 7, 2022
f502fb6
[schema processor] Additional healper packages
MovieStoreGuy Jul 7, 2022
596590d
[schema processor] Modifier
MovieStoreGuy Jul 7, 2022
cbc0e3d
[schema processor] Adding revisions
MovieStoreGuy Jul 7, 2022
14600b5
Adding changelog
MovieStoreGuy Jul 17, 2022
f32de8b
adding goporto statements
MovieStoreGuy Jul 18, 2022
e4c086d
[schema processor] Translation and Revisions
MovieStoreGuy Jul 7, 2022
6a961da
Fixing mod files
MovieStoreGuy Jul 18, 2022
059e480
Adding schemaprocessor component
MovieStoreGuy Jul 19, 2022
3586545
Fixing sum
MovieStoreGuy Jul 19, 2022
cb55be9
Fixing lint issues
MovieStoreGuy Jul 19, 2022
13e04e1
remove cruft from rebase
ankitpatel96 Aug 2, 2024
4abeca5
go sum fixup and remove build directives
ankitpatel96 Aug 2, 2024
bd75237
fix attributemap change
ankitpatel96 Aug 2, 2024
5cdad65
include deleted schema file
ankitpatel96 Aug 5, 2024
a2c1f87
rename
ankitpatel96 Aug 5, 2024
ce7fe7c
rename alias.Signal to alias.NamedSignal
ankitpatel96 Aug 5, 2024
1fca27f
change insert to put
ankitpatel96 Aug 5, 2024
2af13df
add back version aliases
ankitpatel96 Aug 5, 2024
4dbda27
convert metric datatype -> type
ankitpatel96 Aug 5, 2024
34f01d1
update old pdata api
ankitpatel96 Aug 5, 2024
1fbceb8
fix old pdata
ankitpatel96 Aug 5, 2024
5fbdb1e
fix indentation on schema file
ankitpatel96 Aug 5, 2024
adc8219
correctly replace insert
ankitpatel96 Aug 6, 2024
ae77f14
fix schema rename_attributes again
ankitpatel96 Aug 6, 2024
2157a21
seems like this should work
ankitpatel96 Aug 6, 2024
5bb5913
final tests. this is part 4 rebased on top of the unmerged pt 3
ankitpatel96 Aug 6, 2024
f98ac1e
adjustments for revv1
ankitpatel96 Aug 7, 2024
70cad59
propogate error
ankitpatel96 Aug 7, 2024
8d421ec
converted to part 3 merged migrators - metrics work but other stuff s…
ankitpatel96 Aug 8, 2024
1b02be3
rename revision fields
ankitpatel96 Aug 8, 2024
0e6726e
logs in revision
ankitpatel96 Aug 8, 2024
0aae900
adding spanevents and logs transformers (Spanevents is still wrong
ankitpatel96 Aug 8, 2024
53e5fd1
preliminatry lambda conditional
ankitpatel96 Aug 12, 2024
1221ea9
tests in progress
ankitpatel96 Aug 13, 2024
294218c
rollback tests
ankitpatel96 Aug 13, 2024
2d61d74
finish testing
ankitpatel96 Aug 13, 2024
3a6ad23
logs fixed
ankitpatel96 Aug 13, 2024
dd8e393
capitalize
ankitpatel96 Aug 13, 2024
4920229
we just need the attributes
ankitpatel96 Aug 13, 2024
344f866
new modifier
ankitpatel96 Aug 13, 2024
199cc8c
put in span events OR
ankitpatel96 Aug 19, 2024
58d2dc6
remove dead span event code
ankitpatel96 Aug 19, 2024
5b8e4a6
refactoring the translator constructor
ankitpatel96 Aug 20, 2024
8518666
all changes tests
ankitpatel96 Aug 20, 2024
2a045bf
add new section all test - doesn't pass yet
ankitpatel96 Aug 21, 2024
4efad06
refactor so that schemaurl is passed into translator
ankitpatel96 Aug 23, 2024
caf14cd
fix bug in iterator - need to start iterating on the next version, si…
ankitpatel96 Aug 26, 2024
faddd6d
log schema precedence test
ankitpatel96 Aug 26, 2024
e0bd77a
metrics and traces schema precedence
ankitpatel96 Aug 26, 2024
75411a5
switch to downgrade tests
ankitpatel96 Aug 26, 2024
f7636fb
section all tests
ankitpatel96 Aug 26, 2024
8ab2cdb
use helpers
ankitpatel96 Aug 26, 2024
e02e488
rename
ankitpatel96 Aug 26, 2024
0326707
more complex all test
ankitpatel96 Aug 26, 2024
346ff57
fix schema urls
ankitpatel96 Aug 27, 2024
08891d8
organization
ankitpatel96 Aug 27, 2024
080718f
renaming test files
ankitpatel96 Aug 28, 2024
8c2d457
table driven
ankitpatel96 Aug 29, 2024
ffcb3f0
spans
ankitpatel96 Aug 29, 2024
14c2acb
span rename
ankitpatel96 Aug 30, 2024
8e13f2e
this code is very strange and acts differently under a debugger
ankitpatel96 Aug 30, 2024
6fc8318
fix spans and testing code
ankitpatel96 Aug 30, 2024
82e466d
backfill complicated span tests to TestTranslationSpanChanges
ankitpatel96 Aug 30, 2024
8225176
span events rename working
ankitpatel96 Aug 30, 2024
5fd468e
metrics rename
ankitpatel96 Sep 1, 2024
ea54f40
test cleanup
ankitpatel96 Sep 1, 2024
a31815c
move around test files and consolidate directories
ankitpatel96 Sep 1, 2024
81c0736
logs test
ankitpatel96 Sep 1, 2024
dbab9ce
logs
ankitpatel96 Sep 1, 2024
90bdd59
use pdatatest
ankitpatel96 Sep 1, 2024
010e190
span events rename attributes is busted
ankitpatel96 Sep 1, 2024
0a361ad
delete old modifiers
ankitpatel96 Sep 1, 2024
00c91ac
delete some todos
ankitpatel96 Sep 2, 2024
4988e1b
attempt on new interface
ankitpatel96 Sep 6, 2024
78047ae
more changelist migrations:
ankitpatel96 Sep 9, 2024
b37332a
migrate to Do
ankitpatel96 Sep 9, 2024
2dd02a0
switch to Do interface
ankitpatel96 Sep 9, 2024
781451d
tests for multi conditional, and fixes for matches
ankitpatel96 Sep 10, 2024
e3fab37
fixup revision tests, add back obsolete transformers for now
ankitpatel96 Sep 10, 2024
3932f88
excise logs attributes
ankitpatel96 Sep 10, 2024
42914dc
get rid of spanevents attr
ankitpatel96 Sep 10, 2024
caa6286
put in span events
ankitpatel96 Sep 10, 2024
f1d8596
move to resource and all
ankitpatel96 Sep 10, 2024
6fac10a
unused struct member
ankitpatel96 Sep 10, 2024
234aed4
migrate span
ankitpatel96 Sep 11, 2024
1a1184e
provisional metric support
ankitpatel96 Sep 11, 2024
df07cc6
fix test
ankitpatel96 Sep 11, 2024
f7ff89d
signal name change
ankitpatel96 Sep 11, 2024
b7ddcbf
metric rename
ankitpatel96 Sep 11, 2024
d673dac
cleanup
ankitpatel96 Sep 11, 2024
cf05115
unused migrators
ankitpatel96 Sep 11, 2024
9c5a391
rid the world of the evil that is pointers
ankitpatel96 Sep 11, 2024
495bb12
span event conditional
ankitpatel96 Sep 12, 2024
fd34339
next iteration of changelist
ankitpatel96 Sep 12, 2024
e352da7
renames and comments
ankitpatel96 Sep 12, 2024
b9be988
licenses
ankitpatel96 Sep 12, 2024
3518aa3
formatting
ankitpatel96 Sep 12, 2024
42a05ec
cleanup
ankitpatel96 Sep 12, 2024
45fb5c0
even more lints
ankitpatel96 Sep 16, 2024
585ab04
goporto
ankitpatel96 Sep 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor

go 1.21.0
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.108.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.106.1
go.opentelemetry.io/collector/config/confighttp v0.106.1
go.opentelemetry.io/collector/confmap v0.106.1
go.opentelemetry.io/collector/consumer v0.106.1
go.opentelemetry.io/collector/consumer/consumertest v0.106.1
go.opentelemetry.io/collector/pdata v1.12.0
go.opentelemetry.io/collector/pdata v1.14.1
go.opentelemetry.io/collector/processor v0.106.1
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/schema v0.0.8
Expand All @@ -20,6 +22,7 @@ require (
)

require (
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -42,6 +45,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
12 changes: 10 additions & 2 deletions processor/schemaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package Alias is a subset of the interfaces defined by pdata and family
// Package alias is a subset of the interfaces defined by pdata and family
// package to allow for higher code reuse without using generics.
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"

Expand Down Expand Up @@ -30,6 +30,10 @@ type NamedSignal interface {
SetName(name string)
}

type Attributed interface {
Attributes() pcommon.Map
}

var (
_ Resource = (*plog.ResourceLogs)(nil)
_ Resource = (*pmetric.ResourceMetrics)(nil)
Expand Down
75 changes: 75 additions & 0 deletions processor/schemaprocessor/internal/changelist/changelist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator"
)

// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called
type ChangeList struct {
Migrators []migrate.Migrator
}

func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
for i := 0; i < len(c.Migrators); i++ {
var migrator migrate.Migrator
// todo(ankit) in go1.23 switch to reversed iterators for this
if ss == migrate.StateSelectorApply {
migrator = c.Migrators[i]
} else {
migrator = c.Migrators[len(c.Migrators)-1-i]
}
// switch between operator types - what do the operators act on?
switch thisMigrator := migrator.(type) {
// this one acts on both spans and span events!
case operator.SpanOperator:
if span, ok := signal.(ptrace.Span); ok {
if err := thisMigrator.Do(ss, span); err != nil {
return err
}
} else {
return fmt.Errorf("SpanOperator %T can't act on %T", thisMigrator, signal)
}
case operator.MetricOperator:
if metric, ok := signal.(pmetric.Metric); ok {
if err := thisMigrator.Do(ss, metric); err != nil {
return err
}
} else {
return fmt.Errorf("MetricOperator %T can't act on %T", thisMigrator, signal)
}
// no log operator because the only log operation is an attribute changeset
// this block is for the `all` block, the `resource` block, and the `log` block
// todo(ankit) switch these to specific typed ones?
case migrate.AttributeChangeSet:
switch attributeSignal := signal.(type) {
case alias.Attributed:
if err := thisMigrator.Do(ss, attributeSignal.Attributes()); err != nil {
return err
}
default:
return fmt.Errorf("unsupported signal type %T for AttributeChangeSet", attributeSignal)
}
default:
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
}
}
return nil
}

func (c ChangeList) Apply(signal any) error {
return c.Do(migrate.StateSelectorApply, signal)
}

func (c ChangeList) Rollback(signal any) error {
return c.Do(migrate.StateSelectorRollback, signal)
}
55 changes: 12 additions & 43 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@ import (
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// AttributeChangeSet represents a rename_attributes type operation.
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
// The keys are the old attribute name used in the previous version, the values are the
// new attribute name starting from this version (comment from ast.AttributeMap)
updates ast.AttributeMap
// the inverse of the updates map
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
attr := AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
Expand All @@ -39,15 +36,17 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
return attr
}

func (a AttributeChangeSet) IsMigrator() {}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
return a.Do(StateSelectorApply, attrs)
}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
return a.Do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
Expand Down Expand Up @@ -81,33 +80,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
101 changes: 3 additions & 98 deletions processor/schemaprocessor/internal/migrate/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
"hello": "world",
})

expect := &AttributeChangeSet{
expect := AttributeChangeSet{
updates: map[string]string{
"hello": "world",
},
Expand All @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -189,98 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
})
}
}

func TestNewAttributeChangeSetSliceApply(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Apply(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}

func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")

}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Rollback(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}
Loading