[extension/encoding] Add avrologencodingextension (open-telemetry#31923)
**Description:** Add a new component, `avrologencodingextension`, that
transforms AVRO messages into log record bodies.

As requested in open-telemetry#31077, this is a parallel request to provide the same
functionality as a reusable encoding extension.

**Link to tracking Issue:** open-telemetry#21067

**Testing:** Unit tests, plus testing the code within the
`kafkareceiver` receiver.

**Documentation:** Added README within the component.
thmshmm authored Mar 29, 2024
1 parent 19da245 commit b865505
Showing 23 changed files with 756 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-avrologencodingextension.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: avrologencodingextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new encoding extension to support mapping of AVRO messages to logs.

# One or more tracking issues related to the change
issues: [21067]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
@@ -88,6 +88,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
@@ -82,6 +82,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
@@ -82,6 +82,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
1 change: 1 addition & 0 deletions extension/encoding/avrologencodingextension/Makefile
@@ -0,0 +1 @@
include ../../../Makefile.Common
14 changes: 14 additions & 0 deletions extension/encoding/avrologencodingextension/README.md
@@ -0,0 +1,14 @@
# AVRO Log encoding extension

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development] |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Favrologencoding%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Favrologencoding) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Favrologencoding%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Favrologencoding) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@thmshmm](https://www.github.com/thmshmm) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

The `avrolog` encoding extension unmarshals AVRO-encoded messages and inserts the decoded record into the body of a log record. Marshalling is not supported.
38 changes: 38 additions & 0 deletions extension/encoding/avrologencodingextension/avro.go
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import (
	"fmt"

	"github.com/linkedin/goavro/v2"
)

type avroDeserializer interface {
	Deserialize([]byte) (map[string]any, error)
}

type avroStaticSchemaDeserializer struct {
	codec *goavro.Codec
}

func newAVROStaticSchemaDeserializer(schema string) (avroDeserializer, error) {
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return nil, fmt.Errorf("failed to create avro codec: %w", err)
	}

	return &avroStaticSchemaDeserializer{
		codec: codec,
	}, nil
}

func (d *avroStaticSchemaDeserializer) Deserialize(data []byte) (map[string]any, error) {
	native, _, err := d.codec.NativeFromBinary(data)
	if err != nil {
		return nil, fmt.Errorf("failed to deserialize avro record: %w", err)
	}

	return native.(map[string]any), nil
}
42 changes: 42 additions & 0 deletions extension/encoding/avrologencodingextension/avro_test.go
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestNewAvroLogsUnmarshaler(t *testing.T) {
	schema, data := createAVROTestData(t)

	deserializer, err := newAVROStaticSchemaDeserializer(schema)
	if err != nil {
		// Stop here: the deserializer is nil and must not be used below.
		t.Fatalf("Did not expect an error, got %q", err.Error())
	}

	logMap, err := deserializer.Deserialize(data)
	if err != nil {
		t.Fatalf("Did not expect an error, got %q", err.Error())
	}

	assert.Equal(t, int64(1697187201488000000), logMap["timestamp"].(time.Time).UnixNano())
	assert.Equal(t, "host1", logMap["hostname"])
	assert.Equal(t, int64(12), logMap["nestedRecord"].(map[string]any)["field1"])

	props := logMap["properties"].([]any)
	propsStr := make([]string, len(props))
	for i, prop := range props {
		propsStr[i] = prop.(string)
	}

	assert.Equal(t, []string{"prop1", "prop2"}, propsStr)
}

func TestNewAvroLogsUnmarshalerInvalidSchema(t *testing.T) {
	_, err := newAVROStaticSchemaDeserializer("invalid schema")
	assert.Error(t, err)
}
20 changes: 20 additions & 0 deletions extension/encoding/avrologencodingextension/config.go
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import "errors"

var errNoSchema = errors.New("no schema provided")

type Config struct {
	Schema string `mapstructure:"schema"`
}

func (c *Config) Validate() error {
	if c.Schema == "" {
		return errNoSchema
	}

	return nil
}
20 changes: 20 additions & 0 deletions extension/encoding/avrologencodingextension/config_test.go
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestConfigValidate(t *testing.T) {
	cfg := &Config{}
	err := cfg.Validate()
	assert.ErrorIs(t, err, errNoSchema)

	cfg.Schema = "schema1"
	err = cfg.Validate()
	assert.NoError(t, err)
}
5 changes: 5 additions & 0 deletions extension/encoding/avrologencodingextension/doc.go
@@ -0,0 +1,5 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml
package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"
89 changes: 89 additions & 0 deletions extension/encoding/avrologencodingextension/extension.go
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"

	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
)

