Skip to content

Commit

Permalink
kafkaexporter: add support for Kafka OTLP JSON encoding (#6358)
Browse files Browse the repository at this point in the history
This change allows for export json encoded otlp data to kafka, allowing
spikes to be done which would allow for future thoughtworks on how that
data can be consumed. This is considered experimental and not fit for
production use.
  • Loading branch information
MovieStoreGuy authored and PaurushGarg committed Dec 14, 2021
1 parent b1ed5ca commit c370ead
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 4 deletions.
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The following settings can be optionally configured:
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: ** EXPERIMENTAL ** payload is Json serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- The following encodings are valid *only* for **traces**.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
Expand Down
9 changes: 9 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(*oCfg, set, f.tracesMarshalers)
if err != nil {
return nil, err
Expand All @@ -136,6 +139,9 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(*oCfg, set, f.metricsMarshalers)
if err != nil {
return nil, err
Expand All @@ -162,6 +168,9 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(*oCfg, set, f.logsMarshalers)
if err != nil {
return nil, err
Expand Down
10 changes: 8 additions & 2 deletions exporter/kafkaexporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ type LogsMarshaler interface {
// tracesMarshalers returns map of supported encodings with TracesMarshaler.
func tracesMarshalers() map[string]TracesMarshaler {
otlpPb := newPdataTracesMarshaler(otlp.NewProtobufTracesMarshaler(), defaultEncoding)
otlpJSON := newPdataTracesMarshaler(otlp.NewJSONTracesMarshaler(), "otlp_json")
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
return map[string]TracesMarshaler{
otlpPb.Encoding(): otlpPb,
otlpJSON.Encoding(): otlpJSON,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
}
Expand All @@ -62,15 +64,19 @@ func tracesMarshalers() map[string]TracesMarshaler {
// metricsMarshalers returns map of supported encodings and MetricsMarshaler
func metricsMarshalers() map[string]MetricsMarshaler {
otlpPb := newPdataMetricsMarshaler(otlp.NewProtobufMetricsMarshaler(), defaultEncoding)
otlpJSON := newPdataMetricsMarshaler(otlp.NewJSONMetricsMarshaler(), "otlp_json")
return map[string]MetricsMarshaler{
otlpPb.Encoding(): otlpPb,
otlpPb.Encoding(): otlpPb,
otlpJSON.Encoding(): otlpJSON,
}
}

// logsMarshalers returns map of supported encodings and LogsMarshaler
func logsMarshalers() map[string]LogsMarshaler {
otlpPb := newPdataLogsMarshaler(otlp.NewProtobufLogsMarshaler(), defaultEncoding)
otlpJSON := newPdataLogsMarshaler(otlp.NewJSONLogsMarshaler(), "otlp_json")
return map[string]LogsMarshaler{
otlpPb.Encoding(): otlpPb,
otlpPb.Encoding(): otlpPb,
otlpJSON.Encoding(): otlpJSON,
}
}
82 changes: 80 additions & 2 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright The OpenTelemetry Authors
//
// Copyright The OpenTelemetry Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -15,15 +14,21 @@
package kafkaexporter

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/pdata"
semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0"
)

func TestDefaultTracesMarshalers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"otlp_json",
"jaeger_proto",
"jaeger_json",
}
Expand All @@ -41,6 +46,7 @@ func TestDefaultTracesMarshalers(t *testing.T) {
func TestDefaultMetricsMarshalers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"otlp_json",
}
marshalers := metricsMarshalers()
assert.Equal(t, len(expectedEncodings), len(marshalers))
Expand All @@ -56,6 +62,7 @@ func TestDefaultMetricsMarshalers(t *testing.T) {
func TestDefaultLogsMarshalers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"otlp_json",
}
marshalers := logsMarshalers()
assert.Equal(t, len(expectedEncodings), len(marshalers))
Expand All @@ -67,3 +74,74 @@ func TestDefaultLogsMarshalers(t *testing.T) {
})
}
}

func TestOTLPTracesJsonMarshaling(t *testing.T) {
t.Parallel()

now := time.Unix(1, 0)

traces := pdata.NewTraces()
traces.ResourceSpans().AppendEmpty()

rs := traces.ResourceSpans().At(0)
rs.SetSchemaUrl(semconv.SchemaURL)
rs.InstrumentationLibrarySpans().AppendEmpty()

ils := rs.InstrumentationLibrarySpans().At(0)
ils.SetSchemaUrl(semconv.SchemaURL)
ils.Spans().AppendEmpty()

span := ils.Spans().At(0)
span.SetKind(pdata.SpanKindInternal)
span.SetName(t.Name())
span.SetStartTimestamp(pdata.NewTimestampFromTime(now))
span.SetEndTimestamp(pdata.NewTimestampFromTime(now.Add(time.Second)))
span.SetSpanID(pdata.NewSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}))
span.SetParentSpanID(pdata.NewSpanID([8]byte{8, 9, 10, 11, 12, 13, 14}))

marshaler, ok := tracesMarshalers()["otlp_json"]
require.True(t, ok, "Must have otlp json marshaller")

msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, 1, "Must have one entry in the message")

data, err := msg[0].Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")

// Since marshaling json is not guaranteed to be in order
// within a string, using a map to compare that the expected values are there
expectedJSON := map[string]interface{}{
"resourceSpans": []interface{}{
map[string]interface{}{
"resource": map[string]interface{}{},
"instrumentationLibrarySpans": []interface{}{
map[string]interface{}{
"instrumentationLibrary": map[string]interface{}{},
"spans": []interface{}{
map[string]interface{}{
"traceId": "",
"spanId": "0001020304050607",
"parentSpanId": "08090a0b0c0d0e00",
"name": t.Name(),
"kind": pdata.SpanKindInternal.String(),
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
"status": map[string]interface{}{},
},
},
"schemaUrl": semconv.SchemaURL,
},
},
"schemaUrl": semconv.SchemaURL,
},
},
}

var final map[string]interface{}
err = json.Unmarshal(data, &final)
require.NoError(t, err, "Must not error marshaling expected data")

assert.Equal(t, expectedJSON, final, "Must match the expected value")
}

0 comments on commit c370ead

Please sign in to comment.