Skip to content

Commit

Permalink
add support for log and all type operators in changelist and revision
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitpatel96 committed Oct 15, 2024
1 parent e8ed8cc commit 8c2f952
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 26 deletions.
32 changes: 18 additions & 14 deletions processor/schemaprocessor/internal/changelist/changelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ package changelist // import "github.com/open-telemetry/opentelemetry-collector-
import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator"
)
Expand Down Expand Up @@ -47,22 +48,25 @@ func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
} else {
return fmt.Errorf("MetricOperator %T can't act on %T", thisMigrator, signal)
}
case operator.AllOperator:
if err := thisMigrator.Do(ss, signal); err != nil {
return err
case operator.LogOperator:
if log, ok := signal.(plog.LogRecord); ok {
if err := thisMigrator.Do(ss, log); err != nil {
return err
}
} else {
return fmt.Errorf("LogOperator %T can't act on %T", thisMigrator, signal)
}

// no log operator because the only log operation is an attribute changeset
// this block is for the `resource` block, and the `log` block
// todo(ankit) switch these to specific typed ones?
case migrate.AttributeChangeSet:
switch attributeSignal := signal.(type) {
case alias.Attributed:
if err := thisMigrator.Do(ss, attributeSignal.Attributes()); err != nil {
case operator.ResourceOperator:
if resource, ok := signal.(pcommon.Resource); ok {
if err := thisMigrator.Do(ss, resource); err != nil {
return err
}
default:
return fmt.Errorf("unsupported signal type %T for AttributeChangeSet", attributeSignal)
} else {
return fmt.Errorf("ResourceOperator %T can't act on %T", thisMigrator, signal)
}
case operator.AllOperator:
if err := thisMigrator.Do(ss, signal); err != nil {
return err
}
default:
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
Expand Down
6 changes: 4 additions & 2 deletions processor/schemaprocessor/internal/translation/revision_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func newResourceChangeList(resource ast.Attributes) *changelist.ChangeList {
for _, at := range resource.Changes {
if renamed := at.RenameAttributes; renamed != nil {
attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap)
values = append(values, attributeChangeSet)
resourceOperator := operator.ResourceAttributeOperator{AttributeChange: attributeChangeSet}
values = append(values, resourceOperator)
}
}
return &changelist.ChangeList{Migrators: values}
Expand Down Expand Up @@ -141,7 +142,8 @@ func newLogsChangelist(logs ast.Logs) *changelist.ChangeList {
for _, at := range logs.Changes {
if renamed := at.RenameAttributes; renamed != nil {
attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap)
values = append(values, attributeChangeSet)
logOperator := operator.LogAttributeOperator{AttributeChange: attributeChangeSet}
values = append(values, logOperator)
}
}
return &changelist.ChangeList{Migrators: values}
Expand Down
25 changes: 15 additions & 10 deletions processor/schemaprocessor/internal/translation/revision_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ func TestNewRevisionV1(t *testing.T) {
"state": "status",
}),
},
ResourceMigrator: migrate.NewAttributeChangeSet(map[string]string{
"state": "status",
}),
ResourceMigrator: operator.ResourceAttributeOperator{
AttributeChange: migrate.NewAttributeChangeSet(map[string]string{
"state": "status",
}),
},
},
operator.AllOperator{
// initialize one of each operator with the attribute set
Expand All @@ -197,16 +199,18 @@ func TestNewRevisionV1(t *testing.T) {
"status": "state",
}),
},
ResourceMigrator: migrate.NewAttributeChangeSet(map[string]string{
"status": "state",
}),
ResourceMigrator: operator.ResourceAttributeOperator{
AttributeChange: migrate.NewAttributeChangeSet(map[string]string{
"status": "state",
}),
},
},
},
},
resources: &changelist.ChangeList{Migrators: []migrate.Migrator{
migrate.NewAttributeChangeSet(map[string]string{
"service_name": "service.name",
}),
operator.ResourceAttributeOperator{AttributeChange: migrate.NewAttributeChangeSet(
map[string]string{"service_name": "service.name"},
)},
}},
spans: &changelist.ChangeList{Migrators: []migrate.Migrator{
operator.SpanConditionalAttributeOperator{Migrator: migrate.NewConditionalAttributeSet(
Expand Down Expand Up @@ -243,9 +247,10 @@ func TestNewRevisionV1(t *testing.T) {
)},
}},
logs: &changelist.ChangeList{Migrators: []migrate.Migrator{
migrate.NewAttributeChangeSet(map[string]string{
operator.LogAttributeOperator{AttributeChange: migrate.NewAttributeChangeSet(map[string]string{
"ERROR": "error",
}),
},
}},
},
},
Expand Down

0 comments on commit 8c2f952

Please sign in to comment.