var (
	_ encoding.LogsUnmarshalerExtension = (*avroLogExtension)(nil)
)

type avroLogExtension struct {
	deserializer avroDeserializer
}

func newExtension(config *Config) (*avroLogExtension, error) {
	deserializer, err := newAVROStaticSchemaDeserializer(config.Schema)
	if err != nil {
		return nil, err
	}

	return &avroLogExtension{deserializer: deserializer}, nil
}

func (e *avroLogExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
	p := plog.NewLogs()

	avroLog, err := e.deserializer.Deserialize(buf)
	if err != nil {
		return p, fmt.Errorf("failed to deserialize avro log: %w", err)
	}

	logRecord := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
	logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))

	// Replace time.Time values with Unix nanoseconds, as FromRaw does not support time.Time
	replaceLogicalTypes(avroLog)

	// Set the unmarshaled avro as the body of the log record
	if err := logRecord.Body().SetEmptyMap().FromRaw(avroLog); err != nil {
		return p, err
	}

	return p, nil
}

// replaceLogicalTypes rewrites the map in place.
func replaceLogicalTypes(m map[string]any) {
	for k, v := range m {
		m[k] = transformValue(v)
	}
}

// transformValue converts time.Time values to Unix nanoseconds and
// recurses into nested maps and arrays.
func transformValue(value any) any {
	if timeValue, ok := value.(time.Time); ok {
		return timeValue.UnixNano()
	}

	if mapValue, ok := value.(map[string]any); ok {
		replaceLogicalTypes(mapValue)
		return mapValue
	}

	if arrayValue, ok := value.([]any); ok {
		for i, v := range arrayValue {
			arrayValue[i] = transformValue(v)
		}
		return arrayValue
	}

	return value
}

func (e *avroLogExtension) Start(_ context.Context, _ component.Host) error {
	return nil
}

func (e *avroLogExtension) Shutdown(_ context.Context) error {
	return nil
}
53 changes: 53 additions & 0 deletions extension/encoding/avrologencodingextension/extension_test.go
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component/componenttest"
)

func TestExtension_Start_Shutdown(t *testing.T) {
	avroExtension := &avroLogExtension{}

	err := avroExtension.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)

	err = avroExtension.Shutdown(context.Background())
	require.NoError(t, err)
}

func TestUnmarshal(t *testing.T) {
	t.Parallel()

	schema, data := createAVROTestData(t)

	e, err := newExtension(&Config{Schema: schema})
	require.NoError(t, err)

	// Check the error before indexing into the result, which is empty on failure.
	logs, err := e.UnmarshalLogs(data)
	require.NoError(t, err)

	logRecord := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	assert.Equal(t, "{\"count\":5,\"hostname\":\"host1\",\"level\":\"warn\",\"levelEnum\":\"INFO\",\"mapField\":{},\"message\":\"log message\",\"nestedRecord\":{\"field1\":12,\"field2\":\"val2\"},\"properties\":[\"prop1\",\"prop2\"],\"severity\":1,\"timestamp\":1697187201488000000}", logRecord.Body().AsString())
}

func TestInvalidUnmarshal(t *testing.T) {
	t.Parallel()

	schema, err := loadAVROSchemaFromFile("testdata/schema1.avro")
	if err != nil {
		t.Fatalf("Failed to read avro schema file: %q", err.Error())
	}

	e, err := newExtension(&Config{Schema: string(schema)})
	require.NoError(t, err)

	_, err = e.UnmarshalLogs([]byte("NOT A AVRO"))
	assert.Error(t, err)
}
30 changes: 30 additions & 0 deletions extension/encoding/avrologencodingextension/factory.go
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/extension"

	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension/internal/metadata"
)

func NewFactory() extension.Factory {
	return extension.NewFactory(
		metadata.Type,
		createDefaultConfig,
		createExtension,
		metadata.ExtensionStability,
	)
}

func createExtension(_ context.Context, _ extension.CreateSettings, config component.Config) (extension.Extension, error) {
	return newExtension(config.(*Config))
}

func createDefaultConfig() component.Config {
	return &Config{Schema: ""}
}